lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
src/socket.h
@@ -49,16 +49,16 @@
   ShmSocket(Shm &shm, const MQId id, const int len);
   ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
   ShmSocket(Shm &shm, const int len = 12);
   ShmSocket(int64_t offset, Shm &shm, const MQId id);
   ~ShmSocket();
   static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
   bool Remove() { return Remove(shm(), id()); }
   MQId id() const { return mq().Id(); }
   int64_t AbsAddr() const { return mq().AbsAddr(); }
   void SetNodeProc(const int proc_index, const int socket_index)
   {
      node_proc_index_ = proc_index;
      socket_index_ = socket_index;
      LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
   }
   // start recv.
   bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -67,7 +67,7 @@
   bool Stop();
   template <class Body>
   bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
   bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
   {
      try {
         //TODO alloc outsiez and use send.
@@ -85,47 +85,23 @@
   bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
   template <class Body>
   bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   {
      std::string msg_id(head.msg_id());
      std::string content(MsgI::Serialize(head, body));
      size_t size = content.size();
      auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
         if (!msg.Fill(content)) { return; }
         try {
            if (!cb) {
               Send(remote, msg);
            } else {
               per_msg_cbs_->Store(msg_id, std::move(cb));
               auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                  RecvCB cb_no_use;
                  per_msg_cbs_->Pick(msg_id, cb_no_use);
               };
               Send(remote, msg, onExpireRemoveCB);
            }
         } catch (...) {
            SetLastError(eError, "Send internal error.");
         }
      };
      return RequestAlloc(size, OnResult);
      return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
   }
   template <class... T>
   bool Send(const MQId remote, const MsgI &imsg, T &&...t)
   bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
   {
      return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
   }
   template <class... T>
   bool Send(const MQId remote, const int64_t cmd, T &&...t)
   bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
   {
      return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
   }
   bool SyncRecv(int64_t &cmd, const int timeout_ms);
   bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
   template <class Body>
   bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
   bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
   {
      struct State {
         std::mutex mutex;
@@ -135,6 +111,7 @@
      try {
         std::shared_ptr<State> st(new State);
         auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
         auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -175,12 +152,12 @@
   bool StopNoLock();
   bool RunningNoLock() { return !workers_.empty(); }
   bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
   template <class... Rest>
   bool SendImpl(const MQId remote, Rest &&...rest)
   bool SendImpl(const MQInfo &remote, Rest &&...rest)
   {
      // TODO send alloc request, and pack later, higher bit means alloc?
      send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
      return true;
      return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
   }
   std::vector<std::thread> workers_;
@@ -211,8 +188,8 @@
   Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
   Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
   SendQ send_buffer_;
   // node request center alloc memory.
   int node_proc_index_ = -1;
   int socket_index_ = -1;