lichao
2021-04-14 aa1542b6d6a4680088ac715c4ce40f97ada554fb
src/topic_node.cpp
@@ -17,7 +17,6 @@
 */
#include "topic_node.h"
#include "bh_util.h"
#include "failed_msg.h"
#include <chrono>
#include <list>
@@ -33,9 +32,8 @@
   std::string msg_id;
};
typedef FailedMsgQ ServerFailedQ;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
@@ -76,15 +74,20 @@
   auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
   AddRoute(head, sock.id());
   MsgI reply;
   DEFER1(reply.Release(shm_););
   BHMsgHead reply_head;
   bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
   r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
   if (r && IsSuccess(reply_body.errmsg().errcode())) {
      info_ = body;
   if (timeout_ms == 0) {
      return sock.Send(&BHTopicCenterAddress(), head, body);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
      if (r && IsSuccess(reply_body.errmsg().errcode())) {
         info_ = body;
         return true;
      }
      return false;
   }
   return r;
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
@@ -96,22 +99,23 @@
   auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
   AddRoute(head, sock.id());
   MsgI reply;
   DEFER1(reply.Release(shm_););
   BHMsgHead reply_head;
   bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
   r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
   if (r && IsSuccess(reply_body.errmsg().errcode())) {
      // TODO update proc info
   if (timeout_ms == 0) {
      return sock.Send(&BHTopicCenterAddress(), head, body);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
      return (r && IsSuccess(reply_body.errmsg().errcode()));
   }
   return r;
}
bool TopicNode::Heartbeat(const int timeout_ms)
{
   ProcInfo proc;
   proc.set_proc_id(proc_id());
   MsgCommonReply reply_body;
   return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
   return Heartbeat(proc, reply_body, timeout_ms);
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
@@ -124,50 +128,43 @@
   auto head(InitMsgHead(GetType(body), proc_id()));
   AddRoute(head, sock.id());
   MsgI reply;
   DEFER1(reply.Release(shm_););
   BHMsgHead reply_head;
   bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
   r = r && reply_head.type() == kMsgTypeCommonReply;
   r = r && reply.ParseBody(reply_body);
   return r;
   if (timeout_ms == 0) {
      return sock.Send(&BHTopicCenterAddress(), head, body);
   } else {
      MsgI reply;
      DEFER1(reply.Release(shm_););
      BHMsgHead reply_head;
      bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
      r = r && reply_head.type() == kMsgTypeCommonReply;
      r = r && reply.ParseBody(reply_body);
      return r;
   }
}
bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
{
   auto failed_q = std::make_shared<ServerFailedQ>();
   auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
      if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
      MsgRequestTopic req;
      if (!imsg.ParseBody(req)) { return; }
   auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
      MsgRequestTopicReply reply_body;
      if (rcb(head.proc_id(), req, reply_body)) {
         BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
   auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
      if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
         MsgRequestTopic req;
         if (imsg.ParseBody(req)) {
            MsgRequestTopicReply reply_body;
            if (rcb(head.proc_id(), req, reply_body)) {
               BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
               for (int i = 0; i < head.route_size() - 1; ++i) {
                  reply_head.add_route()->Swap(head.mutable_route(i));
               }
               MsgI msg;
               if (msg.Make(sock.shm(), reply_head, reply_body)) {
                  auto &remote = head.route().rbegin()->mq_id();
                  if (!sock.Send(remote.data(), msg, 10)) {
                     failed_q->Push(remote, msg, 10s);
                  }
               }
            }
         for (int i = 0; i < head.route_size() - 1; ++i) {
            reply_head.add_route()->Swap(head.mutable_route(i));
         }
      } else {
         // ignored, or dropped
         MsgI msg;
         if (msg.Make(sock.shm(), reply_head, reply_body)) {
            auto &remote = head.route().rbegin()->mq_id();
            sock.Send(remote.data(), msg);
         }
      }
      onIdle(sock);
   };
   auto &sock = SockServer();
   return rcb && sock.Start(onRecv, onIdle, nworker);
   return rcb && sock.Start(onRecv, nworker);
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -189,7 +186,7 @@
   return false;
}
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms)
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
   auto &sock = SockServer();
@@ -202,7 +199,7 @@
   for (unsigned i = 0; i < p->route.size() - 1; ++i) {
      head.add_route()->Swap(&p->route[i]);
   }
   return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
   return sock.Send(p->route.back().mq_id().data(), head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -222,7 +219,7 @@
   return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb)
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
{
   auto Call = [&](const void *remote) {
      auto &sock = SockRequest();
@@ -239,15 +236,15 @@
               }
            }
         };
         return sock.Send(remote, head, req, timeout_ms, onRecv);
         return sock.Send(remote, head, req, onRecv);
      } else {
         return sock.Send(remote, head, req, timeout_ms);
         return sock.Send(remote, head, req);
      }
   };
   try {
      BHAddress addr;
      if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) {
      if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
         return Call(addr.mq_id().data());
      } else {
         SetLastError(eNotFound, "remote not found.");
@@ -333,14 +330,18 @@
      BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
      AddRoute(head, sock.id());
      MsgI reply;
      DEFER1(reply.Release(shm()););
      BHMsgHead reply_head;
      MsgCommonReply reply_body;
      return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
             reply_head.type() == kMsgTypeCommonReply &&
             reply.ParseBody(reply_body) &&
             IsSuccess(reply_body.errmsg().errcode());
      if (timeout_ms == 0) {
         return sock.Send(&BHTopicBusAddress(), head, pub);
      } else {
         MsgI reply;
         DEFER1(reply.Release(shm()););
         BHMsgHead reply_head;
         MsgCommonReply reply_body;
         return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeCommonReply &&
                reply.ParseBody(reply_body) &&
                IsSuccess(reply_body.errmsg().errcode());
      }
   } catch (...) {
   }
   return false;
@@ -357,8 +358,19 @@
      BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
      AddRoute(head, sock.id());
      return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
      if (timeout_ms == 0) {
         return sock.Send(&BHTopicBusAddress(), head, sub);
      } else {
         MsgI reply;
         DEFER1(reply.Release(shm()););
         BHMsgHead reply_head;
         MsgCommonReply reply_body;
         return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeCommonReply &&
                reply.ParseBody(reply_body) &&
                IsSuccess(reply_body.errmsg().errcode());
      }
      // TODO wait for result?
   } catch (...) {
      return false;
   }