lichao
2021-05-18 3788226ee9332945e90066b58f2b85026c2a0460
box/center.cpp
@@ -212,30 +212,27 @@
   // center name, no relative to shm.
   const std::string &id() const { return id_; }
   void OnNodeInit(ShmSocket &socket, const int64_t val)
   int64_t OnNodeInit(ShmSocket &socket, const int64_t val)
   {
      LOG_FUNCTION;
      SharedMemory &shm = socket.shm();
      MQId ssn = (val >> 4) & MaskBits(60);
      MQId ssn = (val >> 4) & MaskBits(56);
      int reply = EncodeCmd(eCmdNodeInitReply);
      if (nodes_.find(ssn) != nodes_.end()) {
         return; // ignore in exists.
         return reply; // ignore if exists.
      }
      auto UpdateRegInfo = [&](Node &node) {
         node->state_.timestamp_ = NowSec() - offline_time_;
         node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
         // create sockets.
         const int nsocks = 4;
         try {
            for (int i = 0; i < nsocks; ++i) {
               ShmSocket tmp(shm, true, ssn + i, 16);
               node->addrs_.emplace(ssn + i, tmp.AbsAddr());
            }
            ShmSocket tmp(shm, true, ssn, 16);
            node->addrs_.emplace(ssn, tmp.AbsAddr());
            return true;
         } catch (...) {
            for (int i = 0; i < nsocks; ++i) {
               ShmSocket::Remove(shm, ssn + i);
            }
            return false;
         }
      };
@@ -243,25 +240,29 @@
      auto PrepareProcInit = [&](Node &node) {
         bool r = false;
         ShmMsg init_msg;
         if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
            // 31bit pointer, 4bit cmd+flag
            int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
            r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
         }
         return r;
         DEFER1(init_msg.Release());
         MsgProcInit body;
         auto head = InitMsgHead(GetType(body), id(), ssn);
         return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
                init_msg.Fill(ShmMsg::Serialize(head, body)) &&
                SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
      };
      Node node(new NodeInfo);
      if (UpdateRegInfo(node) && PrepareProcInit(node)) {
         reply |= (node->addrs_[ssn] << 4);
         nodes_[ssn] = node;
         LOG_INFO() << "new node ssn (" << ssn << ") init";
      } else {
         for (int i = 0; i < 10; ++i) {
            ShmSocket::Remove(shm, ssn + i);
         }
         ShmSocket::Remove(shm, ssn);
      }
      return reply;
   }
   void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
   void RecordMsg(const MsgI &msg)
   {
      msg.reset_managed(true);
      msgs_.RecordMsg(msg);
   }
   bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
   {
@@ -325,7 +326,6 @@
      assert(IsCmd(val));
      int cmd = DecodeCmd(val);
      switch (cmd) {
      case eCmdNodeInit: OnNodeInit(socket, val); break;
      case eCmdAllocRequest0: OnAlloc(socket, val); break;
      case eCmdFree: OnFree(socket, val); break;
      default: return false;
@@ -336,10 +336,28 @@
   MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
   {
      LOG_DEBUG() << "center got proc init.";
      auto pos = nodes_.find(head.ssn_id());
      if (pos == nodes_.end()) {
         return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
      }
      auto index = procs_.Put(head.proc_id(), head.ssn_id());
      auto reply(MakeReply<MsgProcInitReply>(eSuccess));
      reply.set_proc_index(index);
      return reply;
      auto &node = pos->second;
      try {
         for (int i = 0; i < msg.extra_mq_num(); ++i) {
            ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
            node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
            auto addr = reply.add_extra_mqs();
            addr->set_mq_id(tmp.id());
            addr->set_abs_addr(tmp.AbsAddr());
         }
         return reply;
      } catch (...) {
         LOG_ERROR() << "proc init create mq error";
         return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
      }
   }
   MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -711,6 +729,10 @@
   // now we can talk.
   auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
      auto &center = *center_ptr;
      auto onInit = [&](const int64_t request) {
         return center->OnNodeInit(socket, request);
      };
      BHCenterHandleInit(onInit);
      center->OnTimer();
   };