lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
box/center.cpp
@@ -135,27 +135,18 @@
   auto OnBusIdle = [=](ShmSocket &socket) {};
   auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
   auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
   auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center);
      auto OnPublish = [&]() {
         MsgPublish pub;
         NodeCenter::Clients clients;
         MsgCommonReply reply;
         if (head.route_size() != 1 || !msg.ParseBody(pub)) {
            return;
         } else if (!center->FindClients(head, pub, clients, reply)) {
         if (head.route_size() == 1 && msg.ParseBody(pub)) {
            // replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
            auto reply(center->Publish(head, pub.topic(), msg));
            replyer(reply);
         } else {
            replyer(MakeReply(eSuccess));
            if (clients.empty()) { return; }
            for (auto &cli : clients) {
               auto node = cli.weak_node_.lock();
               if (node) {
                  // should also make sure that mq is not killed before msg expires.
                  // it would be ok if (kill_time - offline_time) is longer than expire time.
                  socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
               }
            auto hosts = center->FindRemoteSubClients(pub.topic());
            for (auto &host : hosts) {
               tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
            }
         }
      };