lichao
2021-05-28 9243710ca372de26823c2225c7b46b072458c671
src/topic_node.cpp
@@ -497,9 +497,10 @@
   out_msg_id = msg_id;
   auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
   auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
      auto &sock = SockClient();
      BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
      *head.mutable_dest() = remote_addr;
      AddRoute(head, sock);
      head.set_topic(req.topic());
@@ -519,8 +520,12 @@
   };
   try {
      if (remote_addr.ip().empty()) {
      BHAddress addr;
      return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
      } else {
         return SendTo(CenterAddr(), req, cb);
      }
   } catch (...) {
      SetLastError(eError, "internal error.");
      return false;
@@ -536,11 +541,21 @@
   try {
      auto &sock = SockClient();
      MQInfo dest;
      if (!remote_addr.ip().empty()) {
         dest = CenterAddr();
      } else {
      BHAddress addr;
      if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
         LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
            dest.offset_ = addr.abs_addr();
            dest.id_ = addr.mq_id();
         } else {
            return false;
         }
      }
         BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
      *head.mutable_dest() = remote_addr;
         AddRoute(head, sock);
         head.set_topic(request.topic());
@@ -548,12 +563,11 @@
         DEFER1(reply_msg.Release(););
         BHMsgHead reply_head;
         if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
      if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
             reply_head.type() == kMsgTypeRequestTopicReply &&
             reply_msg.ParseBody(out_reply)) {
            reply_head.mutable_proc_id()->swap(out_proc_id);
            return true;
         }
      }
   } catch (...) {
      SetLastError(eError, __func__ + std::string(" internal errer."));