lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
box/node_center.cpp
@@ -267,6 +267,43 @@
   return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
   auto &topic = head.topic();
   auto clients = DoFindClients(topic, true);
   if (clients.empty()) { return true; }
   std::vector<MsgI> msgs;
   auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
   DEFER1(ReleaseAll(););
   for (auto &cli : clients) {
      auto Send1 = [&](Node node) {
         auto &shm = node->shm_;
         for (auto &msg : msgs) {
            if (msg.shm().name() == shm.name()) {
               DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
               return;
            }
         }
         MsgI msg(shm);
         if (msg.Make(body_content)) {
            RecordMsg(msg);
            msgs.push_back(msg);
            DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
         }
      };
      auto node = cli.weak_node_.lock();
      if (node) {
         Send1(node);
         // should also make sure that mq is not killed before msg expires.
         // it would be ok if (kill_time - offline_time) is longer than expire time.
      }
   }
   return true;
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
   Node node(GetNode(dest.id_));
@@ -469,11 +506,16 @@
      *info->mutable_proc() = node->proc_;
      info->mutable_proc()->clear_private_info();
      info->set_online(node->state_.flag_ == kStateNormal);
      for (auto &addr_topics : node->services_) {
         for (auto &topic : addr_topics.second) {
            info->mutable_topics()->add_topic_list(topic);
      auto AddTopics = [](auto &dst, auto &src) {
         for (auto &addr_topics : src) {
            for (auto &topic : addr_topics.second) {
               dst.add_topic_list(topic);
            }
         }
      }
      };
      AddTopics(*info->mutable_service(), node->services_);
      AddTopics(*info->mutable_local_sub(), node->local_sub_);
      AddTopics(*info->mutable_net_sub(), node->net_sub_);
   };
   if (!proc_id.empty()) {
@@ -532,35 +574,50 @@
   return HandleMsg<Reply>(head, query);
}
void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
{
   auto src = SrcAddr(head);
   auto Sub = [&](auto &sub, auto &sub_map) {
      auto &topics = msg.topics().topic_list();
      sub[src].insert(topics.begin(), topics.end());
      const TopicDest &dest = {src, SrcAbsAddr(head), node};
      for (auto &topic : topics) {
         sub_map[topic].insert(dest);
      }
   };
   LOG_DEBUG() << "subscribe net : " << msg.network();
   if (msg.network()) {
      Sub(net_sub_, center_.net_sub_map_);
   } else {
      Sub(local_sub_, center_.local_sub_map_);
   }
}
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      auto src = SrcAddr(head);
      auto &topics = msg.topics().topic_list();
      node->subscriptions_[src].insert(topics.begin(), topics.end());
      TopicDest dest = {src, SrcAbsAddr(head), node};
      for (auto &topic : topics) {
         subscribe_map_[topic].insert(dest);
      }
      node->Subscribe(head, msg, node);
      return MakeReply(eSuccess);
   });
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      auto src = SrcAddr(head);
      auto pos = node->subscriptions_.find(src);
      auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
         auto pos = subscribe_map_.find(topic);
         if (pos != subscribe_map_.end() &&
void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
{
   auto src = SrcAddr(head);
   auto Unsub = [&](auto &sub, auto &sub_map) {
      auto pos = sub.find(src);
      auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
         auto pos = sub_map.find(topic);
         if (pos != sub_map.end() &&
             pos->second.erase(dest) != 0 &&
             pos->second.empty()) {
            subscribe_map_.erase(pos);
            sub_map.erase(pos);
         }
      };
      if (pos != node->subscriptions_.end()) {
      if (pos != sub.end()) {
         const TopicDest &dest = {src, SrcAbsAddr(head), node};
         auto &topics = msg.topics().topic_list();
         // clear node sub records;
@@ -569,26 +626,44 @@
            RemoveSubTopicDestRecord(topic, dest);
         }
         if (pos->second.empty()) {
            node->subscriptions_.erase(pos);
            sub.erase(pos);
         }
      }
   };
   if (msg.network()) {
      Unsub(net_sub_, center_.net_sub_map_);
   } else {
      Unsub(local_sub_, center_.local_sub_map_);
   }
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
   return HandleMsg(head, [&](Node node) {
      node->Unsubscribe(head, msg, node);
      return MakeReply(eSuccess);
   });
}
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
   Clients dests;
   auto Find1 = [&](const std::string &exact) {
      auto pos = subscribe_map_.find(exact);
      if (pos != subscribe_map_.end()) {
         auto &clients = pos->second;
         for (auto &cli : clients) {
            if (Valid(cli.weak_node_)) {
               dests.insert(cli);
      auto FindIn = [&](auto &sub_map) {
         auto pos = sub_map.find(exact);
         if (pos != sub_map.end()) {
            auto &clients = pos->second;
            for (auto &cli : clients) {
               if (Valid(cli.weak_node_)) {
                  dests.insert(cli);
               }
            }
         }
      };
      if (!from_remote) {
         FindIn(local_sub_map_);
      }
      FindIn(net_sub_map_);
   };
   Find1(topic);
@@ -605,15 +680,31 @@
   return dests;
}
bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
   bool ret = false;
   HandleMsg(head, [&](Node node) {
      DoFindClients(msg.topic()).swap(out);
      ret = true;
   return HandleMsg(head, [&](Node node) {
      DoPublish(DefaultSender(node->shm_), topic, msg);
      return MakeReply(eSuccess);
   }).Swap(&reply);
   return ret;
   });
}
void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
{
   try {
      auto clients = DoFindClients(topic, false);
      if (clients.empty()) { return; }
      for (auto &cli : clients) {
         auto node = cli.weak_node_.lock();
         if (node) {
            // should also make sure that mq is not killed before msg expires.
            // it would be ok if (kill_time - offline_time) is longer than expire time.
            sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
         }
      }
   } catch (...) {
      LOG_ERROR() << "DoPublish error.";
   }
}
void NodeCenter::OnTimer()
@@ -659,7 +750,8 @@
      }
   };
   EraseMapRec(service_map_, node->services_);
   EraseMapRec(subscribe_map_, node->subscriptions_);
   EraseMapRec(local_sub_map_, node->local_sub_);
   EraseMapRec(net_sub_map_, node->net_sub_);
   // remove online record.
   auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -681,10 +773,6 @@
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
   try {
      // LOG_DEBUG() << "center publish: " << topic << ": " << content;
      Clients clients(DoFindClients(topic));
      if (clients.empty()) { return; }
      MsgPublish pub;
      pub.set_topic(topic);
      pub.set_data(content);
@@ -693,16 +781,16 @@
      if (msg.Make(head, pub)) {
         DEFER1(msg.Release());
         RecordMsg(msg);
         for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();
            if (node && node->state_.flag_ == kStateNormal) {
               DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
         }
         DoPublish(DefaultSender(shm), topic, msg);
      }
   } catch (...) {
      LOG_ERROR() << "center publish error.";
   }
}
std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
{
   //TODO search synced full list;
   return std::vector<std::string>();
}