From 101b5cf85397ef9350aaedd12cfcf9fd3d07a565 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 20 五月 2021 12:41:51 +0800 Subject: [PATCH] refactor node center. --- box/center.cpp | 690 --------------------------- box/node_center.h | 195 +++++++ box/node_center.cpp | 563 ++++++++++++++++++++++ 3 files changed, 760 insertions(+), 688 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index f140289..0f36719 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -16,12 +16,8 @@ * ===================================================================================== */ #include "center.h" -#include "bh_util.h" -#include "defs.h" -#include "log.h" -#include "shm.h" +#include "node_center.h" #include <chrono> -#include <unordered_map> using namespace std::chrono; using namespace std::chrono_literals; @@ -33,689 +29,7 @@ namespace { -typedef std::string ProcId; -typedef size_t ProcIndex; // max local procs. -const int kMaxProcs = 65536; - -// record all procs ever registered, always grow, never remove. -// mainly for node to request msg allocation. -// use index instead of MQId to save some bits. -class ProcRecords -{ -public: - struct ProcRec { - ProcId proc_; - MQId ssn_ = 0; - }; - - ProcRecords() { procs_.reserve(kMaxProcs); } - - ProcIndex Put(const ProcId &proc_id, const MQId ssn) - { - if (procs_.size() >= kMaxProcs) { - return -1; - } - auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); - int index = pos_isnew.first->second; - if (pos_isnew.second) { - procs_.emplace_back(ProcRec{proc_id, ssn}); - } else { // update ssn - procs_[index].ssn_ = ssn; - } - return index; - } - const ProcRec &Get(const ProcIndex index) const - { - static ProcRec empty_rec; - return (index < procs_.size()) ? procs_[index] : empty_rec; - } - -private: - std::unordered_map<ProcId, size_t> proc_index_; - std::vector<ProcRec> procs_; -}; - -class MsgRecords -{ - typedef int64_t MsgId; - typedef int64_t Offset; - -public: - void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } - void FreeMsg(MsgId id) - { - auto pos = msgs_.find(id); - if (pos != msgs_.end()) { - ShmMsg(pos->second).Free(); - msgs_.erase(pos); - } else { - LOG_TRACE() << "ignore late free request."; - } - } - void AutoRemove() - { - auto now = NowSec(); - if (now < time_to_clean_) { - return; - } - // LOG_FUNCTION; - time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); - int64_t n = 0; - auto it = msgs_.begin(); - while (it != msgs_.end() && --limit > 0) { - ShmMsg msg(it->second); - auto Free = [&]() { - msg.Free(); - it = msgs_.erase(it); - ++n; - }; - int n = now - msg.timestamp(); - if (n < 10) { - ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); - } - } - if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; - } - } - size_t size() const { return msgs_.size(); } - void DebugPrint() const - { - LOG_DEBUG() << "msgs : " << size(); - int i = 0; - int total_count = 0; - for (auto &kv : msgs_) { - MsgI msg(kv.second); - total_count += msg.Count(); - LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); - } - LOG_DEBUG() << "total count: " << total_count; - } - -private: - std::unordered_map<MsgId, Offset> msgs_; - int64_t time_to_clean_ = 0; -}; - //TODO check proc_id -class NodeCenter -{ -public: - typedef MQId Address; - typedef bhome_msg::ProcInfo ProcInfo; - typedef std::function<void(Address const)> Cleaner; - -private: - enum { - kStateInvalid, - kStateNormal, - kStateOffline, - kStateKillme, - }; - - struct ProcState { - int64_t timestamp_ = 0; - uint32_t flag_ = 0; // reserved - void PutOffline(const int64_t offline_time) - { - timestamp_ = NowSec() - offline_time; - flag_ = kStateOffline; - } - void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) - { - auto diff = now - timestamp_; - LOG_DEBUG() << "state " << this << " diff: " << diff; - if (diff < offline_time) { - flag_ = kStateNormal; - } else if (diff < kill_time) { - flag_ = kStateOffline; - } else { - flag_ = kStateKillme; - } - } - }; - typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; - - struct NodeInfo { - ProcState state_; // state - std::map<MQId, int64_t> addrs_; // registered mqs - ProcInfo proc_; // - AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics - }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; - - struct TopicDest { - MQId mq_id_; - int64_t mq_abs_addr_; - WeakNode weak_node_; - bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } - }; - inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } - inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } - inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } - - NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : - id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} - -public: - typedef std::set<TopicDest> Clients; - - NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) : - NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {} - - // center name, no relative to shm. - const std::string &id() const { return id_; } - int64_t OnNodeInit(ShmSocket &socket, const int64_t val) - { - LOG_FUNCTION; - SharedMemory &shm = socket.shm(); - MQId ssn = (val >> 4) & MaskBits(56); - int reply = EncodeCmd(eCmdNodeInitReply); - - if (nodes_.find(ssn) != nodes_.end()) { - return reply; // ignore if exists. - } - - auto UpdateRegInfo = [&](Node &node) { - node->state_.timestamp_ = NowSec() - offline_time_; - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - - // create sockets. - try { - ShmSocket tmp(shm, true, ssn, 16); - node->addrs_.emplace(ssn, tmp.AbsAddr()); - return true; - } catch (...) { - return false; - } - }; - - auto PrepareProcInit = [&](Node &node) { - bool r = false; - ShmMsg init_msg; - DEFER1(init_msg.Release()); - MsgProcInit body; - auto head = InitMsgHead(GetType(body), id(), ssn); - return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) && - init_msg.Fill(ShmMsg::Serialize(head, body)) && - SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); - }; - - Node node(new NodeInfo); - if (UpdateRegInfo(node) && PrepareProcInit(node)) { - reply |= (node->addrs_[ssn] << 4); - nodes_[ssn] = node; - LOG_INFO() << "new node ssn (" << ssn << ") init"; - } else { - ShmSocket::Remove(shm, ssn); - } - return reply; - } - void RecordMsg(const MsgI &msg) - { - msg.reset_managed(true); - msgs_.RecordMsg(msg); - } - - bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg) - { - RecordMsg(msg); - auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; - return socket.Send(dest, reply, onExpireFree); - } - bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg) - { - RecordMsg(msg); - return socket.Send(dest, msg); - } - - void OnAlloc(ShmSocket &socket, const int64_t val) - { - // LOG_FUNCTION; - // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag - int64_t msg_id = (val >> 4) & MaskBits(28); - int proc_index = (val >> 32) & MaskBits(16); - int socket_index = ((val) >> 48) & MaskBits(4); - auto proc_rec(procs_.Get(proc_index)); - if (proc_rec.proc_.empty()) { - return; - } - - MQInfo dest = {proc_rec.ssn_ + socket_index, 0}; - auto FindMq = [&]() { - auto pos = nodes_.find(proc_rec.ssn_); - if (pos != nodes_.end()) { - for (auto &&mq : pos->second->addrs_) { - if (mq.first == dest.id_) { - dest.offset_ = mq.second; - return true; - } - } - } - return false; - }; - if (!FindMq()) { return; } - - auto size = GetAllocSize((val >> 52) & MaskBits(8)); - MsgI new_msg; - if (new_msg.Make(size)) { - // 31bit proc index, 28bit id, ,4bit cmd+flag - int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); - SendAllocReply(socket, dest, reply, new_msg); - } else { - int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. - socket.Send(dest, reply); - } - } - - void OnFree(ShmSocket &socket, const int64_t val) - { - int64_t msg_id = (val >> 4) & MaskBits(31); - msgs_.FreeMsg(msg_id); - } - - bool OnCommand(ShmSocket &socket, const int64_t val) - { - assert(IsCmd(val)); - int cmd = DecodeCmd(val); - switch (cmd) { - case eCmdAllocRequest0: OnAlloc(socket, val); break; - case eCmdFree: OnFree(socket, val); break; - default: return false; - } - return true; - } - - MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) - { - LOG_DEBUG() << "center got proc init."; - auto pos = nodes_.find(head.ssn_id()); - if (pos == nodes_.end()) { - return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised"); - } - auto index = procs_.Put(head.proc_id(), head.ssn_id()); - auto reply(MakeReply<MsgProcInitReply>(eSuccess)); - reply.set_proc_index(index); - - auto &node = pos->second; - try { - for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); - node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); - auto addr = reply.add_extra_mqs(); - addr->set_mq_id(tmp.id()); - addr->set_abs_addr(tmp.AbsAddr()); - } - return reply; - } catch (...) { - LOG_ERROR() << "proc init create mq error"; - return MakeReply<MsgProcInitReply>(eError, "Create mq failed."); - } - } - - MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) - { - if (msg.proc().proc_id() != head.proc_id()) { - return MakeReply(eInvalidInput, "invalid proc id."); - } - - try { - MQId ssn = head.ssn_id(); - // when node restart, ssn will change, - // and old node will be removed after timeout. - auto UpdateRegInfo = [&](Node &node) { - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - - auto pos = nodes_.find(ssn); - if (pos == nodes_.end()) { - return MakeReply(eInvalidInput, "invalid session."); - } - - // update proc info - Node &node = pos->second; - UpdateRegInfo(node); - LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; - - auto old = online_node_addr_map_.find(head.proc_id()); - if (old != online_node_addr_map_.end()) { // old session - auto &old_ssn = old->second; - if (old_ssn != ssn) { - nodes_[old_ssn]->state_.PutOffline(offline_time_); - LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; - old_ssn = ssn; - } - } else { - online_node_addr_map_.emplace(head.proc_id(), ssn); - } - return MakeReply(eSuccess); - } catch (...) { - return MakeReply(eError, "register node error."); - } - } - - template <class Reply, class Func> - Reply HandleMsg(const BHMsgHead &head, Func const &op) - { - try { - auto pos = nodes_.find(head.ssn_id()); - if (pos == nodes_.end()) { - return MakeReply<Reply>(eNotRegistered, "Node is not registered."); - } else { - auto &node = pos->second; - if (!MatchAddr(node->addrs_, SrcAddr(head))) { - return MakeReply<Reply>(eAddressNotMatch, "Node address error."); - } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { - return op(node); - } else if (!Valid(*node)) { - return MakeReply<Reply>(eNoRespond, "Node is not alive."); - } else { - return op(node); - } - } - } catch (...) { - //TODO error log - return MakeReply<Reply>(eError, "internal error."); - } - } - template <class Func> - inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) - { - return HandleMsg<MsgCommonReply, Func>(head, op); - } - - MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - NodeInfo &ni = *node; - ni.state_.PutOffline(offline_time_); - return MakeReply(eSuccess); - }); - } - - MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->services_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - service_map_[topic].insert(dest); - } - LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n"; - for (auto &topic : topics) { - LOG_DEBUG() << "\t" << topic; - } - return MakeReply(eSuccess); - }); - } - - MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) - { - return HandleMsg(head, [&](Node node) { - NodeInfo &ni = *node; - ni.state_.timestamp_ = head.timestamp(); - ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); - - auto &info = msg.proc(); - if (!info.public_info().empty()) { - ni.proc_.set_public_info(info.public_info()); - } - if (!info.private_info().empty()) { - ni.proc_.set_private_info(info.private_info()); - } - return MakeReply(eSuccess); - }); - } - MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req) - { - typedef MsgQueryProcReply Reply; - auto query = [&](Node self) -> Reply { - auto Add1 = [](Reply &reply, Node node) { - auto info = reply.add_proc_list(); - *info->mutable_proc() = node->proc_; - info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { - for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); - } - } - }; - - if (!req.proc_id().empty()) { - auto pos = online_node_addr_map_.find(req.proc_id()); - if (pos == online_node_addr_map_.end()) { - return MakeReply<Reply>(eNotFound, "proc not found."); - } else { - auto node_pos = nodes_.find(pos->second); - if (node_pos == nodes_.end()) { - return MakeReply<Reply>(eNotFound, "proc node not found."); - } else { - auto reply = MakeReply<Reply>(eSuccess); - Add1(reply, node_pos->second); - return reply; - } - } - } else { - Reply reply(MakeReply<Reply>(eSuccess)); - for (auto &kv : nodes_) { - Add1(reply, kv.second); - } - return reply; - } - }; - - return HandleMsg<Reply>(head, query); - } - - MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) - { - typedef MsgQueryTopicReply Reply; - - auto query = [&](Node self) -> Reply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_id_); - node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); - } - } - return reply; - } else { - return MakeReply<Reply>(eNotFound, "topic server not found."); - } - }; - - return HandleMsg<Reply>(head, query); - } - - MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } - return MakeReply(eSuccess); - }); - } - MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && - pos->second.erase(dest) != 0 && - pos->second.empty()) { - subscribe_map_.erase(pos); - } - }; - - if (pos != node->subscriptions_.end()) { - const TopicDest &dest = {src, SrcAbsAddr(head), node}; - auto &topics = msg.topics().topic_list(); - // clear node sub records; - for (auto &topic : topics) { - pos->second.erase(topic); - RemoveSubTopicDestRecord(topic, dest); - } - if (pos->second.empty()) { - node->subscriptions_.erase(pos); - } - } - return MakeReply(eSuccess); - }); - } - - Clients DoFindClients(const std::string &topic) - { - Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); - } - } - } - }; - Find1(topic); - - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - } - bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) - { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; - return MakeReply(eSuccess); - }).Swap(&reply); - return ret; - } - - void OnTimer() - { - CheckNodes(); - msgs_.AutoRemove(); - } - -private: - void CheckNodes() - { - auto now = NowSec(); - if (now <= last_check_time_) { return; } - last_check_time_ = now; - - auto it = nodes_.begin(); - while (it != nodes_.end()) { - auto &cli = *it->second; - cli.state_.UpdateState(now, offline_time_, kill_time_); - if (cli.state_.flag_ == kStateKillme) { - RemoveNode(it->second); - it = nodes_.erase(it); - } else { - ++it; - } - } - msgs_.DebugPrint(); - } - bool CanHeartbeat(const NodeInfo &node) - { - return Valid(node) || node.state_.flag_ == kStateOffline; - } - bool Valid(const NodeInfo &node) - { - return node.state_.flag_ == kStateNormal; - } - bool Valid(const WeakNode &weak) - { - auto node = weak.lock(); - return node && Valid(*node); - } - void RemoveNode(Node &node) - { - auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { - for (auto &addr_topics : node_rec) { - TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used. - for (auto &topic : addr_topics.second) { - auto pos = rec_map.find(topic); - if (pos != rec_map.end()) { - pos->second.erase(dest); - if (pos->second.empty()) { - rec_map.erase(pos); - } - } - } - } - }; - EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); - - // remove online record. - auto pos = online_node_addr_map_.find(node->proc_.proc_id()); - if (pos != online_node_addr_map_.end()) { - if (node->addrs_.find(pos->second) != node->addrs_.end()) { - online_node_addr_map_.erase(pos); - } - } - - for (auto &addr : node->addrs_) { - cleaner_(addr.first); - } - - node->addrs_.clear(); - } - std::string id_; // center proc id; - - std::unordered_map<Topic, Clients> service_map_; - std::unordered_map<Topic, Clients> subscribe_map_; - std::unordered_map<Address, Node> nodes_; - std::unordered_map<ProcId, Address> online_node_addr_map_; - ProcRecords procs_; // To get a short index for msg alloc. - MsgRecords msgs_; // record all msgs alloced. - - Cleaner cleaner_; // remove mqs. - int64_t offline_time_; - int64_t kill_time_; - int64_t last_check_time_; -}; template <class Body, class OnMsg, class Replyer> inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) @@ -863,7 +177,7 @@ } }; - auto nsec = seconds(NodeTimeoutSec()); + auto nsec = NodeTimeoutSec(); auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. AddCenter(center_ptr); diff --git a/box/node_center.cpp b/box/node_center.cpp new file mode 100644 index 0000000..b970d44 --- /dev/null +++ b/box/node_center.cpp @@ -0,0 +1,563 @@ +/* + * ===================================================================================== + * + * Filename: node_center.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�20鏃� 11鏃�32鍒�55绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "node_center.h" +#include "log.h" + +ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) +{ + if (procs_.size() >= kMaxProcs) { + return -1; + } + auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); + int index = pos_isnew.first->second; + if (pos_isnew.second) { + procs_.emplace_back(ProcRec{proc_id, ssn}); + } else { // update ssn + procs_[index].ssn_ = ssn; + } + return index; +} +const ProcRecords::ProcRec &ProcRecords::Get(const ProcIndex index) const +{ + static ProcRec empty_rec; + return (index < procs_.size()) ? procs_[index] : empty_rec; +} + +void MsgRecords::FreeMsg(MsgId id) +{ + auto pos = msgs_.find(id); + if (pos != msgs_.end()) { + ShmMsg(pos->second).Free(); + msgs_.erase(pos); + } else { + LOG_TRACE() << "ignore late free request."; + } +} +void MsgRecords::AutoRemove() +{ + auto now = NowSec(); + if (now < time_to_clean_) { + return; + } + // LOG_FUNCTION; + time_to_clean_ = now + 1; + int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t n = 0; + auto it = msgs_.begin(); + while (it != msgs_.end() && --limit > 0) { + ShmMsg msg(it->second); + auto Free = [&]() { + msg.Free(); + it = msgs_.erase(it); + ++n; + }; + int n = now - msg.timestamp(); + if (n < 10) { + ++it; + } else if (msg.Count() == 0) { + Free(); + } else if (n > 60) { + Free(); + } + } + if (n > 0) { + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + } +} + +void MsgRecords::DebugPrint() const +{ + LOG_DEBUG() << "msgs : " << size(); + int i = 0; + int total_count = 0; + for (auto &kv : msgs_) { + MsgI msg(kv.second); + total_count += msg.Count(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + } + LOG_DEBUG() << "total count: " << total_count; +} + +// NodeCenter::ProcState +void NodeCenter::ProcState::PutOffline(const int64_t offline_time) +{ + timestamp_ = NowSec() - offline_time; + flag_ = kStateOffline; +} + +void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) +{ + auto diff = now - timestamp_; + LOG_DEBUG() << "state " << this << " diff: " << diff; + if (diff < offline_time) { + flag_ = kStateNormal; + } else if (diff < kill_time) { + flag_ = kStateOffline; + } else { + flag_ = kStateKillme; + } +} + +int64_t NodeCenter::OnNodeInit(ShmSocket &socket, const int64_t val) +{ + LOG_FUNCTION; + SharedMemory &shm = socket.shm(); + MQId ssn = (val >> 4) & MaskBits(56); + int reply = EncodeCmd(eCmdNodeInitReply); + + if (nodes_.find(ssn) != nodes_.end()) { + return reply; // ignore if exists. + } + + auto UpdateRegInfo = [&](Node &node) { + node->state_.timestamp_ = NowSec() - offline_time_; + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + + // create sockets. + try { + ShmSocket tmp(shm, true, ssn, 16); + node->addrs_.emplace(ssn, tmp.AbsAddr()); + return true; + } catch (...) { + return false; + } + }; + + auto PrepareProcInit = [&](Node &node) { + bool r = false; + ShmMsg init_msg; + DEFER1(init_msg.Release()); + MsgProcInit body; + auto head = InitMsgHead(GetType(body), id(), ssn); + return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) && + init_msg.Fill(ShmMsg::Serialize(head, body)) && + SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); + }; + + Node node(new NodeInfo); + if (UpdateRegInfo(node) && PrepareProcInit(node)) { + reply |= (node->addrs_[ssn] << 4); + nodes_[ssn] = node; + LOG_INFO() << "new node ssn (" << ssn << ") init"; + } else { + ShmSocket::Remove(shm, ssn); + } + return reply; +} +void NodeCenter::RecordMsg(const MsgI &msg) +{ + msg.reset_managed(true); + msgs_.RecordMsg(msg); +} + +bool NodeCenter::SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg) +{ + RecordMsg(msg); + auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; + return socket.Send(dest, reply, onExpireFree); +} +bool NodeCenter::SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg) +{ + RecordMsg(msg); + return socket.Send(dest, msg); +} + +void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) +{ + // LOG_FUNCTION; + // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag + int64_t msg_id = (val >> 4) & MaskBits(28); + int proc_index = (val >> 32) & MaskBits(16); + int socket_index = ((val) >> 48) & MaskBits(4); + auto proc_rec(procs_.Get(proc_index)); + if (proc_rec.proc_.empty()) { + return; + } + + MQInfo dest = {proc_rec.ssn_ + socket_index, 0}; + auto FindMq = [&]() { + auto pos = nodes_.find(proc_rec.ssn_); + if (pos != nodes_.end()) { + for (auto &&mq : pos->second->addrs_) { + if (mq.first == dest.id_) { + dest.offset_ = mq.second; + return true; + } + } + } + return false; + }; + if (!FindMq()) { return; } + + auto size = GetAllocSize((val >> 52) & MaskBits(8)); + MsgI new_msg; + if (new_msg.Make(size)) { + // 31bit proc index, 28bit id, ,4bit cmd+flag + int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); + SendAllocReply(socket, dest, reply, new_msg); + } else { + int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. + socket.Send(dest, reply); + } +} + +void NodeCenter::OnFree(ShmSocket &socket, const int64_t val) +{ + int64_t msg_id = (val >> 4) & MaskBits(31); + msgs_.FreeMsg(msg_id); +} + +bool NodeCenter::OnCommand(ShmSocket &socket, const int64_t val) +{ + assert(IsCmd(val)); + int cmd = DecodeCmd(val); + switch (cmd) { + case eCmdAllocRequest0: OnAlloc(socket, val); break; + case eCmdFree: OnFree(socket, val); break; + default: return false; + } + return true; +} + +MsgProcInitReply NodeCenter::ProcInit(const BHMsgHead &head, MsgProcInit &msg) +{ + LOG_DEBUG() << "center got proc init."; + auto pos = nodes_.find(head.ssn_id()); + if (pos == nodes_.end()) { + return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised"); + } + auto index = procs_.Put(head.proc_id(), head.ssn_id()); + auto reply(MakeReply<MsgProcInitReply>(eSuccess)); + reply.set_proc_index(index); + + auto &node = pos->second; + try { + for (int i = 0; i < msg.extra_mq_num(); ++i) { + ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); + node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); + auto addr = reply.add_extra_mqs(); + addr->set_mq_id(tmp.id()); + addr->set_abs_addr(tmp.AbsAddr()); + } + return reply; + } catch (...) { + LOG_ERROR() << "proc init create mq error"; + return MakeReply<MsgProcInitReply>(eError, "Create mq failed."); + } +} + +MsgCommonReply NodeCenter::Register(const BHMsgHead &head, MsgRegister &msg) +{ + if (msg.proc().proc_id() != head.proc_id()) { + return MakeReply(eInvalidInput, "invalid proc id."); + } + + try { + MQId ssn = head.ssn_id(); + // when node restart, ssn will change, + // and old node will be removed after timeout. + auto UpdateRegInfo = [&](Node &node) { + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = head.timestamp(); + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + }; + + auto pos = nodes_.find(ssn); + if (pos == nodes_.end()) { + return MakeReply(eInvalidInput, "invalid session."); + } + + // update proc info + Node &node = pos->second; + UpdateRegInfo(node); + LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; + + auto old = online_node_addr_map_.find(head.proc_id()); + if (old != online_node_addr_map_.end()) { // old session + auto &old_ssn = old->second; + if (old_ssn != ssn) { + nodes_[old_ssn]->state_.PutOffline(offline_time_); + LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; + old_ssn = ssn; + } + } else { + online_node_addr_map_.emplace(head.proc_id(), ssn); + } + return MakeReply(eSuccess); + } catch (...) { + return MakeReply(eError, "register node error."); + } +} + +MsgCommonReply NodeCenter::Unregister(const BHMsgHead &head, MsgUnregister &msg) +{ + return HandleMsg( + head, [&](Node node) -> MsgCommonReply { + NodeInfo &ni = *node; + ni.state_.PutOffline(offline_time_); + return MakeReply(eSuccess); + }); +} + +MsgCommonReply NodeCenter::RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) +{ + return HandleMsg( + head, [&](Node node) -> MsgCommonReply { + auto src = SrcAddr(head); + auto &topics = msg.topics().topic_list(); + node->services_[src].insert(topics.begin(), topics.end()); + TopicDest dest = {src, SrcAbsAddr(head), node}; + for (auto &topic : topics) { + service_map_[topic].insert(dest); + } + LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n"; + for (auto &topic : topics) { + LOG_DEBUG() << "\t" << topic; + } + return MakeReply(eSuccess); + }); +} + +MsgCommonReply NodeCenter::Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) +{ + return HandleMsg(head, [&](Node node) { + NodeInfo &ni = *node; + ni.state_.timestamp_ = head.timestamp(); + ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); + + auto &info = msg.proc(); + if (!info.public_info().empty()) { + ni.proc_.set_public_info(info.public_info()); + } + if (!info.private_info().empty()) { + ni.proc_.set_private_info(info.private_info()); + } + return MakeReply(eSuccess); + }); +} +MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) +{ + typedef MsgQueryProcReply Reply; + auto query = [&](Node self) -> Reply { + auto Add1 = [](Reply &reply, Node node) { + auto info = reply.add_proc_list(); + *info->mutable_proc() = node->proc_; + info->set_online(node->state_.flag_ == kStateNormal); + for (auto &addr_topics : node->services_) { + for (auto &topic : addr_topics.second) { + info->mutable_topics()->add_topic_list(topic); + } + } + }; + + if (!req.proc_id().empty()) { + auto pos = online_node_addr_map_.find(req.proc_id()); + if (pos == online_node_addr_map_.end()) { + return MakeReply<Reply>(eNotFound, "proc not found."); + } else { + auto node_pos = nodes_.find(pos->second); + if (node_pos == nodes_.end()) { + return MakeReply<Reply>(eNotFound, "proc node not found."); + } else { + auto reply = MakeReply<Reply>(eSuccess); + Add1(reply, node_pos->second); + return reply; + } + } + } else { + Reply reply(MakeReply<Reply>(eSuccess)); + for (auto &kv : nodes_) { + Add1(reply, kv.second); + } + return reply; + } + }; + + return HandleMsg<Reply>(head, query); +} + +MsgQueryTopicReply NodeCenter::QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) +{ + typedef MsgQueryTopicReply Reply; + + auto query = [&](Node self) -> Reply { + auto pos = service_map_.find(req.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + Reply reply = MakeReply<Reply>(eSuccess); + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_id_); + node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + } + } + return reply; + } else { + return MakeReply<Reply>(eNotFound, "topic server not found."); + } + }; + + return HandleMsg<Reply>(head, query); +} + +MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) +{ + return HandleMsg(head, [&](Node node) { + auto src = SrcAddr(head); + auto &topics = msg.topics().topic_list(); + node->subscriptions_[src].insert(topics.begin(), topics.end()); + TopicDest dest = {src, SrcAbsAddr(head), node}; + for (auto &topic : topics) { + subscribe_map_[topic].insert(dest); + } + return MakeReply(eSuccess); + }); +} +MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) +{ + return HandleMsg(head, [&](Node node) { + auto src = SrcAddr(head); + auto pos = node->subscriptions_.find(src); + + auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { + auto pos = subscribe_map_.find(topic); + if (pos != subscribe_map_.end() && + pos->second.erase(dest) != 0 && + pos->second.empty()) { + subscribe_map_.erase(pos); + } + }; + + if (pos != node->subscriptions_.end()) { + const TopicDest &dest = {src, SrcAbsAddr(head), node}; + auto &topics = msg.topics().topic_list(); + // clear node sub records; + for (auto &topic : topics) { + pos->second.erase(topic); + RemoveSubTopicDestRecord(topic, dest); + } + if (pos->second.empty()) { + node->subscriptions_.erase(pos); + } + } + return MakeReply(eSuccess); + }); +} + +NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) +{ + Clients dests; + auto Find1 = [&](const std::string &t) { + auto pos = subscribe_map_.find(topic); + if (pos != subscribe_map_.end()) { + auto &clients = pos->second; + for (auto &cli : clients) { + if (Valid(cli.weak_node_)) { + dests.insert(cli); + } + } + } + }; + Find1(topic); + + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; +} + +bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) +{ + bool ret = false; + HandleMsg(head, [&](Node node) { + DoFindClients(msg.topic()).swap(out); + ret = true; + return MakeReply(eSuccess); + }).Swap(&reply); + return ret; +} + +void NodeCenter::OnTimer() +{ + CheckNodes(); + msgs_.AutoRemove(); +} + +void NodeCenter::CheckNodes() +{ + auto now = NowSec(); + if (now <= last_check_time_) { return; } + last_check_time_ = now; + + auto it = nodes_.begin(); + while (it != nodes_.end()) { + auto &cli = *it->second; + cli.state_.UpdateState(now, offline_time_, kill_time_); + if (cli.state_.flag_ == kStateKillme) { + RemoveNode(it->second); + it = nodes_.erase(it); + } else { + ++it; + } + } + msgs_.DebugPrint(); +} + +void NodeCenter::RemoveNode(Node &node) +{ + auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { + for (auto &addr_topics : node_rec) { + TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used. + for (auto &topic : addr_topics.second) { + auto pos = rec_map.find(topic); + if (pos != rec_map.end()) { + pos->second.erase(dest); + if (pos->second.empty()) { + rec_map.erase(pos); + } + } + } + } + }; + EraseMapRec(service_map_, node->services_); + EraseMapRec(subscribe_map_, node->subscriptions_); + + // remove online record. + auto pos = online_node_addr_map_.find(node->proc_.proc_id()); + if (pos != online_node_addr_map_.end()) { + if (node->addrs_.find(pos->second) != node->addrs_.end()) { + online_node_addr_map_.erase(pos); + } + } + + for (auto &addr : node->addrs_) { + cleaner_(addr.first); + } + + node->addrs_.clear(); +} \ No newline at end of file diff --git a/box/node_center.h b/box/node_center.h new file mode 100644 index 0000000..b9a01b3 --- /dev/null +++ b/box/node_center.h @@ -0,0 +1,195 @@ +/* + * ===================================================================================== + * + * Filename: node_center.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�20鏃� 11鏃�33鍒�06绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef NODE_CENTER_KY67RJ1Q +#define NODE_CENTER_KY67RJ1Q + +#include "shm_socket.h" +#include <unordered_map> + +typedef std::string ProcId; +typedef size_t ProcIndex; // max local procs. +const int kMaxProcs = 65536; + +// record all procs ever registered, always grow, never remove. +// mainly for node to request msg allocation. +// use index instead of MQId to save some bits. +class ProcRecords +{ +public: + struct ProcRec { + ProcId proc_; + MQId ssn_ = 0; + }; + + ProcRecords() { procs_.reserve(kMaxProcs); } + ProcIndex Put(const ProcId &proc_id, const MQId ssn); + const ProcRec &Get(const ProcIndex index) const; + +private: + std::unordered_map<ProcId, size_t> proc_index_; + std::vector<ProcRec> procs_; +}; + +class MsgRecords +{ + typedef int64_t MsgId; + typedef int64_t Offset; + +public: + void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } + void FreeMsg(MsgId id); + void AutoRemove(); + size_t size() const { return msgs_.size(); } + void DebugPrint() const; + +private: + std::unordered_map<MsgId, Offset> msgs_; + int64_t time_to_clean_ = 0; +}; + +class NodeCenter +{ +public: + typedef MQId Address; + typedef bhome_msg::ProcInfo ProcInfo; + typedef std::function<void(Address const)> Cleaner; + +private: + enum { + kStateInvalid, + kStateNormal, + kStateOffline, + kStateKillme, + }; + + struct ProcState { + int64_t timestamp_ = 0; + uint32_t flag_ = 0; // reserved + void PutOffline(const int64_t offline_time); + void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); + }; + typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; + + struct NodeInfo { + ProcState state_; // state + std::map<MQId, int64_t> addrs_; // registered mqs + ProcInfo proc_; // + AddressTopics services_; // address: topics + AddressTopics subscriptions_; // address: topics + }; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + + struct TopicDest { + MQId mq_id_; + int64_t mq_abs_addr_; + WeakNode weak_node_; + bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } + }; + + static inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } + static inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } + static inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } + +public: + typedef std::set<TopicDest> Clients; + + NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : + id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} + + // center name, no relative to shm. + const std::string &id() const { return id_; } + int64_t OnNodeInit(ShmSocket &socket, const int64_t val); + void RecordMsg(const MsgI &msg); + bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); + bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); + void OnAlloc(ShmSocket &socket, const int64_t val); + void OnFree(ShmSocket &socket, const int64_t val); + bool OnCommand(ShmSocket &socket, const int64_t val); + + MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg); + MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg); + + template <class Reply, class Func> + Reply HandleMsg(const BHMsgHead &head, Func const &op) + { + try { + auto pos = nodes_.find(head.ssn_id()); + if (pos == nodes_.end()) { + return MakeReply<Reply>(eNotRegistered, "Node is not registered."); + } else { + auto &node = pos->second; + if (!MatchAddr(node->addrs_, SrcAddr(head))) { + return MakeReply<Reply>(eAddressNotMatch, "Node address error."); + } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { + return op(node); + } else if (!Valid(*node)) { + return MakeReply<Reply>(eNoRespond, "Node is not alive."); + } else { + return op(node); + } + } + } catch (...) { + //TODO error log + return MakeReply<Reply>(eError, "internal error."); + } + } + template <class Func> + inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) + { + return HandleMsg<MsgCommonReply, Func>(head, op); + } + + MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); + MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); + MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg); + MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req); + MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); + MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); + MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); + Clients DoFindClients(const std::string &topic); + bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); + + void OnTimer(); + +private: + void CheckNodes(); + bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } + bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } + bool Valid(const WeakNode &weak) + { + auto node = weak.lock(); + return node && Valid(*node); + } + void RemoveNode(Node &node); + std::string id_; // center proc id; + + std::unordered_map<Topic, Clients> service_map_; + std::unordered_map<Topic, Clients> subscribe_map_; + std::unordered_map<Address, Node> nodes_; + std::unordered_map<ProcId, Address> online_node_addr_map_; + ProcRecords procs_; // To get a short index for msg alloc. + MsgRecords msgs_; // record all msgs alloced. + + Cleaner cleaner_; // remove mqs. + int64_t offline_time_; + int64_t kill_time_; + int64_t last_check_time_; +}; + +#endif // end of include guard: NODE_CENTER_KY67RJ1Q -- Gitblit v1.8.0