lichao
2021-04-21 3931f83205f153f2bc7fc36d1a894cdc3f14b4db
src/topic_node.cpp
@@ -32,28 +32,29 @@
   std::string msg_id;
};
const int kMqLen = 700;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_client_(shm), sock_server_(shm), sock_sub_(shm), registered_(false), registered_ever_(false)
    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
{
   for (int i = eSockStart; i < eSockEnd; ++i) {
      sockets_[i].reset(new ShmSocket(shm_, kMqLen));
   }
   // recv msgs to avoid memory leak.
   auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
   SockNode().Start(default_ignore_msg);
   SockClient().Start(default_ignore_msg);
   SockServer().Start(default_ignore_msg);
   SockSub().Start(default_ignore_msg);
   for (auto &p : sockets_) {
      p->Start(default_ignore_msg);
   }
}
TopicNode::~TopicNode()
{
   Stop();
   SockNode().Stop();
   if (!registered_ever_) {
      SockNode().Remove();
      SockClient().Remove();
      SockServer().Remove();
      SockSub().Remove();
   if (state() == eStateUnregistered) {
      for (auto &p : sockets_) { p->Remove(); }
   }
}
@@ -64,16 +65,14 @@
   } else if (nworker > 16) {
      nworker = 16;
   }
   SockNode().Start();
   ServerStart(server_cb, nworker);
   SubscribeStartWorker(sub_cb, nworker);
   ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
   SockSub().Stop();
   SockServer().Stop();
   SockClient().Stop();
   for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
@@ -98,9 +97,10 @@
                msg.ParseBody(rbody) &&
                IsSuccess(rbody.errmsg().errcode());
      if (ok) {
         registered_ever_.store(true);
         state(eStateOnline);
      } else {
         state_cas(eStateOnline, eStateOffline);
      }
      registered_.store(ok);
   };
   if (timeout_ms == 0) {
@@ -117,13 +117,13 @@
      if (r) {
         CheckResult(reply, reply_head, reply_body);
      }
      return IsRegistered();
      return IsOnline();
   }
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -156,7 +156,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -223,7 +223,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -247,7 +247,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -285,7 +285,7 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -351,7 +351,7 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -386,7 +386,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -428,7 +428,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -459,7 +459,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -508,7 +508,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }