lichao
2021-06-02 993c556000a414011626770540678948f16eaa9e
box/node_center.cpp
@@ -57,7 +57,7 @@
{
   auto pos = msgs_.find(id);
   if (pos != msgs_.end()) {
      ShmMsg(pos->second).Free();
      pos->second.Free();
      msgs_.erase(pos);
   } else {
      LOG_TRACE() << "ignore late free request.";
@@ -70,8 +70,9 @@
      return;
   }
   // LOG_FUNCTION;
   const size_t total = msgs_.size();
   time_to_clean_ = now + 1;
   int64_t limit = std::max(10000ul, msgs_.size() / 10);
   int64_t limit = std::max(10000ul, total / 10);
   int64_t n = 0;
   auto it = msgs_.begin();
   while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
         ++n;
      };
      int n = now - msg.timestamp();
      if (n < 10) {
      if (msg.Count() == 0) {
         Free();
      } else if (n > NodeTimeoutSec()) {
         Free();
      } else {
         ++it;
      } else if (msg.Count() == 0) {
         Free();
      } else if (n > 60) {
         Free();
      }
   }
   if (n > 0) {
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
   }
}
@@ -101,9 +102,9 @@
   int i = 0;
   int total_count = 0;
   for (auto &kv : msgs_) {
      MsgI msg(kv.second);
      auto &msg = kv.second;
      total_count += msg.Count();
      LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
      LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
   }
   LOG_TRACE() << "total count: " << total_count;
}
@@ -163,7 +164,7 @@
      // create sockets.
      try {
         ShmSocket tmp(shm, true, ssn, 16);
         ShmSocket tmp(shm, ssn, eCreate);
         node->addrs_.emplace(ssn, tmp.AbsAddr());
         return true;
      } catch (...) {
@@ -173,7 +174,7 @@
   auto PrepareProcInit = [&](Node &node) {
      bool r = false;
      ShmMsg init_msg;
      ShmMsg init_msg(shm);
      DEFER1(init_msg.Release());
      MsgProcInit body;
      auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -210,6 +211,57 @@
   return socket.Send(dest, msg);
}
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
   Node node;
   auto ssn = mq_id - (mq_id % 10);
   auto pos = nodes_.find(ssn);
   if (pos != nodes_.end()) {
      node = pos->second;
   }
   return node;
}
bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
   Node node(GetNode(dest.id_));
   if (!node || !Valid(*node)) {
      LOG_ERROR() << id() << " pass remote request, dest not found.";
      return false;
   }
   ShmSocket &sender(DefaultSender(node->shm_));
   auto route = head.add_route();
   route->set_mq_id(sender.id());
   route->set_abs_addr(sender.AbsAddr());
   ShmMsg msg(node->shm_);
   if (!msg.Make(head, body_content)) { return false; }
   DEFER1(msg.Release(););
   RecordMsg(msg);
   return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
   Node node(GetNode(dest.id_));
   if (!node) {
      LOG_ERROR() << id() << " pass remote reply , ssn not found.";
      return false;
   }
   auto offset = node->addrs_[dest.id_];
   if (offset != dest.offset_) {
      LOG_ERROR() << id() << " pass remote reply, dest address not match";
      return false;
   }
   ShmMsg msg(node->shm_);
   if (!msg.Make(head, body_content)) { return false; }
   DEFER1(msg.Release(););
   RecordMsg(msg);
   return DefaultSender(node->shm_).Send(dest, msg);
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
   // LOG_FUNCTION;
@@ -238,7 +290,7 @@
   if (!FindMq()) { return; }
   auto size = GetAllocSize((val >> 52) & MaskBits(8));
   MsgI new_msg;
   MsgI new_msg(socket.shm());
   if (new_msg.Make(size)) {
      // 31bit proc index, 28bit id, ,4bit cmd+flag
      int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -281,7 +333,7 @@
   auto &node = pos->second;
   try {
      for (int i = 0; i < msg.extra_mq_num(); ++i) {
         ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
         ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
         node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
         auto addr = reply.add_extra_mqs();
         addr->set_mq_id(tmp.id());
@@ -612,18 +664,15 @@
      pub.set_topic(topic);
      pub.set_data(content);
      BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
      MsgI msg;
      MsgI msg(shm);
      if (msg.Make(head, pub)) {
         DEFER1(msg.Release());
         RecordMsg(msg);
         auto &mq = GetCenterInfo(shm)->mq_sender_;
         ShmSocket sender(mq.offset_, shm, mq.id_);
         for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();
            if (node && node->state_.flag_ == kStateNormal) {
               sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
               DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
         }
      }