liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/topic_node.cpp
@@ -50,8 +50,8 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), state_(eStateUninited)
TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}
@@ -569,7 +569,8 @@
         reply_head.mutable_proc_id()->swap(out_proc_id);
         return true;
      }
   } catch (...) {
   } catch (std::exception &e) {
      LOG_ERROR() << __func__ << " exception: " << e.what();
      SetLastError(eError, __func__ + std::string(" internal errer."));
   }
   return false;
@@ -627,6 +628,7 @@
      auto &sock = SockPub();
      BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
      AddRoute(head, sock);
      head.set_topic(pub.topic());
      if (timeout_ms == 0) {
         return sock.Send(BusAddr(), head, pub);
@@ -647,7 +649,7 @@
// subscribe
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsOnline()) {
      SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -657,6 +659,7 @@
   try {
      auto &sock = SockSub();
      MsgSubscribe sub;
      sub.set_network(net);
      sub.mutable_topics()->Swap(&topics);
      BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
@@ -672,7 +675,6 @@
                reply.ParseBody(reply_body) &&
                IsSuccess(reply_body.errmsg().errcode());
      }
      // TODO wait for result?
   } catch (...) {
      return false;
   }
@@ -718,12 +720,12 @@
         return false;
      }
   }
   //TODO error msg.
   if (head.type() == kMsgTypePublish) {
      if (pub.ParseFromString(body)) {
         head.mutable_proc_id()->swap(proc_id);
         return true;
      }
   }
   SetLastError(eError, "invalid subcribe msg received.");
   return false;
}