| | |
| | | { |
| | | node_proc_index_ = proc_index; |
| | | socket_index_ = socket_index; |
| | | LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; |
| | | } |
| | | // Start receiving. All three callbacks default to empty objects. NOTE(review): nworker presumably sets how many threads are placed in workers_ - confirm against the definition. |
| | | bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); |
| | |
| | | bool Stop(); | // Counterpart of Start(). NOTE(review): presumably stops the receive workers and returns success; the definition is not visible here.
| | | |
| | | template <class Body> |
| | | bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) |
| | | bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) |
| | | { |
| | | try { |
| | | //TODO alloc outsiez and use send. |
| | |
| | | bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); | // Takes a size and a callback that receives a MsgI. The templated Send passes content.size() and fills the MsgI inside the callback. NOTE(review): when and on which thread onResult runs is not visible here.
| | | |
| | | template <class Body> | // NOTE(review): this span shows two revisions of Send side by side (diff view) - two signature rows and two return rows. Confirm which revision is current before compiling.
| | | bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) | // signature taking the remote as MQId by value (presumably the older revision)
| | | bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) | // signature taking the remote as const MQInfo & (presumably the newer revision)
| | | { |
| | | std::string msg_id(head.msg_id()); |
| | | std::string content(MsgI::Serialize(head, body)); | // head and body serialized into a single string
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { | // callback handed to RequestAlloc; takes content and cb by move, msg_id and remote by copy
| | | if (!msg.Fill(content)) { return; } | // nothing is sent and no error is recorded when Fill fails
| | | |
| | | try { |
| | | if (!cb) { | // no reply callback supplied: plain send
| | | Send(remote, msg); |
| | | } else { |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); | // reply callback is stored under msg_id before the send is issued
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { | // passed to Send as an extra argument; NOTE(review): presumably invoked when the queued send expires - confirm
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); | // takes the stored callback back out and discards it (presumably Pick removes the entry - confirm)
| | | }; |
| | | Send(remote, msg, onExpireRemoveCB); |
| | | } |
| | | } catch (...) { | // every exception from the send path is converted into a last-error record
| | | SetLastError(eError, "Send internal error."); |
| | | } |
| | | }; |
| | | |
| | | return RequestAlloc(size, OnResult); | // return of the RequestAlloc-based revision
| | | return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); | // return of the forwarding revision: serialized content, msg_id and cb go to another Send overload; unreachable if both return rows are kept
| | | } |
| | | template <class... T> | // Send an already-built MsgI; any extra arguments are forwarded untouched to SendImpl.
| | | bool Send(const MQId remote, const MsgI &imsg, T &&...t) | // MQId revision of the signature (diff view shows both rows)
| | | bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) | // MQInfo revision of the signature
| | | { |
| | | return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); | // returns whatever SendImpl returns
| | | } |
| | | template <class... T> | // Send a 64-bit command value instead of a MsgI; any extra arguments are forwarded untouched to SendImpl.
| | | bool Send(const MQId remote, const int64_t cmd, T &&...t) | // MQId revision of the signature (diff view shows both rows)
| | | bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) | // MQInfo revision of the signature
| | | { |
| | | return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); | // returns whatever SendImpl returns
| | | } |
| | |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); | // msg and head are non-const references (presumably outputs); timeout_ms is a millisecond timeout. NOTE(review): blocking behavior and the meaning of the bool result are not visible here.
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | { |
| | | struct State { |
| | | std::mutex mutex; |
| | |
| | | |
| | | try { |
| | | std::shared_ptr<State> st(new State); |
| | | |
| | | auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); |
| | | |
| | | auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | |
| | | bool StopNoLock(); | // NOTE(review): by its name, a stop variant that expects the caller to already hold the relevant lock - confirm against the definition.
| | | bool RunningNoLock() { return !workers_.empty(); } | // "running" means workers_ holds at least one thread; performs no locking itself.
| | | |
| | | bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); | // Overload taking already-serialized content (rvalue string) plus the message id; cb defaults to an empty RecvCB. Matches the argument list of the forwarding return in the templated Send(head, body).
| | | |
| | | template <class... Rest> | // Common tail of the Send overloads: appends (remote, rest...) to send_buffer_. NOTE(review): two revisions are shown side by side (diff view).
| | | bool SendImpl(const MQId remote, Rest &&...rest) | // MQId revision of the signature
| | | bool SendImpl(const MQInfo &remote, Rest &&...rest) | // MQInfo revision of the signature
| | | { |
| | | // TODO send alloc request, and pack later, higher bit means alloc? |
| | | send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); | // one revision ignores any result of Append ...
| | | return true; | // ... and reports success unconditionally
| | | return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); | // the other revision returns Append's own result; unreachable if the row above is kept
| | | } |
| | | |
| | | std::vector<std::thread> workers_; | // Threads owned by the socket; RunningNoLock() treats a non-empty vector as "running".