lichao
2021-05-17 cab831748a2a9cc18b7f18f3b5e14a4374b7ab68
src/socket.h
@@ -59,7 +59,6 @@
   {
      node_proc_index_ = proc_index;
      socket_index_ = socket_index;
      LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
   }
   // start recv.
   bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -68,7 +67,7 @@
   bool Stop();
   template <class Body>
   bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
   bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
   {
      try {
         //TODO alloc outsiez and use send.
@@ -86,39 +85,17 @@
   bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
   template <class Body>
   bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   {
      std::string msg_id(head.msg_id());
      std::string content(MsgI::Serialize(head, body));
      size_t size = content.size();
      auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
         if (!msg.Fill(content)) { return; }
         try {
            if (!cb) {
               Send(remote, msg);
            } else {
               per_msg_cbs_->Store(msg_id, std::move(cb));
               auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                  RecvCB cb_no_use;
                  per_msg_cbs_->Pick(msg_id, cb_no_use);
               };
               Send(remote, msg, onExpireRemoveCB);
            }
         } catch (...) {
            SetLastError(eError, "Send internal error.");
         }
      };
      return RequestAlloc(size, OnResult);
      return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
   }
   template <class... T>
   bool Send(const MQId remote, const MsgI &imsg, T &&...t)
   bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
   {
      return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
   }
   template <class... T>
   bool Send(const MQId remote, const int64_t cmd, T &&...t)
   bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
   {
      return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
   }
@@ -126,7 +103,7 @@
   bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
   template <class Body>
   bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
   bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
   {
      struct State {
         std::mutex mutex;
@@ -136,6 +113,7 @@
      try {
         std::shared_ptr<State> st(new State);
         auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
         auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -176,12 +154,12 @@
   bool StopNoLock();
   bool RunningNoLock() { return !workers_.empty(); }
   bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
   template <class... Rest>
   bool SendImpl(const MQId remote, Rest &&...rest)
   bool SendImpl(const MQInfo &remote, Rest &&...rest)
   {
      // TODO send alloc request, and pack later, higher bit means alloc?
      send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
      return true;
      return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
   }
   std::vector<std::thread> workers_;