From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 13 五月 2021 19:34:46 +0800 Subject: [PATCH] center alloc all msgs. --- src/socket.h | 90 ++++++++++++++++++++++++++++++++------------ 1 files changed, 65 insertions(+), 25 deletions(-) diff --git a/src/socket.h b/src/socket.h index 981677f..d69b8d4 100644 --- a/src/socket.h +++ b/src/socket.h @@ -42,7 +42,7 @@ public: typedef ShmMsgQueue::MQId MQId; typedef bhome_shm::SharedMemory Shm; - typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; + typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB; typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; @@ -54,39 +54,74 @@ static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } bool Remove() { return Remove(shm(), id()); } MQId id() const { return mq().Id(); } + void SetNodeProc(const int proc_index, const int socket_index) + { + node_proc_index_ = proc_index; + socket_index_ = socket_index; + LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; + } // start recv. - bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); - bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); - bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } + bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); + bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); } bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); template <class Body> - bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) { try { - if (!cb) { - return SendImpl(remote, MsgI::Serialize(head, body)); - } else { - std::string msg_id(head.msg_id()); - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB); - } + //TODO alloc outsiez and use send. + MsgI msg; + if (!msg.Make(head, body)) { return false; } + DEFER1(msg.Release()); + + return Send(remote, msg); } catch (...) { SetLastError(eError, "Send internal error."); return false; } } - bool Send(const MQId remote, const MsgI &imsg) - { - return SendImpl(remote, imsg); - } + bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); + template <class Body> + bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + { + std::string msg_id(head.msg_id()); + std::string content(MsgI::Serialize(head, body)); + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + if (!msg.Fill(content)) { return; } + + try { + if (!cb) { + Send(remote, msg); + } else { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + Send(remote, msg, onExpireRemoveCB); + } + } catch (...) { + SetLastError(eError, "Send internal error."); + } + }; + + return RequestAlloc(size, OnResult); + } + template <class... T> + bool Send(const MQId remote, const MsgI &imsg, T &&...t) + { + return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); + } + template <class... T> + bool Send(const MQId remote, const int64_t cmd, T &&...t) + { + return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); + } + bool SyncRecv(int64_t &cmd, const int timeout_ms); bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> @@ -153,15 +188,15 @@ std::atomic<bool> run_; Queue mq_; - template <class Key> + template <class Key, class CB> class CallbackRecords { - std::unordered_map<Key, RecvCB> store_; + std::unordered_map<Key, CB> store_; public: bool empty() const { return store_.empty(); } - bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } - bool Pick(const Key &id, RecvCB &cb) + bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; } + bool Pick(const Key &id, CB &cb) { auto pos = store_.find(id); if (pos != store_.end()) { @@ -174,9 +209,14 @@ } }; - Synced<CallbackRecords<std::string>> per_msg_cbs_; + Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; + Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; SendQ send_buffer_; + // node request center alloc memory. + int node_proc_index_ = -1; + int socket_index_ = -1; + std::atomic<int> alloc_id_; }; #endif // end of include guard: SOCKET_GWTJHBPO -- Gitblit v1.8.0