liuxiaolong
2021-07-20 232227035c8d6a31eaaf193863cbadda949c08fd
box/node_center.h
@@ -18,6 +18,7 @@
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q
#include "json.h"
#include "shm_socket.h"
#include <unordered_map>
@@ -48,17 +49,16 @@
class MsgRecords
{
   typedef int64_t MsgId;
   typedef int64_t Offset;
public:
   void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
   void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
   void FreeMsg(MsgId id);
   void AutoRemove();
   size_t size() const { return msgs_.size(); }
   void DebugPrint() const;
private:
   std::unordered_map<MsgId, Offset> msgs_;
   std::unordered_map<MsgId, MsgI> msgs_;
   int64_t time_to_clean_ = 0;
};
@@ -80,20 +80,29 @@
   struct ProcState {
      int64_t timestamp_ = 0;
      uint32_t flag_ = 0; // reserved
      void PutOffline(const int64_t offline_time);
      void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
   };
   typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
   struct NodeInfo;
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   struct NodeInfo {
      NodeCenter &center_;
      SharedMemory &shm_;
      ProcState state_;               // state
      std::map<MQId, int64_t> addrs_; // registered mqs
      ProcInfo proc_;                 //
      AddressTopics services_;        // address: topics
      AddressTopics subscriptions_;   // address: topics
      AddressTopics local_sub_;       // address: topics
      AddressTopics net_sub_;         // address: topics
      NodeInfo(NodeCenter &center, SharedMemory &shm) :
          center_(center), shm_(shm) {}
      void PutOffline(const int64_t offline_time);
      void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
      void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
      void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
   };
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   struct TopicDest {
      MQId mq_id_;
@@ -109,8 +118,8 @@
public:
   typedef std::set<TopicDest> Clients;
   NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
       id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
   NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
       id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
   // center name, no relative to shm.
   const std::string &id() const { return id_; }
@@ -118,6 +127,10 @@
   void RecordMsg(const MsgI &msg);
   bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
   bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
   bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
   bool RemotePublish(BHMsgHead &head, const std::string &body_content);
   bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
   void OnAlloc(ShmSocket &socket, const int64_t val);
   void OnFree(ShmSocket &socket, const int64_t val);
   bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -144,8 +157,8 @@
               return op(node);
            }
         }
      } catch (...) {
         //TODO error log
      } catch (std::exception &e) {
         LOG_ERROR() << "handle msg exception: " << e.what();
         return MakeReply<Reply>(eError, "internal error.");
      }
   }
@@ -154,22 +167,39 @@
   {
      return HandleMsg<MsgCommonReply, Func>(head, op);
   }
   template <class Reply>
   bool CheckMsg(const BHMsgHead &head, Reply &reply)
   {
      bool r = false;
      auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
      reply = HandleMsg<Reply>(head, onOk);
      return r;
   }
   MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
   MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
   MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
   MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
   MsgQueryProcReply QueryProc(const std::string &proc_id);
   MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
   MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
   MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
   Clients DoFindClients(const std::string &topic);
   bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
   MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
   void OnTimer();
   // remote hosts records
   std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
   std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
   void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
private:
   void CheckNodes();
   bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
   void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
   void Notify(const Topic &topic, NodeInfo &node);
   void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
   Clients DoFindClients(const std::string &topic, bool from_remote);
   bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
   bool Valid(const WeakNode &weak)
   {
@@ -177,19 +207,48 @@
      return node && Valid(*node);
   }
   void RemoveNode(Node &node);
   Node GetNode(const MQId mq);
   std::string id_; // center proc id;
   std::unordered_map<Topic, Clients> service_map_;
   std::unordered_map<Topic, Clients> subscribe_map_;
   std::unordered_map<Topic, Clients> local_sub_map_;
   std::unordered_map<Topic, Clients> net_sub_map_;
   std::unordered_map<Address, Node> nodes_;
   std::unordered_map<ProcId, Address> online_node_addr_map_;
   ProcRecords procs_; // To get a short index for msg alloc.
   MsgRecords msgs_;   // record all msgs alloced.
   Cleaner cleaner_; // remove mqs.
   int64_t offline_time_;
   int64_t kill_time_;
   int64_t last_check_time_;
   // net hosts info
   class NetRecords
   {
   public:
      typedef std::set<std::string> Hosts;
      void ParseData(const ssjson::Json &input);
      Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
      Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
   private:
      typedef std::unordered_map<Topic, Hosts> TopicMap;
      TopicMap sub_hosts_;
      TopicMap rpc_hosts_;
      Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
      {
         auto pos = tmap.find(topic);
         if (pos != tmap.end()) {
            return pos->second;
         } else {
            return Hosts();
         }
      }
      std::string host_id_;
      std::string ip_;
   };
   NetRecords net_records_;
};
#endif // end of include guard: NODE_CENTER_KY67RJ1Q