From 36e6a35a886252516f168b90f7a9a7c1c5177312 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期六, 08 五月 2021 15:57:01 +0800 Subject: [PATCH] center alloc node queue; node just find them. --- src/socket.h | 15 +++-- box/center.cpp | 31 ++++++++- src/shm_msg_queue.h | 1 src/socket.cpp | 5 + src/topic_node.h | 2 src/topic_node.cpp | 56 +++++++++++------- src/shm_msg_queue.cpp | 12 ++- src/defs.cpp | 2 8 files changed, 85 insertions(+), 39 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index c57d34d..d6ac804 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -102,22 +102,43 @@ // center name, no relative to shm. const std::string &id() const { return id_; } - void OnNodeInit(const int64_t msg) + void OnNodeInit(SharedMemory &shm, const int64_t msg) { MQId ssn = msg; + if (nodes_.find(ssn) != nodes_.end()) { + return; // ignore in exists. + } + auto UpdateRegInfo = [&](Node &node) { for (int i = 0; i < 10; ++i) { node->addrs_.insert(ssn + i); } node->state_.timestamp_ = NowSec() - offline_time_; node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + + // create sockets. + try { + auto CreateSocket = [](SharedMemory &shm, const MQId id) { + ShmSocket tmp(shm, true, id, 16); + }; + // alloc(-1), node, server, sub, request, + for (int i = -1; i < 4; ++i) { + CreateSocket(shm, ssn + i); + node->addrs_.insert(ssn + i); + } + return true; + } catch (...) { + return false; + } }; Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; - LOG_INFO() << "new node ssn (" << ssn << ") init"; + if (UpdateRegInfo(node)) { + nodes_[ssn] = node; + LOG_INFO() << "new node ssn (" << ssn << ") init"; + } } + MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) { if (msg.proc().proc_id() != head.proc_id()) { @@ -475,7 +496,7 @@ { auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { auto ¢er = *center_ptr; - center->OnNodeInit(msg.Offset()); + center->OnNodeInit(socket.shm(), msg.Offset()); }; auto Nothing = [](ShmSocket &socket) {}; diff --git a/src/defs.cpp b/src/defs.cpp index cc6f23b..6d688b2 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -50,7 +50,7 @@ MsgI::BindShm(shm); typedef std::atomic<MQId> IdSrc; IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); - return ShmMsgQueue::SetData(*psrc); + return psrc && ShmMsgQueue::SetData(*psrc); } void SetLastError(const int ec, const std::string &msg) diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index 38c5f1c..17558de 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -36,21 +36,23 @@ static auto &id = GetData(); return (++id) * 10; } -// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 + ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : id_(id), queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) { } -ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : - id_(NewId()), - queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) +ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : + id_(id), + queue_(segment, create_or_else_find, MsgQIdToName(id_)) { if (!queue_.IsOk()) { - throw("error create msgq " + std::to_string(id_)); + throw("error create/find msgq " + std::to_string(id_)); } } +ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : + ShmMsgQueue(NewId(), true, segment, len) {} ShmMsgQueue::~ShmMsgQueue() {} diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index aff931c..f496f0f 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -38,6 +38,7 @@ static MQId NewId(); ShmMsgQueue(const MQId id, ShmType &segment, const int len); + ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); static bool Remove(SharedMemory &shm, const MQId id); diff --git a/src/socket.cpp b/src/socket.cpp index 9580529..6231579 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -29,6 +29,11 @@ { Start(); } +ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : + run_(false), mq_(id, create_or_else_find, shm, len) +{ + Start(); +} ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : run_(false), mq_(shm, len) { diff --git a/src/socket.h b/src/socket.h index 08a4b0a..981677f 100644 --- a/src/socket.h +++ b/src/socket.h @@ -48,6 +48,7 @@ typedef std::function<void(ShmSocket &sock)> IdleCB; ShmSocket(Shm &shm, const MQId id, const int len); + ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } @@ -142,6 +143,7 @@ template <class... Rest> bool SendImpl(const MQId remote, Rest &&...rest) { + // TODO send alloc request, and pack later, higher bit means alloc? send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); return true; } @@ -151,14 +153,15 @@ std::atomic<bool> run_; Queue mq_; - class AsyncCBs + template <class Key> + class CallbackRecords { - std::unordered_map<std::string, RecvCB> store_; + std::unordered_map<Key, RecvCB> store_; public: bool empty() const { return store_.empty(); } - bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } - bool Pick(const std::string &id, RecvCB &cb) + bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } + bool Pick(const Key &id, RecvCB &cb) { auto pos = store_.find(id); if (pos != store_.end()) { @@ -171,9 +174,9 @@ } }; - Synced<AsyncCBs> per_msg_cbs_; + Synced<CallbackRecords<std::string>> per_msg_cbs_; + SendQ send_buffer_; - // Synced<SendQ> send_buffer_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 3883062..d8d6a42 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -23,6 +23,9 @@ using namespace std::chrono; using namespace std::chrono_literals; +const char *const kErrMsgNotInit = "BHome node NOT initialized."; +const char *const kErrMsgNotRegistered = "BHome node NOT registered."; + namespace { inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } @@ -63,14 +66,25 @@ MsgI msg; msg.OffsetRef() = ssn_id_; if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { - sockets_.resize(eSockEnd); - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); + + auto end_time = steady_clock::now() + 3s; + do { + try { + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); + } + break; + } catch (...) { + sockets_.clear(); + std::this_thread::sleep_for(100ms); + } + } while (steady_clock::now() < end_time); + if (!sockets_.empty()) { + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); + return true; } - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - return true; } return false; } @@ -78,7 +92,7 @@ void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { if (!Init()) { - SetLastError(eError, "BHome Node Not Inited."); + SetLastError(eError, kErrMsgNotInit); return; } if (nworker < 1) { @@ -101,7 +115,7 @@ bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { if (!Init()) { - SetLastError(eError, "BHome Node Not Inited."); + SetLastError(eError, kErrMsgNotInit); return false; } @@ -151,7 +165,7 @@ bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -190,7 +204,7 @@ bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -223,7 +237,7 @@ bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } auto &sock = SockNode(); @@ -242,7 +256,7 @@ bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -309,7 +323,7 @@ bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -333,7 +347,7 @@ bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -371,7 +385,7 @@ bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -412,7 +426,7 @@ bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -463,7 +477,7 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -487,7 +501,7 @@ bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -518,7 +532,7 @@ bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } @@ -567,7 +581,7 @@ bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) { if (!IsOnline()) { - SetLastError(eNotRegistered, "Not Registered."); + SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } diff --git a/src/topic_node.h b/src/topic_node.h index afce4fc..338a6e3 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -122,7 +122,7 @@ eSockSub, eSockEnd, }; - std::vector<std::unique_ptr<ShmSocket>> sockets_; + std::vector<std::shared_ptr<ShmSocket>> sockets_; ShmSocket &SockNode() { return *sockets_[eSockNode]; } ShmSocket &SockPub() { return *sockets_[eSockPub]; } -- Gitblit v1.8.0