liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
box/node_center.cpp
@@ -16,7 +16,25 @@
 * =====================================================================================
 */
#include "node_center.h"
#include "json.h"
#include "log.h"
using ssjson::Json;
namespace
{
std::string Join(const std::string &parent, const std::string &child)
{
   return parent + kTopicSep + child;
}
const std::string kTopicCenterRoot = "#center";
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
const std::string kTopicNodeService = Join(kTopicNode, "service");
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
{
@@ -42,7 +60,7 @@
{
   auto pos = msgs_.find(id);
   if (pos != msgs_.end()) {
      ShmMsg(pos->second).Free();
      pos->second.Free();
      msgs_.erase(pos);
   } else {
      LOG_TRACE() << "ignore late free request.";
@@ -55,8 +73,9 @@
      return;
   }
   // LOG_FUNCTION;
   const size_t total = msgs_.size();
   time_to_clean_ = now + 1;
   int64_t limit = std::max(10000ul, msgs_.size() / 10);
   int64_t limit = std::max(10000ul, total / 10);
   int64_t n = 0;
   auto it = msgs_.begin();
   while (it != msgs_.end() && --limit > 0) {
@@ -67,49 +86,65 @@
         ++n;
      };
      int n = now - msg.timestamp();
      if (n < 10) {
      if (msg.Count() == 0) {
         Free();
      } else if (n > NodeTimeoutSec()) {
         Free();
      } else {
         ++it;
      } else if (msg.Count() == 0) {
         Free();
      } else if (n > 60) {
         Free();
      }
   }
   if (n > 0) {
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
   }
}
void MsgRecords::DebugPrint() const
{
   LOG_DEBUG() << "msgs : " << size();
   LOG_TRACE() << "msgs : " << size();
   int i = 0;
   int total_count = 0;
   for (auto &kv : msgs_) {
      MsgI msg(kv.second);
      auto &msg = kv.second;
      total_count += msg.Count();
      LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
      LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
   }
   LOG_DEBUG() << "total count: " << total_count;
   LOG_TRACE() << "total count: " << total_count;
}
// NodeCenter::ProcState
void NodeCenter::ProcState::PutOffline(const int64_t offline_time)
void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time)
{
   timestamp_ = NowSec() - offline_time;
   flag_ = kStateOffline;
   state_.timestamp_ = NowSec() - offline_time;
   state_.flag_ = kStateOffline;
   center_.Notify(kTopicNodeOffline, *this);
}
void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
{
   auto diff = now - timestamp_;
   LOG_DEBUG() << "state " << this << " diff: " << diff;
   if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
   Json json;
   json.put("proc_id", node.proc_.proc_id());
   Publish(node.shm_, topic, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
   auto old = state_.flag_;
   auto diff = now - state_.timestamp_;
   LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
   if (diff < offline_time) {
      flag_ = kStateNormal;
      state_.flag_ = kStateNormal;
      if (old != state_.flag_) {
         center_.Notify(kTopicNodeOnline, *this);
      }
   } else if (diff < kill_time) {
      flag_ = kStateOffline;
      state_.flag_ = kStateOffline;
      if (old != state_.flag_) {
         center_.Notify(kTopicNodeOffline, *this);
      }
   } else {
      flag_ = kStateKillme;
      state_.flag_ = kStateKillme;
   }
}
@@ -126,11 +161,11 @@
   auto UpdateRegInfo = [&](Node &node) {
      node->state_.timestamp_ = NowSec() - offline_time_;
      node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
      node->UpdateState(NowSec(), offline_time_, kill_time_);
      // create sockets.
      try {
         ShmSocket tmp(shm, true, ssn, 16);
         ShmSocket tmp(shm, ssn, eCreate);
         node->addrs_.emplace(ssn, tmp.AbsAddr());
         return true;
      } catch (...) {
@@ -140,7 +175,7 @@
   auto PrepareProcInit = [&](Node &node) {
      bool r = false;
      ShmMsg init_msg;
      ShmMsg init_msg(shm);
      DEFER1(init_msg.Release());
      MsgProcInit body;
      auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -149,7 +184,7 @@
             SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
   };
   Node node(new NodeInfo);
   Node node(new NodeInfo(*this, shm));
   if (UpdateRegInfo(node) && PrepareProcInit(node)) {
      reply |= (node->addrs_[ssn] << 4);
      nodes_[ssn] = node;
@@ -175,6 +210,121 @@
{
   RecordMsg(msg);
   return socket.Send(dest, msg);
}
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
   Node node;
   auto ssn = mq_id - (mq_id % 10);
   auto pos = nodes_.find(ssn);
   if (pos != nodes_.end()) {
      node = pos->second;
   }
   return node;
}
bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
   Node node;
   auto FindDest = [&]() {
      auto pos = service_map_.find(head.topic());
      if (pos != service_map_.end() && !pos->second.empty()) {
         auto &clients = pos->second;
         for (auto &cli : clients) {
            node = cli.weak_node_.lock();
            if (node && Valid(*node)) {
               dest.id_ = cli.mq_id_;
               dest.offset_ = cli.mq_abs_addr_;
               return true;
            }
         }
      }
      return false;
   };
   if (dest.id_ == 0) {
      if (!FindDest()) {
         LOG_ERROR() << id() << " pass remote request, topic dest not found.";
         return false;
      }
   } else {
      node = GetNode(dest.id_);
      if (!node || !Valid(*node)) {
         LOG_ERROR() << id() << " pass remote request, dest not found.";
         return false;
      }
   }
   ShmSocket &sender(DefaultSender(node->shm_));
   auto route = head.add_route();
   route->set_mq_id(sender.id());
   route->set_abs_addr(sender.AbsAddr());
   ShmMsg msg(node->shm_);
   if (!msg.Make(head, body_content)) { return false; }
   DEFER1(msg.Release(););
   RecordMsg(msg);
   return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
   // LOG_FUNCTION;
   auto &topic = head.topic();
   auto clients = DoFindClients(topic, true);
   if (clients.empty()) { return true; }
   std::vector<MsgI> msgs;
   auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
   DEFER1(ReleaseAll(););
   for (auto &cli : clients) {
      auto Send1 = [&](Node node) {
         auto &shm = node->shm_;
         for (auto &msg : msgs) {
            if (msg.shm().name() == shm.name()) {
               DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
               return;
            }
         }
         MsgI msg(shm);
         if (msg.Make(head, body_content)) {
            RecordMsg(msg);
            msgs.push_back(msg);
            // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
            DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
         }
      };
      auto node = cli.weak_node_.lock();
      if (node) {
         Send1(node);
         // should also make sure that mq is not killed before msg expires.
         // it would be ok if (kill_time - offline_time) is longer than expire time.
      }
   }
   return true;
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
   Node node(GetNode(dest.id_));
   if (!node) {
      LOG_ERROR() << id() << " pass remote reply , ssn not found.";
      return false;
   }
   auto offset = node->addrs_[dest.id_];
   if (offset != dest.offset_) {
      LOG_ERROR() << id() << " pass remote reply, dest address not match";
      return false;
   }
   ShmMsg msg(node->shm_);
   if (!msg.Make(head, body_content)) { return false; }
   DEFER1(msg.Release(););
   RecordMsg(msg);
   return DefaultSender(node->shm_).Send(dest, msg);
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
@@ -205,7 +355,7 @@
   if (!FindMq()) { return; }
   auto size = GetAllocSize((val >> 52) & MaskBits(8));
   MsgI new_msg;
   MsgI new_msg(socket.shm());
   if (new_msg.Make(size)) {
      // 31bit proc index, 28bit id, ,4bit cmd+flag
      int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -248,7 +398,7 @@
   auto &node = pos->second;
   try {
      for (int i = 0; i < msg.extra_mq_num(); ++i) {
         ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
         ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
         node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
         auto addr = reply.add_extra_mqs();
         addr->set_mq_id(tmp.id());
@@ -271,33 +421,33 @@
      MQId ssn = head.ssn_id();
      // when node restart, ssn will change,
      // and old node will be removed after timeout.
      auto UpdateRegInfo = [&](Node &node) {
         node->proc_.Swap(msg.mutable_proc());
         node->state_.timestamp_ = head.timestamp();
         node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
      };
      auto pos = nodes_.find(ssn);
      if (pos == nodes_.end()) {
         return MakeReply(eInvalidInput, "invalid session.");
      }
      // update proc info
      Node &node = pos->second;
      UpdateRegInfo(node);
      LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
      // try to remove old session
      auto old = online_node_addr_map_.find(head.proc_id());
      if (old != online_node_addr_map_.end()) { // old session
         auto &old_ssn = old->second;
         if (old_ssn != ssn) {
            nodes_[old_ssn]->state_.PutOffline(offline_time_);
            nodes_[old_ssn]->PutOffline(offline_time_);
            LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline";
            old_ssn = ssn;
         }
      } else {
         online_node_addr_map_.emplace(head.proc_id(), ssn);
      }
      // update proc info
      Node &node = pos->second;
      node->proc_.Swap(msg.mutable_proc());
      node->state_.timestamp_ = head.timestamp();
      node->UpdateState(NowSec(), offline_time_, kill_time_);
      LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
      return MakeReply(eSuccess);
   } catch (...) {
      return MakeReply(eError, "register node error.");
@@ -309,7 +459,7 @@
   return HandleMsg(
       head, [&](Node node) -> MsgCommonReply {
          NodeInfo &ni = *node;
          ni.state_.PutOffline(offline_time_);
          ni.PutOffline(offline_time_);
          return MakeReply(eSuccess);
       });
}
@@ -329,6 +479,7 @@
          for (auto &topic : topics) {
             LOG_DEBUG() << "\t" << topic;
          }
          Notify(kTopicNodeService, *node);
          return MakeReply(eSuccess);
       });
}
@@ -338,7 +489,7 @@
   return HandleMsg(head, [&](Node node) {
      NodeInfo &ni = *node;
      ni.state_.timestamp_ = head.timestamp();
      ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);
      ni.UpdateState(NowSec(), offline_time_, kill_time_);
      auto &info = msg.proc();
      if (!info.public_info().empty()) {
@@ -350,44 +501,53 @@
      return MakeReply(eSuccess);
   });
}
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
{
   typedef MsgQueryProcReply Reply;
   auto query = [&](Node self) -> Reply {
      auto Add1 = [](Reply &reply, Node node) {
         auto info = reply.add_proc_list();
         *info->mutable_proc() = node->proc_;
         info->set_online(node->state_.flag_ == kStateNormal);
         for (auto &addr_topics : node->services_) {
   auto Add1 = [](Reply &reply, Node node) {
      auto info = reply.add_proc_list();
      *info->mutable_proc() = node->proc_;
      info->mutable_proc()->clear_private_info();
      info->set_online(node->state_.flag_ == kStateNormal);
      auto AddTopics = [](auto &dst, auto &src) {
         for (auto &addr_topics : src) {
            for (auto &topic : addr_topics.second) {
               info->mutable_topics()->add_topic_list(topic);
               dst.add_topic_list(topic);
            }
         }
      };
      if (!req.proc_id().empty()) {
         auto pos = online_node_addr_map_.find(req.proc_id());
         if (pos == online_node_addr_map_.end()) {
            return MakeReply<Reply>(eNotFound, "proc not found.");
         } else {
            auto node_pos = nodes_.find(pos->second);
            if (node_pos == nodes_.end()) {
               return MakeReply<Reply>(eNotFound, "proc node not found.");
            } else {
               auto reply = MakeReply<Reply>(eSuccess);
               Add1(reply, node_pos->second);
               return reply;
            }
         }
      } else {
         Reply reply(MakeReply<Reply>(eSuccess));
         for (auto &kv : nodes_) {
            Add1(reply, kv.second);
         }
         return reply;
      }
      AddTopics(*info->mutable_service(), node->services_);
      AddTopics(*info->mutable_local_sub(), node->local_sub_);
      AddTopics(*info->mutable_net_sub(), node->net_sub_);
   };
   if (!proc_id.empty()) {
      auto pos = online_node_addr_map_.find(proc_id);
      if (pos == online_node_addr_map_.end()) {
         return MakeReply<Reply>(eNotFound, "proc not found.");
      } else {
         auto node_pos = nodes_.find(pos->second);
         if (node_pos == nodes_.end()) {
            return MakeReply<Reply>(eNotFound, "proc node not found.");
         } else {
            auto reply = MakeReply<Reply>(eSuccess);
            Add1(reply, node_pos->second);
            return reply;
         }
      }
   } else {
      Reply reply(MakeReply<Reply>(eSuccess));
      for (auto &kv : nodes_) {
         Add1(reply, kv.second);
      }
      return reply;
   }
}
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
{
   typedef MsgQueryProcReply Reply;
   auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
   return HandleMsg<Reply>(head, query);
}
@@ -396,57 +556,93 @@
   typedef MsgQueryTopicReply Reply;
   auto query = [&](Node self) -> Reply {
      auto pos = service_map_.find(req.topic());
      if (pos != service_map_.end() && !pos->second.empty()) {
         auto &clients = pos->second;
         Reply reply = MakeReply<Reply>(eSuccess);
         for (auto &dest : clients) {
            Node dest_node(dest.weak_node_.lock());
            if (dest_node && Valid(*dest_node)) {
               auto node_addr = reply.add_node_address();
               node_addr->set_proc_id(dest_node->proc_.proc_id());
               node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
               node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
      Reply reply = MakeReply<Reply>(eSuccess);
      auto local = [&]() {
         auto pos = service_map_.find(req.topic());
         if (pos != service_map_.end() && !pos->second.empty()) {
            auto &clients = pos->second;
            for (auto &dest : clients) {
               Node dest_node(dest.weak_node_.lock());
               if (dest_node && Valid(*dest_node)) {
                  auto node_addr = reply.add_node_address();
                  node_addr->set_proc_id(dest_node->proc_.proc_id());
                  node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
                  node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
               }
            }
            return true;
         } else {
            return false;
         }
         return reply;
      } else {
      };
      auto net = [&]() {
         auto hosts(FindRemoteRPCServers(req.topic()));
         if (hosts.empty()) {
            return false;
         } else {
            for (auto &ip : hosts) {
               auto node_addr = reply.add_node_address();
               node_addr->mutable_addr()->set_ip(ip);
            }
            return true;
         }
      };
      local();
      net();
      if (reply.node_address_size() == 0) {
         return MakeReply<Reply>(eNotFound, "topic server not found.");
      } else {
         return reply;
      }
   };
   return HandleMsg<Reply>(head, query);
}
void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
{
   auto src = SrcAddr(head);
   auto Sub = [&](auto &sub, auto &sub_map) {
      auto &topics = msg.topics().topic_list();
      sub[src].insert(topics.begin(), topics.end());
      const TopicDest &dest = {src, SrcAbsAddr(head), node};
      for (auto &topic : topics) {
         sub_map[topic].insert(dest);
      }
   };
   if (msg.network()) {
      Sub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeSub, *this);
   } else {
      Sub(local_sub_, center_.local_sub_map_);
   }
}
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      auto src = SrcAddr(head);
      auto &topics = msg.topics().topic_list();
      node->subscriptions_[src].insert(topics.begin(), topics.end());
      TopicDest dest = {src, SrcAbsAddr(head), node};
      for (auto &topic : topics) {
         subscribe_map_[topic].insert(dest);
      }
      node->Subscribe(head, msg, node);
      return MakeReply(eSuccess);
   });
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      auto src = SrcAddr(head);
      auto pos = node->subscriptions_.find(src);
      auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
         auto pos = subscribe_map_.find(topic);
         if (pos != subscribe_map_.end() &&
void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
{
   auto src = SrcAddr(head);
   auto Unsub = [&](auto &sub, auto &sub_map) {
      auto pos = sub.find(src);
      auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
         auto pos = sub_map.find(topic);
         if (pos != sub_map.end() &&
             pos->second.erase(dest) != 0 &&
             pos->second.empty()) {
            subscribe_map_.erase(pos);
            sub_map.erase(pos);
         }
      };
      if (pos != node->subscriptions_.end()) {
      if (pos != sub.end()) {
         const TopicDest &dest = {src, SrcAbsAddr(head), node};
         auto &topics = msg.topics().topic_list();
         // clear node sub records;
@@ -455,26 +651,55 @@
            RemoveSubTopicDestRecord(topic, dest);
         }
         if (pos->second.empty()) {
            node->subscriptions_.erase(pos);
            sub.erase(pos);
         }
      }
   };
   if (msg.network()) {
      Unsub(net_sub_, center_.net_sub_map_);
      center_.Notify(kTopicNodeUnsub, *this);
   } else {
      Unsub(local_sub_, center_.local_sub_map_);
   }
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      node->Unsubscribe(head, msg, node);
      return MakeReply(eSuccess);
   });
}
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
   // LOG_FUNCTION;
   Clients dests;
   auto Find1 = [&](const std::string &t) {
      auto pos = subscribe_map_.find(topic);
      if (pos != subscribe_map_.end()) {
         auto &clients = pos->second;
         for (auto &cli : clients) {
            if (Valid(cli.weak_node_)) {
               dests.insert(cli);
   auto Find1 = [&](const std::string &exact) {
      auto FindIn = [&](auto &sub_map) {
         auto pos = sub_map.find(exact);
         if (pos != sub_map.end()) {
            auto &clients = pos->second;
            for (auto &cli : clients) {
               auto node = cli.weak_node_.lock();
               if (node) {
                  if (node->state_.flag_ == kStateNormal)
                     dests.insert(cli);
               }
               // if (Valid(cli.weak_node_)) {
               //    dests.insert(cli);
               // }
            }
         }
      };
      if (!from_remote) {
         FindIn(local_sub_map_);
         // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
      }
      // net subscripitions also work in local mode.
      FindIn(net_sub_map_);
      // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
   };
   Find1(topic);
@@ -485,21 +710,37 @@
         // Find1(std::string()); // sub all.
         break;
      } else {
         Find1(topic.substr(0, pos));
         Find1(topic.substr(0, pos - 1));
      }
   }
   return dests;
}
bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
   bool ret = false;
   HandleMsg(head, [&](Node node) {
      DoFindClients(msg.topic()).swap(out);
      ret = true;
   return HandleMsg(head, [&](Node node) {
      DoPublish(DefaultSender(node->shm_), topic, msg);
      return MakeReply(eSuccess);
   }).Swap(&reply);
   return ret;
   });
}
void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
{
   try {
      auto clients = DoFindClients(topic, false);
      if (clients.empty()) { return; }
      for (auto &cli : clients) {
         auto node = cli.weak_node_.lock();
         if (node) {
            // should also make sure that mq is not killed before msg expires.
            // it would be ok if (kill_time - offline_time) is longer than expire time.
            sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
         }
      }
   } catch (...) {
      LOG_ERROR() << "DoPublish error.";
   }
}
void NodeCenter::OnTimer()
@@ -517,7 +758,7 @@
   auto it = nodes_.begin();
   while (it != nodes_.end()) {
      auto &cli = *it->second;
      cli.state_.UpdateState(now, offline_time_, kill_time_);
      cli.UpdateState(now, offline_time_, kill_time_);
      if (cli.state_.flag_ == kStateKillme) {
         RemoveNode(it->second);
         it = nodes_.erase(it);
@@ -545,7 +786,8 @@
      }
   };
   EraseMapRec(service_map_, node->services_);
   EraseMapRec(subscribe_map_, node->subscriptions_);
   EraseMapRec(local_sub_map_, node->local_sub_);
   EraseMapRec(net_sub_map_, node->net_sub_);
   // remove online record.
   auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -556,8 +798,55 @@
   }
   for (auto &addr : node->addrs_) {
      cleaner_(addr.first);
      auto &id = addr.first;
      auto r = ShmSocket::Remove(node->shm_, id);
      LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
   }
   node->addrs_.clear();
}
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
   try {
      MsgPublish pub;
      pub.set_topic(topic);
      pub.set_data(content);
      BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
      MsgI msg(shm);
      if (msg.Make(head, pub)) {
         DEFER1(msg.Release());
         RecordMsg(msg);
         DoPublish(DefaultSender(shm), topic, msg);
      }
   } catch (...) {
      LOG_ERROR() << "center publish error.";
   }
}
void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
   // LOG_FUNCTION;
   sub_hosts_.clear();
   rpc_hosts_.clear();
   for (auto &host : info.array()) {
      if (host.get("isLocal", false)) {
         host_id_ = host.get("serverId", "");
         ip_ = host.get("ip", "");
      } else {
         auto ip = host.get("ip", "");
         auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
            for (auto &topic : lot) {
               auto t = topic.get_value<std::string>();
               rec[t].insert(ip);
               // LOG_DEBUG() << "net topic: " << t << ", " << ip;
            }
         };
         // LOG_DEBUG() << "serives:";
         UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
         // LOG_DEBUG() << "net sub:";
         UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
      }
   }
}