From 0117d5f8ff386075b0c4cbec0cbe460fe3cfa680 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 06 五月 2021 18:57:25 +0800 Subject: [PATCH] add logging, use boost.log. --- src/topic_node.cpp | 65 ++++++++++++++++++++++++-------- 1 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 1131816..c31cfc3 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -37,30 +37,52 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) + shm_(shm), state_(eStateUnregistered) { - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_[i].reset(new ShmSocket(shm_, kMqLen)); - } - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - // for (auto &p : sockets_) { - // p->Start(default_ignore_msg); - // } + Init(); } TopicNode::~TopicNode() { + LOG_DEBUG() << "~TopicNode()"; Stop(); - SockNode().Stop(); - if (state() == eStateUnregistered) { - for (auto &p : sockets_) { p->Remove(); } +} + +bool TopicNode::Init() +{ + std::lock_guard<std::mutex> lk(mutex_); + + if (Valid()) { + return true; } + + if (ssn_id_ == 0) { + ssn_id_ = ShmMsgQueue::NewId(); + } + LOG_DEBUG() << "Node Init, id " << ssn_id_; + MsgI msg; + msg.OffsetRef() = ssn_id_; + if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { + sockets_.resize(eSockEnd); + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); + } + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); + return true; + } + return false; } void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { + std::lock_guard<std::mutex> lk(mutex_); + + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return; + } if (nworker < 1) { nworker = 1; } else if (nworker > 16) { @@ -73,11 +95,18 @@ } void TopicNode::Stop() { + LOG_DEBUG() << "Node Stopping"; for (auto &p : sockets_) { p->Stop(); } + LOG_INFO() << "Node Stopped"; } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return false; + } + info_ = proc; auto &sock = SockNode(); @@ -123,6 +152,11 @@ } bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsOnline()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } + info_.Clear(); state_cas(eStateOnline, eStateOffline); @@ -389,7 +423,7 @@ BHAddress addr; if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); + LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); AddRoute(head, sock.id()); head.set_topic(request.topic()); @@ -404,8 +438,6 @@ reply_head.mutable_proc_id()->swap(out_proc_id); return true; } - } else { - SetLastError(eNotFound, "remote not found."); } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); @@ -448,6 +480,7 @@ return true; } } + SetLastError(eNotFound, "remote not found."); return false; } -- Gitblit v1.8.0