lichao
2021-05-18 e54b8e58780c7d9f37b06cc4e1dc88badb2129c9
src/topic_node.cpp
@@ -107,6 +107,14 @@
               }
               SetProcIndex(reply.proc_index());
               this->state_ = eStateUnregistered;
               auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
                  server_buffer_->Write(std::move(head), msg.body());
               };
               SockServer().Start(onRequest);
               auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
                  sub_buffer_->Write(std::move(head), msg.body());
               };
               SockSub().Start(onSub);
            }
         } break;
         default: break;
@@ -341,6 +349,7 @@
bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
{
   if (acb) {
   auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
      if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
      MsgRequestTopic req;
@@ -359,8 +368,13 @@
      }
   };
   auto &sock = SockServer();
   return acb && sock.Start(onRecv, nworker);
      return SockServer().Start(onRecv, nworker);
   } else {
      auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
         server_buffer_->Write(std::move(head), msg.body());
      };
      return SockServer().Start(onRequest, nworker);
   }
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -369,13 +383,19 @@
      SetLastError(eNotRegistered, kErrMsgNotRegistered);
      return false;
   }
   auto &sock = SockServer();
   MsgI imsg;
   BHMsgHead head;
   if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
      if (imsg.ParseBody(request)) {
   std::string body;
   auto end_time = steady_clock::now() + milliseconds(timeout_ms);
   while (!server_buffer_->Read(head, body)) {
      if (steady_clock::now() < end_time) {
         robust::QuickSleep();
      } else {
         return false;
      }
   }
   if (head.type() == kMsgTypeRequestTopic) {
      if (request.ParseFromString(body)) {
         head.mutable_proc_id()->swap(proc_id);
         try {
            SrcInfo *p = new SrcInfo;
@@ -614,8 +634,7 @@
bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
{
   auto &sock = SockSub();
   if (tdcb) {
   auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
      if (head.type() == kMsgTypePublish) {
         MsgPublish pub;
@@ -626,8 +645,13 @@
         // ignored, or dropped
      }
   };
   return tdcb && sock.Start(AsyncRecvProc, nworker);
      return SockSub().Start(AsyncRecvProc, nworker);
   } else {
      auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
         sub_buffer_->Write(std::move(head), msg.body());
      };
      return SockSub().Start(onSub, nworker);
   }
}
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
@@ -637,13 +661,19 @@
      return false;
   }
   auto &sock = SockSub();
   MsgI msg;
   DEFER1(msg.Release(););
   BHMsgHead head;
   std::string body;
   auto end_time = steady_clock::now() + milliseconds(timeout_ms);
   while (!sub_buffer_->Read(head, body)) {
      if (steady_clock::now() < end_time) {
         robust::QuickSleep();
      } else {
         return false;
      }
   }
   //TODO error msg.
   if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
      if (msg.ParseBody(pub)) {
   if (head.type() == kMsgTypePublish) {
      if (pub.ParseFromString(body)) {
         head.mutable_proc_id()->swap(proc_id);
         return true;
      }