liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/topic_node.cpp
@@ -29,6 +29,11 @@
namespace
{
bool ValidUserSymbol(const std::string &s)
{
   return !s.empty() && s[0] != '#' && s[0] != '@';
}
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
   auto route = head.add_route();
@@ -45,8 +50,8 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), state_(eStateUninited)
TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}
@@ -143,8 +148,13 @@
   for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!internal && !ValidUserSymbol(proc.proc_id())) {
      SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
      return false;
   }
   {
      std::lock_guard<std::mutex> lk(mutex_);
      info_ = proc;
@@ -309,8 +319,17 @@
           reply.ParseBody(reply_body));
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!internal) {
      for (auto &&topic : topics.topic_list()) {
         if (!ValidUserSymbol(topic)) {
            SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
            return false;
         }
      }
   }
   if (!IsOnline()) {
      SetLastError(eNotRegistered, kErrMsgNotRegistered);
      return false;
@@ -478,9 +497,10 @@
   out_msg_id = msg_id;
   auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
   auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
      auto &sock = SockClient();
      BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
      *head.mutable_dest() = remote_addr;
      AddRoute(head, sock);
      head.set_topic(req.topic());
@@ -500,8 +520,12 @@
   };
   try {
      BHAddress addr;
      return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
      if (remote_addr.ip().empty()) {
         BHAddress addr;
         return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
      } else {
         return SendTo(CenterAddr(), req, cb);
      }
   } catch (...) {
      SetLastError(eError, "internal error.");
      return false;
@@ -517,26 +541,36 @@
   try {
      auto &sock = SockClient();
      BHAddress addr;
      if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
         LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
         BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
         AddRoute(head, sock);
         head.set_topic(request.topic());
         MsgI reply_msg(shm());
         DEFER1(reply_msg.Release(););
         BHMsgHead reply_head;
         if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
             reply_head.type() == kMsgTypeRequestTopicReply &&
             reply_msg.ParseBody(out_reply)) {
            reply_head.mutable_proc_id()->swap(out_proc_id);
            return true;
      MQInfo dest;
      if (!remote_addr.ip().empty()) {
         dest = CenterAddr();
      } else {
         BHAddress addr;
         if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            dest.offset_ = addr.abs_addr();
            dest.id_ = addr.mq_id();
         } else {
            return false;
         }
      }
   } catch (...) {
      BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
      *head.mutable_dest() = remote_addr;
      AddRoute(head, sock);
      head.set_topic(request.topic());
      MsgI reply_msg(shm());
      DEFER1(reply_msg.Release(););
      BHMsgHead reply_head;
      if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
          reply_head.type() == kMsgTypeRequestTopicReply &&
          reply_msg.ParseBody(out_reply)) {
         reply_head.mutable_proc_id()->swap(out_proc_id);
         return true;
      }
   } catch (std::exception &e) {
      LOG_ERROR() << __func__ << " exception: " << e.what();
      SetLastError(eError, __func__ + std::string(" internal errer."));
   }
   return false;
@@ -594,6 +628,7 @@
      auto &sock = SockPub();
      BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
      AddRoute(head, sock);
      head.set_topic(pub.topic());
      if (timeout_ms == 0) {
         return sock.Send(BusAddr(), head, pub);
@@ -614,7 +649,7 @@
// subscribe
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsOnline()) {
      SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -624,6 +659,7 @@
   try {
      auto &sock = SockSub();
      MsgSubscribe sub;
      sub.set_network(net);
      sub.mutable_topics()->Swap(&topics);
      BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
@@ -639,7 +675,6 @@
                reply.ParseBody(reply_body) &&
                IsSuccess(reply_body.errmsg().errcode());
      }
      // TODO wait for result?
   } catch (...) {
      return false;
   }
@@ -685,12 +720,12 @@
         return false;
      }
   }
   //TODO error msg.
   if (head.type() == kMsgTypePublish) {
      if (pub.ParseFromString(body)) {
         head.mutable_proc_id()->swap(proc_id);
         return true;
      }
   }
   SetLastError(eError, "invalid subcribe msg received.");
   return false;
}