lichao
2021-04-13 65ef4d68321e56906920be75831b5e968f7abd7b
src/topic_node.cpp
@@ -39,17 +39,21 @@
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
   SockNode().Start();
   SockClient().Start();
   SockServer().Start();
   Start();
}
TopicNode::~TopicNode()
{
   StopAll();
   Stop();
}
void TopicNode::StopAll()
void TopicNode::Start()
{
   SockNode().Start();
   SockClient().Start();
   SockServer().Start();
}
void TopicNode::Stop()
{
   SockServer().Stop();
   SockClient().Stop();
@@ -76,12 +80,39 @@
   BHMsgHead reply_head;
   bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
   r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
   if (r) {
   if (r && IsSuccess(reply_body.errmsg().errcode())) {
      info_ = body;
   }
   return r;
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
   auto &sock = SockNode();
   MsgHeartbeat body;
   *body.mutable_proc() = proc;
   auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
   AddRoute(head, sock.id());
   MsgI reply;
   DEFER1(reply.Release(shm_););
   BHMsgHead reply_head;
   bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
   r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
   if (r && IsSuccess(reply_body.errmsg().errcode())) {
      // TODO update proc info
   }
   return r;
}
bool TopicNode::Heartbeat(const int timeout_ms)
{
   ProcInfo proc;
   proc.set_proc_id(proc_id());
   MsgCommonReply reply_body;
   return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   //TODO check registered