lichao
2021-05-13 db322f33ba13592f2492317e3f1a070454c97059
src/socket.h
@@ -42,7 +42,7 @@
public:
   typedef ShmMsgQueue::MQId MQId;
   typedef bhome_shm::SharedMemory Shm;
   typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
   typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
   typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
   typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
   typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -54,39 +54,74 @@
   static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
   bool Remove() { return Remove(shm(), id()); }
   MQId id() const { return mq().Id(); }
   void SetNodeProc(const int proc_index, const int socket_index)
   {
      node_proc_index_ = proc_index;
      socket_index_ = socket_index;
      LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
   }
   // start recv.
   bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
   bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
   bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
   bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
   bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
   bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
   bool Stop();
   template <class Body>
   bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
   {
      try {
         if (!cb) {
            return SendImpl(remote, MsgI::Serialize(head, body));
         } else {
            std::string msg_id(head.msg_id());
            per_msg_cbs_->Store(msg_id, std::move(cb));
            auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
               RecvCB cb_no_use;
               per_msg_cbs_->Pick(msg_id, cb_no_use);
            };
            return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
         }
         //TODO alloc outsiez and use send.
         MsgI msg;
         if (!msg.Make(head, body)) { return false; }
         DEFER1(msg.Release());
         return Send(remote, msg);
      } catch (...) {
         SetLastError(eError, "Send internal error.");
         return false;
      }
   }
   bool Send(const MQId remote, const MsgI &imsg)
   {
      return SendImpl(remote, imsg);
   }
   bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
   template <class Body>
   bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
   {
      std::string msg_id(head.msg_id());
      std::string content(MsgI::Serialize(head, body));
      size_t size = content.size();
      auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
         if (!msg.Fill(content)) { return; }
         try {
            if (!cb) {
               Send(remote, msg);
            } else {
               per_msg_cbs_->Store(msg_id, std::move(cb));
               auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                  RecvCB cb_no_use;
                  per_msg_cbs_->Pick(msg_id, cb_no_use);
               };
               Send(remote, msg, onExpireRemoveCB);
            }
         } catch (...) {
            SetLastError(eError, "Send internal error.");
         }
      };
      return RequestAlloc(size, OnResult);
   }
   template <class... T>
   bool Send(const MQId remote, const MsgI &imsg, T &&...t)
   {
      return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
   }
   template <class... T>
   bool Send(const MQId remote, const int64_t cmd, T &&...t)
   {
      return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
   }
   bool SyncRecv(int64_t &cmd, const int timeout_ms);
   bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
   template <class Body>
@@ -153,15 +188,15 @@
   std::atomic<bool> run_;
   Queue mq_;
   template <class Key>
   template <class Key, class CB>
   class CallbackRecords
   {
      std::unordered_map<Key, RecvCB> store_;
      std::unordered_map<Key, CB> store_;
   public:
      bool empty() const { return store_.empty(); }
      bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
      bool Pick(const Key &id, RecvCB &cb)
      bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
      bool Pick(const Key &id, CB &cb)
      {
         auto pos = store_.find(id);
         if (pos != store_.end()) {
@@ -174,9 +209,14 @@
      }
   };
   Synced<CallbackRecords<std::string>> per_msg_cbs_;
   Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
   Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
   SendQ send_buffer_;
   // node request center alloc memory.
   int node_proc_index_ = -1;
   int socket_index_ = -1;
   std::atomic<int> alloc_id_;
};
#endif // end of include guard: SOCKET_GWTJHBPO