lichao
2021-05-28 9243710ca372de26823c2225c7b46b072458c671
box/node_center.cpp
@@ -70,8 +70,9 @@
      return;
   }
   // LOG_FUNCTION;
   const size_t total = msgs_.size();
   time_to_clean_ = now + 1;
   int64_t limit = std::max(10000ul, msgs_.size() / 10);
   int64_t limit = std::max(10000ul, total / 10);
   int64_t n = 0;
   auto it = msgs_.begin();
   while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
         ++n;
      };
      int n = now - msg.timestamp();
      if (n < 10) {
      if (msg.Count() == 0) {
         Free();
      } else if (n > NodeTimeoutSec()) {
         Free();
      } else {
         ++it;
      } else if (msg.Count() == 0) {
         Free();
      } else if (n > 60) {
         Free();
      }
   }
   if (n > 0) {
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
      LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
   }
}
@@ -209,17 +210,25 @@
   RecordMsg(msg);
   return socket.Send(dest, msg);
}
bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
   auto ssn = dest.id_ - (dest.id_ % 10);
   LOG_DEBUG() << "prox ssn " << ssn;
   Node node;
   auto ssn = mq_id - (mq_id % 10);
   auto pos = nodes_.find(ssn);
   if (pos == nodes_.end()) {
      LOG_ERROR() << "proxy msg, ssn not found.";
   if (pos != nodes_.end()) {
      node = pos->second;
   }
   return node;
}
bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
   Node node(GetNode(dest.id_));
   if (!node || !Valid(*node)) {
      LOG_ERROR() << id() << " pass remote request, dest not found.";
      return false;
   }
   auto &node = pos->second;
   if (!Valid(*node)) { return false; }
   ShmSocket &sender(DefaultSender(node->shm_));
   auto route = head.add_route();
@@ -233,6 +242,26 @@
   return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
   Node node(GetNode(dest.id_));
   if (!node) {
      LOG_ERROR() << id() << " pass remote reply , ssn not found.";
      return false;
   }
   auto offset = node->addrs_[dest.id_];
   if (offset != dest.offset_) {
      LOG_ERROR() << id() << " pass remote reply, dest address not match";
      return false;
   }
   ShmMsg msg(node->shm_);
   if (!msg.Make(head, body_content)) { return false; }
   DEFER1(msg.Release(););
   RecordMsg(msg);
   return DefaultSender(node->shm_).Send(dest, msg);
}
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
   // LOG_FUNCTION;