From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/shm_socket.h | 51 ++++++++++++++++++--------------------------------- 1 files changed, 18 insertions(+), 33 deletions(-) diff --git a/src/shm_socket.h b/src/shm_socket.h index 02500b2..9bb65fc 100644 --- a/src/shm_socket.h +++ b/src/shm_socket.h @@ -47,9 +47,11 @@ typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; - ShmSocket(Shm &shm, const MQId id, const int len); - ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); - ShmSocket(int64_t offset, Shm &shm, const MQId id); + ShmSocket(Shm &shm, const MQId id, Mode mode) : + run_(false), mq_(shm, id, mode), alloc_id_(0), send_buffer_(shm) { Start(); } + ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : + run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); } + ~ShmSocket(); static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } bool Remove() { return Remove(shm(), id()); } @@ -66,34 +68,18 @@ bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); - template <class Body> - bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) - { - try { - //TODO alloc outsiez and use send. - MsgI msg; - if (!msg.Make(head, body)) { return false; } - DEFER1(msg.Release()); - - return Send(remote, msg); - } catch (...) { - SetLastError(eError, "Send internal error."); - return false; - } - } - bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); + bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb); template <class Body> - bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) - { - return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); - } - template <class... T> - bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) - { - return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); - } + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); } + bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb); + + template <class Body> + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); } + bool Send(const MQInfo &remote, std::string &&content); + bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); } + template <class... T> bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) { @@ -152,12 +138,10 @@ bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } - bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); - template <class... Rest> - bool SendImpl(const MQInfo &remote, Rest &&...rest) + bool SendImpl(Rest &&...rest) { - return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); + return send_buffer_.Append(std::forward<decltype(rest)>(rest)...); } std::vector<std::thread> workers_; @@ -188,12 +172,13 @@ Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; - SendQ send_buffer_; // node request center alloc memory. int node_proc_index_ = -1; int socket_index_ = -1; std::atomic<int> alloc_id_; + + SendQ send_buffer_; }; #endif // end of include guard: SHM_SOCKET_GWTJHBPO -- Gitblit v1.8.0