lichao
2021-04-23 02ba913dc7bb5d711471b27f2ea23a897d0f2e28
src/topic_node.cpp
@@ -111,7 +111,7 @@
      return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      DEFER1(reply.Release(););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      if (r) {
@@ -139,7 +139,7 @@
      return sock.Send(&BHTopicCenterAddress(), head, body);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      DEFER1(reply.Release(););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
@@ -172,7 +172,7 @@
      return sock.Send(&BHTopicCenterAddress(), head, body);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      DEFER1(reply.Release(););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      r = r && reply_head.type() == kMsgTypeCommonReply;
@@ -366,7 +366,7 @@
         head.set_topic(request.topic());
         MsgI reply_msg;
         DEFER1(reply_msg.Release(shm_););
         DEFER1(reply_msg.Release(););
         BHMsgHead reply_head;
         if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) &&
@@ -403,7 +403,7 @@
   AddRoute(head, sock.id());
   MsgI reply;
   DEFER1(reply.Release(shm_));
   DEFER1(reply.Release());
   BHMsgHead reply_head;
   if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
@@ -442,7 +442,7 @@
         return sock.Send(&BHTopicBusAddress(), head, pub);
      } else {
         MsgI reply;
         DEFER1(reply.Release(shm()););
         DEFER1(reply.Release(););
         BHMsgHead reply_head;
         MsgCommonReply reply_body;
         return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
@@ -475,7 +475,7 @@
         return sock.Send(&BHTopicBusAddress(), head, sub);
      } else {
         MsgI reply;
         DEFER1(reply.Release(shm()););
         DEFER1(reply.Release(););
         BHMsgHead reply_head;
         return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeCommonReply &&
@@ -515,7 +515,7 @@
   auto &sock = SockSub();
   MsgI msg;
   DEFER1(msg.Release(shm()););
   DEFER1(msg.Release(););
   BHMsgHead head;
   //TODO error msg.
   if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {