zhangmeng
2021-07-02 056f71f24cefaf88f2a93714c6678c03ed5f1e0e
box/node_center.cpp
@@ -31,6 +31,9 @@
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
const std::string kTopicNodeService = Join(kTopicNode, "service");
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
{
   state_.timestamp_ = NowSec() - offline_time;
   state_.flag_ = kStateOffline;
   Json json;
   json.put("proc_id", proc_.proc_id());
   center_.Publish(shm_, kTopicNodeOffline, json.dump());
   center_.Notify(kTopicNodeOffline, *this);
}
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
{
   if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
   Json json;
   json.put("proc_id", node.proc_.proc_id());
   Publish(node.shm_, topic, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
   auto old = state_.flag_;
   auto diff = now - state_.timestamp_;
   auto publish = [this](const std::string &topic) {
      if (proc_.proc_id().empty()) { return; } // node init, ignore.
      Json json;
      json.put("proc_id", proc_.proc_id());
      center_.Publish(shm_, topic, json.dump());
   };
   LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
   if (diff < offline_time) {
      state_.flag_ = kStateNormal;
      if (old != state_.flag_) {
         publish(kTopicNodeOnline);
         center_.Notify(kTopicNodeOnline, *this);
      }
   } else if (diff < kill_time) {
      state_.flag_ = kStateOffline;
      if (old != state_.flag_) {
         publish(kTopicNodeOffline);
         center_.Notify(kTopicNodeOffline, *this);
      }
   } else {
      state_.flag_ = kStateKillme;
@@ -269,6 +270,7 @@
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
   // LOG_FUNCTION;
   auto &topic = head.topic();
   auto clients = DoFindClients(topic, true);
   if (clients.empty()) { return true; }
@@ -287,9 +289,10 @@
            }
         }
         MsgI msg(shm);
         if (msg.Make(body_content)) {
         if (msg.Make(head, body_content)) {
            RecordMsg(msg);
            msgs.push_back(msg);
            // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
            DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
         }
      };
@@ -476,6 +479,7 @@
          for (auto &topic : topics) {
             LOG_DEBUG() << "\t" << topic;
          }
          Notify(kTopicNodeService, *node);
          return MakeReply(eSuccess);
       });
}
@@ -552,22 +556,43 @@
   typedef MsgQueryTopicReply Reply;
   auto query = [&](Node self) -> Reply {
      auto pos = service_map_.find(req.topic());
      if (pos != service_map_.end() && !pos->second.empty()) {
         auto &clients = pos->second;
         Reply reply = MakeReply<Reply>(eSuccess);
         for (auto &dest : clients) {
            Node dest_node(dest.weak_node_.lock());
            if (dest_node && Valid(*dest_node)) {
               auto node_addr = reply.add_node_address();
               node_addr->set_proc_id(dest_node->proc_.proc_id());
               node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
               node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
      Reply reply = MakeReply<Reply>(eSuccess);
      auto local = [&]() {
         auto pos = service_map_.find(req.topic());
         if (pos != service_map_.end() && !pos->second.empty()) {
            auto &clients = pos->second;
            for (auto &dest : clients) {
               Node dest_node(dest.weak_node_.lock());
               if (dest_node && Valid(*dest_node)) {
                  auto node_addr = reply.add_node_address();
                  node_addr->set_proc_id(dest_node->proc_.proc_id());
                  node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
                  node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
               }
            }
            return true;
         } else {
            return false;
         }
         return reply;
      } else {
      };
      auto net = [&]() {
         auto hosts(FindRemoteRPCServers(req.topic()));
         if (hosts.empty()) {
            return false;
         } else {
            for (auto &ip : hosts) {
               auto node_addr = reply.add_node_address();
               node_addr->mutable_addr()->set_ip(ip);
            }
            return true;
         }
      };
      local();
      net();
      if (reply.node_address_size() == 0) {
         return MakeReply<Reply>(eNotFound, "topic server not found.");
      } else {
         return reply;
      }
   };
@@ -585,9 +610,9 @@
         sub_map[topic].insert(dest);
      }
   };
   LOG_DEBUG() << "subscribe net : " << msg.network();
   if (msg.network()) {
      Sub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeSub, *this);
   } else {
      Sub(local_sub_, center_.local_sub_map_);
   }
@@ -632,6 +657,7 @@
   };
   if (msg.network()) {
      Unsub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeUnsub, *this);
   } else {
      Unsub(local_sub_, center_.local_sub_map_);
   }
@@ -647,6 +673,7 @@
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
   // LOG_FUNCTION;
   Clients dests;
   auto Find1 = [&](const std::string &exact) {
      auto FindIn = [&](auto &sub_map) {
@@ -654,16 +681,25 @@
         if (pos != sub_map.end()) {
            auto &clients = pos->second;
            for (auto &cli : clients) {
               if (Valid(cli.weak_node_)) {
                  dests.insert(cli);
               auto node = cli.weak_node_.lock();
               if (node) {
                  if (node->state_.flag_ == kStateNormal)
                     dests.insert(cli);
               }
               // if (Valid(cli.weak_node_)) {
               //    dests.insert(cli);
               // }
            }
         }
      };
      if (!from_remote) {
         FindIn(local_sub_map_);
         // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
      }
      // net subscripitions also work in local mode.
      FindIn(net_sub_map_);
      // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
   };
   Find1(topic);
@@ -789,8 +825,28 @@
   }
}
std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
   //TODO search synced full list;
   return std::vector<std::string>();
   // LOG_FUNCTION;
   sub_hosts_.clear();
   rpc_hosts_.clear();
   for (auto &host : info.array()) {
      if (host.get("isLocal", false)) {
         host_id_ = host.get("serverId", "");
         ip_ = host.get("ip", "");
      } else {
         auto ip = host.get("ip", "");
         auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
            for (auto &topic : lot) {
               auto t = topic.get_value<std::string>();
               rec[t].insert(ip);
               // LOG_DEBUG() << "net topic: " << t << ", " << ip;
            }
         };
         // LOG_DEBUG() << "serives:";
         UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
         // LOG_DEBUG() << "net sub:";
         UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
      }
   }
}