From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/socket.h | 63 ++++++++++++++++++++++--------- 1 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/socket.h b/src/socket.h index 7c4f83f..f73bee5 100644 --- a/src/socket.h +++ b/src/socket.h @@ -19,6 +19,7 @@ #ifndef SOCKET_GWTJHBPO #define SOCKET_GWTJHBPO +#include "bh_util.h" #include "defs.h" #include "shm_queue.h" #include <atomic> @@ -34,6 +35,15 @@ class ShmSocket : private boost::noncopyable { + template <class DoSend> + inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) + { + bool r = false; + DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); + r = doSend(msg); + return r; + } + protected: typedef bhome_shm::ShmMsgQueue Queue; @@ -55,30 +65,28 @@ bool Stop(); size_t Pending() const { return mq().Pending(); } - bool Send(const void *id, const MsgI &imsg, const int timeout_ms) + bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) { - return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); + assert(valid_remote); + return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); } //TODO reimplment, using async. bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) { - assert(valid_remote); - try { - if (cb) { - auto RegisterCB = [&]() { - std::lock_guard<std::mutex> lock(mutex()); - async_cbs_.emplace(head.msg_id(), cb); - }; - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); - } else { - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); - } - } catch (...) { - return false; - } + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); + } + + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) + { + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); } template <class Body> @@ -133,7 +141,26 @@ std::atomic<bool> run_; Queue mq_; - std::unordered_map<std::string, RecvCB> async_cbs_; + class AsyncCBs + { + std::unordered_map<std::string, RecvCB> store_; + + public: + bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } + bool Find(const std::string &id, RecvCB &cb) + { + auto pos = store_.find(id); + if (pos != store_.end()) { + cb.swap(pos->second); + store_.erase(pos); + return true; + } else { + return false; + } + } + }; + + Synced<AsyncCBs> async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO -- Gitblit v1.8.0