From 9eb924f64a56918536d310dd89698aa7e148c727 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 06 五月 2021 09:41:26 +0800 Subject: [PATCH] refactor atomic queue. --- src/topic_node.cpp | 65 ++++++++++++++++++++++++-------- 1 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 1131816..f629597 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -37,30 +37,52 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) + shm_(shm), state_(eStateUnregistered) { - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_[i].reset(new ShmSocket(shm_, kMqLen)); - } - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - // for (auto &p : sockets_) { - // p->Start(default_ignore_msg); - // } + Init(); } TopicNode::~TopicNode() { + printf("~TopicNode()\n"); Stop(); - SockNode().Stop(); - if (state() == eStateUnregistered) { - for (auto &p : sockets_) { p->Remove(); } +} + +bool TopicNode::Init() +{ + std::lock_guard<std::mutex> lk(mutex_); + + if (Valid()) { + return true; } + + if (ssn_id_ == 0) { + ssn_id_ = ShmMsgQueue::NewId(); + } + printf("Node Init, id %ld \n", ssn_id_); + MsgI msg; + msg.OffsetRef() = ssn_id_; + if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { + sockets_.resize(eSockEnd); + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); + } + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); + return true; + } + return false; } void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { + std::lock_guard<std::mutex> lk(mutex_); + + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return; + } if (nworker < 1) { nworker = 1; } else if (nworker > 16) { @@ -73,11 +95,18 @@ } void TopicNode::Stop() { + printf("Node Stopping\n"); for (auto &p : sockets_) { p->Stop(); } + printf("Node Stopped\n"); } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return false; + } + info_ = proc; auto &sock = SockNode(); @@ -123,6 +152,11 @@ } bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsOnline()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } + info_.Clear(); state_cas(eStateOnline, eStateOffline); @@ -389,7 +423,7 @@ BHAddress addr; if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { - printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); + // printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); AddRoute(head, sock.id()); head.set_topic(request.topic()); @@ -404,8 +438,6 @@ reply_head.mutable_proc_id()->swap(out_proc_id); return true; } - } else { - SetLastError(eNotFound, "remote not found."); } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); @@ -448,6 +480,7 @@ return true; } } + SetLastError(eNotFound, "remote not found."); return false; } -- Gitblit v1.8.0