From c6964d5af25d4ec7ed9dbe7674dc4e3896b36ead Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 16 四月 2021 16:10:02 +0800 Subject: [PATCH] node remove mq if never registered; refactor. --- src/socket.h | 39 +++++++++++++++++++++------------------ 1 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/socket.h b/src/socket.h index 9b47e42..dbb161c 100644 --- a/src/socket.h +++ b/src/socket.h @@ -36,9 +36,11 @@ class ShmSocket : private boost::noncopyable { - bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) + bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) { - send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); + // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) { + send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); + // } return true; } @@ -55,6 +57,7 @@ ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } + bool Remove() { return Remove(shm(), id()); } const MQId &id() const { return mq().Id(); } // start recv. bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); @@ -64,25 +67,24 @@ size_t Pending() const { return mq().Pending(); } template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) { - MsgI msg; - return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg); - } - - template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb) - { - //TODO send_buffer_ need flag, and remove callback on expire. MsgI msg; if (msg.Make(shm(), head, body)) { + DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); std::string msg_id(head.msg_id()); - per_msg_cbs_->Add(msg_id, cb); - auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Find(msg_id, cb_no_use); - }; - return SendImpl(valid_remote, msg, onExpireRemoveCB); + if (!cb) { + return SendImpl(valid_remote, msg); + } else { + per_msg_cbs_->Add(msg_id, cb); + auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Find(msg_id, cb_no_use); + }; + return SendImpl(valid_remote, msg, onExpireRemoveCB); + } + } else { + SetLastError(ENOMEM, "Out of mem"); } return false; } @@ -170,7 +172,8 @@ }; Synced<AsyncCBs> per_msg_cbs_; - Synced<SendQ> send_buffer_; + SendQ send_buffer_; + // Synced<SendQ> send_buffer_; }; #endif // end of include guard: SOCKET_GWTJHBPO -- Gitblit v1.8.0