lichao
2021-04-09 4e5cb7960ce4e7e66d5190be67426aeca8b55c3d
src/topic_node.cpp
@@ -92,9 +92,17 @@
   SockNode().Stop();
}
bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
   auto &sock = SockNode();
   MsgRegister body;
   *body.mutable_proc() = proc;
   auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
   AddId(SockNode().id());
   AddId(SockServer().id());
   AddId(SockClient().id());
   AddId(SockSub().id());
   AddId(SockPub().id());
   auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
   AddRoute(head, sock.id());
@@ -110,10 +118,12 @@
   return r;
}
bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   //TODO check registered
   auto &sock = SockServer();
   MsgRegisterRPC body;
   body.mutable_topics()->Swap(&topics);
   auto head(InitMsgHead(GetType(body), proc_id()));
   AddRoute(head, sock.id());
@@ -361,14 +371,13 @@
// subscribe
bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
{
   try {
      auto &sock = SockSub();
      MsgSubscribe sub;
      for (auto &topic : topics) {
         sub.add_topics(topic);
      }
      sub.mutable_topics()->Swap(&topics);
      BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
      AddRoute(head, sock.id());