lichao
2021-05-21 b2484c8bd77a9d21bcf1827f554444535196953d
box/node_center.cpp
@@ -116,7 +116,7 @@
   Json json;
   json.put("proc_id", proc_.proc_id());
   center_.Publish(kTopicNodeOffline, json.dump());
   center_.Publish(shm_, kTopicNodeOffline, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
@@ -127,7 +127,7 @@
      if (proc_.proc_id().empty()) { return; } // node init, ignore.
      Json json;
      json.put("proc_id", proc_.proc_id());
      center_.Publish(topic, json.dump());
      center_.Publish(shm_, topic, json.dump());
   };
   LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
@@ -182,7 +182,7 @@
             SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
   };
   Node node(new NodeInfo(*this));
   Node node(new NodeInfo(*this, shm));
   if (UpdateRegInfo(node) && PrepareProcInit(node)) {
      reply |= (node->addrs_[ssn] << 4);
      nodes_[ssn] = node;
@@ -281,7 +281,7 @@
   auto &node = pos->second;
   try {
      for (int i = 0; i < msg.extra_mq_num(); ++i) {
         ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
         ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
         node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
         auto addr = reply.add_extra_mqs();
         addr->set_mq_id(tmp.id());
@@ -593,13 +593,15 @@
   }
   for (auto &addr : node->addrs_) {
      cleaner_(addr.first);
      auto &id = addr.first;
      auto r = ShmSocket::Remove(node->shm_, id);
      LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
   }
   node->addrs_.clear();
}
void NodeCenter::Publish(const Topic &topic, const std::string &content)
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
   try {
      // LOG_DEBUG() << "center publish: " << topic << ": " << content;
@@ -615,8 +617,8 @@
         DEFER1(msg.Release());
         RecordMsg(msg);
         auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
         ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
         auto &mq = GetCenterInfo(shm)->mq_sender_;
         ShmSocket sender(mq.offset_, shm, mq.id_);
         for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();