lichao
2021-05-08 36e6a35a886252516f168b90f7a9a7c1c5177312
box/center.cpp
@@ -102,22 +102,43 @@
   // center name, no relative to shm.
   const std::string &id() const { return id_; }
   void OnNodeInit(const int64_t msg)
   void OnNodeInit(SharedMemory &shm, const int64_t msg)
   {
      MQId ssn = msg;
      if (nodes_.find(ssn) != nodes_.end()) {
         return; // ignore in exists.
      }
      auto UpdateRegInfo = [&](Node &node) {
         for (int i = 0; i < 10; ++i) {
            node->addrs_.insert(ssn + i);
         }
         node->state_.timestamp_ = NowSec() - offline_time_;
         node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
         // create sockets.
         try {
            auto CreateSocket = [](SharedMemory &shm, const MQId id) {
               ShmSocket tmp(shm, true, id, 16);
            };
            // alloc(-1), node, server, sub, request,
            for (int i = -1; i < 4; ++i) {
               CreateSocket(shm, ssn + i);
               node->addrs_.insert(ssn + i);
            }
            return true;
         } catch (...) {
            return false;
         }
      };
      Node node(new NodeInfo);
      UpdateRegInfo(node);
      nodes_[ssn] = node;
      LOG_INFO() << "new node ssn (" << ssn << ") init";
      if (UpdateRegInfo(node)) {
         nodes_[ssn] = node;
         LOG_INFO() << "new node ssn (" << ssn << ") init";
      }
   }
   MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
   {
      if (msg.proc().proc_id() != head.proc_id()) {
@@ -475,7 +496,7 @@
{
   auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
      auto &center = *center_ptr;
      center->OnNodeInit(msg.Offset());
      center->OnNodeInit(socket.shm(), msg.Offset());
   };
   auto Nothing = [](ShmSocket &socket) {};