From 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 18:25:33 +0800 Subject: [PATCH] add node init msg, alloc msgq on success. --- src/topic_node.cpp | 63 ++++++++++++++++++++++++------- 1 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index d274c4b..f629597 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -37,30 +37,52 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) + shm_(shm), state_(eStateUnregistered) { - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_[i].reset(new ShmSocket(shm_, kMqLen)); - } - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - // for (auto &p : sockets_) { - // p->Start(default_ignore_msg); - // } + Init(); } TopicNode::~TopicNode() { + printf("~TopicNode()\n"); Stop(); - SockNode().Stop(); - if (state() == eStateUnregistered) { - for (auto &p : sockets_) { p->Remove(); } +} + +bool TopicNode::Init() +{ + std::lock_guard<std::mutex> lk(mutex_); + + if (Valid()) { + return true; } + + if (ssn_id_ == 0) { + ssn_id_ = ShmMsgQueue::NewId(); + } + printf("Node Init, id %ld \n", ssn_id_); + MsgI msg; + msg.OffsetRef() = ssn_id_; + if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { + sockets_.resize(eSockEnd); + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); + } + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); + return true; + } + return false; } void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { + std::lock_guard<std::mutex> lk(mutex_); + + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return; + } if (nworker < 1) { nworker = 1; } else if (nworker > 16) { @@ -73,11 +95,18 @@ } void TopicNode::Stop() { + printf("Node Stopping\n"); for (auto &p : sockets_) { p->Stop(); } + printf("Node Stopped\n"); } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return false; + } + info_ = proc; auto &sock = SockNode(); @@ -123,6 +152,11 @@ } bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsOnline()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } + info_.Clear(); state_cas(eStateOnline, eStateOffline); @@ -404,8 +438,6 @@ reply_head.mutable_proc_id()->swap(out_proc_id); return true; } - } else { - SetLastError(eNotFound, "remote not found."); } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); @@ -448,6 +480,7 @@ return true; } } + SetLastError(eNotFound, "remote not found."); return false; } -- Gitblit v1.8.0