lichao
2021-04-09 2197cf91e7a3bd5941327ba630a42946b88f069e
src/center.cpp
@@ -336,7 +336,7 @@
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
      return [&](auto &&rep_body) {
         auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
         bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
         bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
         if (!r) {
            printf("send reply failed.\n");
         }
@@ -364,18 +364,20 @@
         MsgPublish pub;
         NodeCenter::Clients clients;
         MsgCommonReply reply;
         MsgI pubmsg;
         if (head.route_size() != 1 || !msg.ParseBody(pub)) {
            return;
         } else if (!center->FindClients(head, pub, clients, reply)) {
            // send error reply.
            MakeReplyer(socket, head, center->id())(reply);
         } else if (pubmsg.MakeRC(socket.shm(), msg)) {
            DEFER1(pubmsg.Release(socket.shm()));
         } else {
            MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
            if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
            for (auto &cli : clients) {
               auto node = cli.weak_node_.lock();
               if (node) {
                  socket.Send(cli.mq_.data(), pubmsg, 10);
                  if (!socket.Send(cli.mq_.data(), msg, 100)) {
                     printf("center route publish failed. need resend.\n");
                  }
               }
            }
         }