lichao
2021-06-02 993c556000a414011626770540678948f16eaa9e
box/node_center.h
@@ -48,17 +48,16 @@
class MsgRecords
{
   typedef int64_t MsgId;
   typedef int64_t Offset;
public:
   void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
   void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
   void FreeMsg(MsgId id);
   void AutoRemove();
   size_t size() const { return msgs_.size(); }
   void DebugPrint() const;
private:
   std::unordered_map<MsgId, Offset> msgs_;
   std::unordered_map<MsgId, MsgI> msgs_;
   int64_t time_to_clean_ = 0;
};
@@ -122,6 +121,8 @@
   void RecordMsg(const MsgI &msg);
   bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
   bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
   bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
   bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
   void OnAlloc(ShmSocket &socket, const int64_t val);
   void OnFree(ShmSocket &socket, const int64_t val);
   bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -158,6 +159,14 @@
   {
      return HandleMsg<MsgCommonReply, Func>(head, op);
   }
   template <class Reply>
   bool CheckMsg(const BHMsgHead &head, Reply &reply)
   {
      bool r = false;
      auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
      reply = HandleMsg<Reply>(head, onOk);
      return r;
   }
   MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
   MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
@@ -183,6 +192,8 @@
      return node && Valid(*node);
   }
   void RemoveNode(Node &node);
   Node GetNode(const MQId mq);
   std::string id_; // center proc id;
   std::unordered_map<Topic, Clients> service_map_;