liuxiaolong
2021-04-21 d1f89c1eb52a08b97d5be5dd991c5991b6f0bf93
src/topic_node.cpp
@@ -32,10 +32,12 @@
   std::string msg_id;
};
const int kMqLen = 700;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_client_(shm), sock_server_(shm), sock_sub_(shm), registered_(false), registered_ever_(false)
    shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
{
   // recv msgs to avoid memory leak.
   auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
@@ -49,7 +51,7 @@
{
   Stop();
   SockNode().Stop();
   if (!registered_ever_) {
   if (state() == eStateUnregistered) {
      SockNode().Remove();
      SockClient().Remove();
      SockServer().Remove();
@@ -98,9 +100,10 @@
                msg.ParseBody(rbody) &&
                IsSuccess(rbody.errmsg().errcode());
      if (ok) {
         registered_ever_.store(true);
         state(eStateOnline);
      } else {
         state_cas(eStateOnline, eStateOffline);
      }
      registered_.store(ok);
   };
   if (timeout_ms == 0) {
@@ -117,13 +120,13 @@
      if (r) {
         CheckResult(reply, reply_head, reply_body);
      }
      return IsRegistered();
      return IsOnline();
   }
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -156,7 +159,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -223,7 +226,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -247,7 +250,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -285,7 +288,7 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -351,7 +354,7 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -386,7 +389,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -428,7 +431,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -459,7 +462,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }
@@ -508,7 +511,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
   if (!IsRegistered()) {
   if (!IsOnline()) {
      SetLastError(eNotRegistered, "Not Registered.");
      return false;
   }