lichao
2021-06-30 ae17d1439b35b55212c3a30712e0a60b1d6a99c0
box/center_topic_node.cpp
@@ -30,6 +30,7 @@
namespace
{
const std::string &kTopicQueryProc = "#center_query_procs";
const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center";
std::string ToJson(const MsgQueryProcReply &qpr)
{
@@ -92,10 +93,16 @@
      throw std::runtime_error("center node register failed.");
   }
   MsgTopicList topics;
   topics.add_topic_list(kTopicQueryProc);
   if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
   MsgTopicList services;
   services.add_topic_list(kTopicQueryProc);
   if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) {
      throw std::runtime_error("center node register topics failed.");
   }
   MsgTopicList subs;
   subs.add_topic_list(kTopicNotifyRemoteInfo);
   if (!pnode_->Subscribe(subs, reply, timeout)) {
      throw std::runtime_error("center node subscribe topics failed.");
   }
   auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
@@ -117,6 +124,20 @@
      pnode_->ServerSendReply(src_info, reply);
   };
   auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) {
      if (data.topic() == kTopicNotifyRemoteInfo) {
         // parse other data.
         // LOG_DEBUG() << "center got net info.";
         ssjson::Json js;
         if (js.parse(data.data())) {
            if (js.is_array()) {
               auto &center = *pscenter_;
               center->ParseNetInfo(js);
            }
         }
      }
   };
   bool cur = false;
   if (run_.compare_exchange_strong(cur, true)) {
      auto heartbeat = [this]() {
@@ -126,7 +147,7 @@
         }
      };
      std::thread(heartbeat).swap(worker_);
      return pnode_->ServerStart(onRequest);
      return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv);
   } else {
      return false;
   }