lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
box/node_center.h
@@ -82,6 +82,10 @@
   };
   typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
   struct NodeInfo;
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   struct NodeInfo {
      NodeCenter &center_;
      SharedMemory &shm_;
@@ -89,14 +93,15 @@
      std::map<MQId, int64_t> addrs_; // registered mqs
      ProcInfo proc_;                 //
      AddressTopics services_;        // address: topics
      AddressTopics subscriptions_;   // address: topics
      AddressTopics local_sub_;       // address: topics
      AddressTopics net_sub_;         // address: topics
      NodeInfo(NodeCenter &center, SharedMemory &shm) :
          center_(center), shm_(shm) {}
      void PutOffline(const int64_t offline_time);
      void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
      void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
      void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
   };
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   struct TopicDest {
      MQId mq_id_;
@@ -121,7 +126,9 @@
   void RecordMsg(const MsgI &msg);
   bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
   bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
   bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
   bool RemotePublish(BHMsgHead &head, const std::string &body_content);
   bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
   void OnAlloc(ShmSocket &socket, const int64_t val);
   void OnFree(ShmSocket &socket, const int64_t val);
@@ -176,15 +183,19 @@
   MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
   MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
   MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
   Clients DoFindClients(const std::string &topic);
   bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
   MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
   void OnTimer();
   // remote hosts records
   std::vector<std::string> FindRemoteSubClients(const Topic &topic);
private:
   void CheckNodes();
   bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
   void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
   void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
   Clients DoFindClients(const std::string &topic, bool from_remote);
   bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
   bool Valid(const WeakNode &weak)
   {
@@ -197,7 +208,8 @@
   std::string id_; // center proc id;
   std::unordered_map<Topic, Clients> service_map_;
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<Topic, Clients> local_sub_map_;
   std::unordered_map<Topic, Clients> net_sub_map_;
   std::unordered_map<Address, Node> nodes_;
   std::unordered_map<ProcId, Address> online_node_addr_map_;
   ProcRecords procs_; // To get a short index for msg alloc.