From 3e191ac65bd65f678e9a344163f74d181726f6bd Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 13 四月 2021 08:48:40 +0800 Subject: [PATCH] refactor, add TODO. --- src/socket.h | 69 +++++++++++++++++++++++++--------- 1 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/socket.h b/src/socket.h index 7c4f83f..5973ab6 100644 --- a/src/socket.h +++ b/src/socket.h @@ -19,6 +19,7 @@ #ifndef SOCKET_GWTJHBPO #define SOCKET_GWTJHBPO +#include "bh_util.h" #include "defs.h" #include "shm_queue.h" #include <atomic> @@ -34,6 +35,15 @@ class ShmSocket : private boost::noncopyable { + template <class DoSend> + inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) + { + bool r = false; + DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); + r = doSend(msg); + return r; + } + protected: typedef bhome_shm::ShmMsgQueue Queue; @@ -46,6 +56,7 @@ ShmSocket(Shm &shm, const MQId &id, const int len); ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); + static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } const MQId &id() const { return mq().Id(); } Shm &shm() { return shm_; } // start recv. @@ -55,30 +66,28 @@ bool Stop(); size_t Pending() const { return mq().Pending(); } - bool Send(const void *id, const MsgI &imsg, const int timeout_ms) + bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) { - return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); + assert(valid_remote); + return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); } //TODO reimplment, using async. bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) { - assert(valid_remote); - try { - if (cb) { - auto RegisterCB = [&]() { - std::lock_guard<std::mutex> lock(mutex()); - async_cbs_.emplace(head.msg_id(), cb); - }; - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); - } else { - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); - } - } catch (...) { - return false; - } + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); + } + + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) + { + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); } template <class Body> @@ -100,12 +109,15 @@ reply.swap(msg); reply_head.Swap(&head); st->cv.notify_one(); - } else { + } else { // ignore } }; std::unique_lock<std::mutex> lk(st->mutex); bool sendok = Send(remote, head, body, timeout_ms, OnRecv); + if (!sendok) { + printf("send timeout\n"); + } if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { return true; } else { @@ -133,7 +145,26 @@ std::atomic<bool> run_; Queue mq_; - std::unordered_map<std::string, RecvCB> async_cbs_; + class AsyncCBs + { + std::unordered_map<std::string, RecvCB> store_; + + public: + bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } + bool Find(const std::string &id, RecvCB &cb) + { + auto pos = store_.find(id); + if (pos != store_.end()) { + cb.swap(pos->second); + store_.erase(pos); + return true; + } else { + return false; + } + } + }; + + Synced<AsyncCBs> per_msg_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO -- Gitblit v1.8.0