From 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 18:25:33 +0800 Subject: [PATCH] add node init msg, alloc msgq on success. --- src/socket.h | 2 utest/api_test.cpp | 2 box/center.cpp | 75 ++++++++++++++---- src/socket.cpp | 42 +++++++++ src/defs.h | 2 src/topic_node.h | 7 + box/center.h | 3 src/topic_node.cpp | 63 ++++++++++++--- src/shm_msg_queue.cpp | 2 9 files changed, 159 insertions(+), 39 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0952ca7..aa6f285 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -103,7 +103,22 @@ // center name, no relative to shm. const std::string &id() const { return id_; } + void OnNodeInit(const int64_t msg) + { + MQId ssn = msg; + auto UpdateRegInfo = [&](Node &node) { + for (int i = 0; i < 10; ++i) { + node->addrs_.insert(ssn + i); + } + node->state_.timestamp_ = NowSec() - offline_time_; + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + }; + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[ssn] = node; + printf("new node ssn (%ld) init\n", ssn); + } MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) { if (msg.proc().proc_id() != head.proc_id()) { @@ -132,17 +147,19 @@ Node node(new NodeInfo); UpdateRegInfo(node); nodes_[ssn] = node; + } + printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); - printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); - auto old = online_node_addr_map_.find(head.proc_id()); - if (old != online_node_addr_map_.end()) { // old session - auto &old_ssn = old->second; + auto old = online_node_addr_map_.find(head.proc_id()); + if (old != online_node_addr_map_.end()) { // old session + auto &old_ssn = old->second; + if (old_ssn != ssn) { nodes_[old_ssn]->state_.PutOffline(offline_time_); printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); old_ssn = ssn; - } else { - online_node_addr_map_.emplace(head.proc_id(), ssn); } + } else { + online_node_addr_map_.emplace(head.proc_id(), ssn); } return MakeReply(eSuccess); } catch (...) { @@ -446,16 +463,24 @@ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; -bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) +auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { - return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); - auto remote = head.route(0).mq_id(); - socket.Send(remote, reply_head, rep_body); - }; + return [&](auto &&rep_body) { + auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); + auto remote = head.route(0).mq_id(); + socket.Send(remote, reply_head, rep_body); }; +} + +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +{ + auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { + auto ¢er = *center_ptr; + center->OnNodeInit(msg.Offset()); + }; + auto Nothing = [](ShmSocket &socket) {}; + + BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; @@ -475,6 +500,7 @@ default: return false; } }; + BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { @@ -515,7 +541,6 @@ } }; - BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); return true; @@ -533,7 +558,12 @@ bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; + return true; +} +bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) +{ + Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; return true; } @@ -541,10 +571,13 @@ { auto gc = [&](const MQId id) { auto r = ShmSocket::Remove(shm, id); - printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed")); + if (r) { + printf("remove mq %ld ok\n", id); + } }; - AddCenter("#bhome_center", gc); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2); + AddCenter(center_ptr); for (auto &kv : Centers()) { auto &info = kv.second; @@ -556,7 +589,11 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_]->Start(info.handler_, info.idle_); + if (info.handler_) { + sockets_[info.name_]->Start(info.handler_, info.idle_); + } else { + sockets_[info.name_]->Start(info.raw_handler_, info.idle_); + } } return true; diff --git a/box/center.h b/box/center.h index ab8b15f..4d71bc9 100644 --- a/box/center.h +++ b/box/center.h @@ -29,8 +29,10 @@ public: typedef Socket::PartialRecvCB MsgHandler; + typedef Socket::RawRecvCB MsgIHandler; typedef Socket::IdleCB IdleHandler; static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); + static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); BHCenter(Socket::Shm &shm); ~BHCenter() { Stop(); } @@ -41,6 +43,7 @@ struct CenterInfo { std::string name_; MsgHandler handler_; + MsgIHandler raw_handler_; IdleHandler idle_; MQId mqid_; int mq_len_ = 0; diff --git a/src/defs.h b/src/defs.h index 43375bf..a95c81f 100644 --- a/src/defs.h +++ b/src/defs.h @@ -23,9 +23,11 @@ typedef uint64_t MQId; +const MQId kBHNodeInit = 10; const MQId kBHTopicCenter = 100; const MQId kBHTopicBus = 101; const MQId kBHUniCenter = 102; +inline const MQId BHInitAddress() { return kBHNodeInit; } inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } inline const MQId BHTopicBusAddress() { return kBHTopicBus; } inline const MQId BHUniCenterAddress() { return kBHUniCenter; } diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index cd8cd66..38c5f1c 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -34,7 +34,7 @@ ShmMsgQueue::MQId ShmMsgQueue::NewId() { static auto &id = GetData(); - return ++id; + return (++id) * 10; } // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : diff --git a/src/socket.cpp b/src/socket.cpp index 2127260..9580529 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -40,6 +40,44 @@ Stop(); } +bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker) +{ + auto ioProc = [this, onData, onIdle]() { + auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; + auto DoRecv = [=] { + // do not recv if no cb is set. + if (!onData) { + return false; + } + auto onMsg = [&](MsgI &imsg) { + DEFER1(imsg.Release()); + onData(*this, imsg); + }; + MsgI imsg; + return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; + }; + + try { + bool more_to_send = DoSend(); + bool more_to_recv = DoRecv(); + if (onIdle) { onIdle(*this); } + if (!more_to_send && !more_to_recv) { + robust::QuickSleep(); + } + } catch (...) { + } + }; + + std::lock_guard<std::mutex> lock(mutex_); + StopNoLock(); + + run_.store(true); + for (int i = 0; i < nworker; ++i) { + workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); + } + return true; +} + bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) { auto ioProc = [this, onData, onIdle]() { @@ -74,9 +112,7 @@ bool more_to_recv = DoRecv(); if (onIdle) { onIdle(*this); } if (!more_to_send && !more_to_recv) { - std::this_thread::yield(); - using namespace std::chrono_literals; - std::this_thread::sleep_for(10000ns); + robust::QuickSleep(); } } catch (...) { } diff --git a/src/socket.h b/src/socket.h index a5dd72c..bd85fec 100644 --- a/src/socket.h +++ b/src/socket.h @@ -42,6 +42,7 @@ public: typedef ShmMsgQueue::MQId MQId; typedef bhome_shm::SharedMemory Shm; + typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; @@ -53,6 +54,7 @@ bool Remove() { return Remove(shm(), id()); } MQId id() const { return mq().Id(); } // start recv. + bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index d274c4b..f629597 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -37,30 +37,52 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) + shm_(shm), state_(eStateUnregistered) { - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_[i].reset(new ShmSocket(shm_, kMqLen)); - } - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - // for (auto &p : sockets_) { - // p->Start(default_ignore_msg); - // } + Init(); } TopicNode::~TopicNode() { + printf("~TopicNode()\n"); Stop(); - SockNode().Stop(); - if (state() == eStateUnregistered) { - for (auto &p : sockets_) { p->Remove(); } +} + +bool TopicNode::Init() +{ + std::lock_guard<std::mutex> lk(mutex_); + + if (Valid()) { + return true; } + + if (ssn_id_ == 0) { + ssn_id_ = ShmMsgQueue::NewId(); + } + printf("Node Init, id %ld \n", ssn_id_); + MsgI msg; + msg.OffsetRef() = ssn_id_; + if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { + sockets_.resize(eSockEnd); + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); + } + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); + return true; + } + return false; } void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { + std::lock_guard<std::mutex> lk(mutex_); + + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return; + } if (nworker < 1) { nworker = 1; } else if (nworker > 16) { @@ -73,11 +95,18 @@ } void TopicNode::Stop() { + printf("Node Stopping\n"); for (auto &p : sockets_) { p->Stop(); } + printf("Node Stopped\n"); } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!Init()) { + SetLastError(eError, "BHome Node Not Inited."); + return false; + } + info_ = proc; auto &sock = SockNode(); @@ -123,6 +152,11 @@ } bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsOnline()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } + info_.Clear(); state_cas(eStateOnline, eStateOffline); @@ -404,8 +438,6 @@ reply_head.mutable_proc_id()->swap(out_proc_id); return true; } - } else { - SetLastError(eNotFound, "remote not found."); } } catch (...) { SetLastError(eError, __func__ + std::string(" internal errer.")); @@ -448,6 +480,7 @@ return true; } } + SetLastError(eNotFound, "remote not found."); return false; } diff --git a/src/topic_node.h b/src/topic_node.h index 3c90e5b..afce4fc 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -22,6 +22,7 @@ #include "socket.h" #include <atomic> #include <memory> +#include <mutex> #include <vector> using namespace bhome_shm; @@ -137,7 +138,11 @@ void state(const State st) { state_.store(st); } void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } State state() const { return state_.load(); } - bool IsOnline() const { return state() == eStateOnline; } + bool IsOnline() { return Init() && state() == eStateOnline; } + bool Init(); + bool Valid() const { return !sockets_.empty(); } + std::mutex mutex_; + MQId ssn_id_ = 0; std::atomic<State> state_; TopicQueryCache topic_query_cache_; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index cf7baf9..38483eb 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -118,6 +118,8 @@ printf("maxsec: %ld\n", CountSeconds(max_time)); + // BHCleanup(); + // return; bool reg = false; for (int i = 0; i < 3 && !reg; ++i) { ProcInfo proc; -- Gitblit v1.8.0