From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/socket.h | 51 ++++++++++++++------------------------------------- 1 files changed, 14 insertions(+), 37 deletions(-) diff --git a/src/socket.h b/src/socket.h index d69b8d4..7557034 100644 --- a/src/socket.h +++ b/src/socket.h @@ -49,16 +49,16 @@ ShmSocket(Shm &shm, const MQId id, const int len); ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); - ShmSocket(Shm &shm, const int len = 12); + ShmSocket(int64_t offset, Shm &shm, const MQId id); ~ShmSocket(); static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } bool Remove() { return Remove(shm(), id()); } MQId id() const { return mq().Id(); } + int64_t AbsAddr() const { return mq().AbsAddr(); } void SetNodeProc(const int proc_index, const int socket_index) { node_proc_index_ = proc_index; socket_index_ = socket_index; - LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; } // start recv. bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); @@ -67,7 +67,7 @@ bool Stop(); template <class Body> - bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) + bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) { try { //TODO alloc outsiez and use send. @@ -85,47 +85,23 @@ bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); template <class Body> - bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) { - std::string msg_id(head.msg_id()); - std::string content(MsgI::Serialize(head, body)); - size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { - if (!msg.Fill(content)) { return; } - - try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } - } catch (...) { - SetLastError(eError, "Send internal error."); - } - }; - - return RequestAlloc(size, OnResult); + return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); } template <class... T> - bool Send(const MQId remote, const MsgI &imsg, T &&...t) + bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) { return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); } template <class... T> - bool Send(const MQId remote, const int64_t cmd, T &&...t) + bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) { return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); } - bool SyncRecv(int64_t &cmd, const int timeout_ms); - bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) + bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) { struct State { std::mutex mutex; @@ -135,6 +111,7 @@ try { std::shared_ptr<State> st(new State); + auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { @@ -175,12 +152,12 @@ bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } + bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); + template <class... Rest> - bool SendImpl(const MQId remote, Rest &&...rest) + bool SendImpl(const MQInfo &remote, Rest &&...rest) { - // TODO send alloc request, and pack later, higher bit means alloc? - send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); - return true; + return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); } std::vector<std::thread> workers_; @@ -211,8 +188,8 @@ Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; - SendQ send_buffer_; + // node request center alloc memory. int node_proc_index_ = -1; int socket_index_ = -1; -- Gitblit v1.8.0