From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 13 五月 2021 19:34:46 +0800 Subject: [PATCH] center alloc all msgs. --- box/center.cpp | 276 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 files changed, 232 insertions(+), 44 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index d6ac804..b440a03 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -21,7 +21,7 @@ #include "log.h" #include "shm.h" #include <chrono> -#include <set> +#include <unordered_map> using namespace std::chrono; using namespace std::chrono_literals; @@ -33,11 +33,118 @@ namespace { +typedef std::string ProcId; +typedef size_t ProcIndex; // max local procs. +const int kMaxProcs = 65536; + +// record all procs ever registered, always grow, never remove. +// mainly for node to request msg allocation. +// use index instead of MQId to save some bits. +class ProcRecords +{ +public: + struct ProcRec { + ProcId proc_; + MQId ssn_ = 0; + }; + + ProcRecords() { procs_.reserve(kMaxProcs); } + + ProcIndex Put(const ProcId &proc_id, const MQId ssn) + { + if (procs_.size() >= kMaxProcs) { + return -1; + } + auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); + int index = pos_isnew.first->second; + if (pos_isnew.second) { + procs_.emplace_back(ProcRec{proc_id, ssn}); + } else { // update ssn + procs_[index].ssn_ = ssn; + } + return index; + } + const ProcRec &Get(const ProcIndex index) const + { + static ProcRec empty_rec; + return (index < procs_.size()) ? procs_[index] : empty_rec; + } + +private: + std::unordered_map<ProcId, size_t> proc_index_; + std::vector<ProcRec> procs_; +}; + +class MsgRecords +{ + typedef int64_t MsgId; + typedef int64_t Offset; + +public: + void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } + void FreeMsg(MsgId id) + { + auto pos = msgs_.find(id); + if (pos != msgs_.end()) { + ShmMsg(pos->second).Free(); + msgs_.erase(pos); + } else { + LOG_TRACE() << "ignore late free request."; + } + } + void AutoRemove() + { + auto now = NowSec(); + if (now < time_to_clean_) { + return; + } + LOG_FUNCTION; + time_to_clean_ = now + 1; + int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t n = 0; + auto it = msgs_.begin(); + while (it != msgs_.end() && --limit > 0) { + ShmMsg msg(it->second); + if (msg.Count() == 0) { + msg.Free(); + it = msgs_.erase(it); + ++n; + } else if (msg.timestamp() + 10 < NowSec()) { + msg.Free(); + it = msgs_.erase(it); + ++n; + // LOG_DEBUG() << "release timeout msg, someone crashed."; + } else { + ++it; + } + } + if (n > 0) { + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + } + } + size_t size() const { return msgs_.size(); } + void DebugPrint() const + { + LOG_DEBUG() << "msgs : " << size(); + int i = 0; + int total_count = 0; + for (auto &kv : msgs_) { + MsgI msg(kv.second); + total_count += msg.Count(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + } + LOG_DEBUG() << "total count: " << total_count; + } + +private: + std::unordered_map<MsgId, Offset> msgs_; + int64_t time_to_clean_ = 0; +}; + //TODO check proc_id class NodeCenter { public: - typedef std::string ProcId; typedef MQId Address; typedef bhome_msg::ProcInfo ProcInfo; typedef std::function<void(Address const)> Cleaner; @@ -102,13 +209,14 @@ // center name, no relative to shm. const std::string &id() const { return id_; } - void OnNodeInit(SharedMemory &shm, const int64_t msg) + void OnNodeInit(ShmSocket &socket, const int64_t val) { - MQId ssn = msg; + LOG_FUNCTION; + SharedMemory &shm = socket.shm(); + MQId ssn = (val >> 4) & MaskBits(60); if (nodes_.find(ssn) != nodes_.end()) { return; // ignore in exists. } - auto UpdateRegInfo = [&](Node &node) { for (int i = 0; i < 10; ++i) { node->addrs_.insert(ssn + i); @@ -118,12 +226,10 @@ // create sockets. try { - auto CreateSocket = [](SharedMemory &shm, const MQId id) { - ShmSocket tmp(shm, true, id, 16); - }; + auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; // alloc(-1), node, server, sub, request, - for (int i = -1; i < 4; ++i) { - CreateSocket(shm, ssn + i); + for (int i = 0; i < 4; ++i) { + CreateSocket(ssn + i); node->addrs_.insert(ssn + i); } return true; @@ -132,11 +238,93 @@ } }; + auto PrepareProcInit = [&]() { + bool r = false; + ShmMsg init_msg; + if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { + // 31bit pointer, 4bit cmd+flag + int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); + r = SendAllocReply(socket, ssn, reply, init_msg); + } + return r; + }; + Node node(new NodeInfo); - if (UpdateRegInfo(node)) { + if (UpdateRegInfo(node) && PrepareProcInit()) { nodes_[ssn] = node; LOG_INFO() << "new node ssn (" << ssn << ") init"; + } else { + for (int i = 0; i < 10; ++i) { + ShmSocket::Remove(shm, ssn + i); + } } + } + void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } + + bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) + { + RecordMsg(msg); + auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; + return socket.Send(dest, reply, onExpireFree); + } + bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) + { + RecordMsg(msg); + auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; + return socket.Send(dest, msg, onExpireFree); + } + + void OnAlloc(ShmSocket &socket, const int64_t val) + { + // LOG_FUNCTION; + // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag + int64_t msg_id = (val >> 4) & MaskBits(28); + int proc_index = (val >> 32) & MaskBits(16); + int socket_index = ((val) >> 48) & MaskBits(4); + auto proc_rec(procs_.Get(proc_index)); + if (proc_rec.proc_.empty()) { + return; + } + Address dest = proc_rec.ssn_ + socket_index; + + auto size = GetAllocSize((val >> 52) & MaskBits(8)); + MsgI new_msg; + if (new_msg.Make(size)) { + // 31bit proc index, 28bit id, ,4bit cmd+flag + int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); + SendAllocReply(socket, dest, reply, new_msg); + } else { + int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. + socket.Send(dest, reply); + } + } + + void OnFree(ShmSocket &socket, const int64_t val) + { + int64_t msg_id = (val >> 4) & MaskBits(31); + msgs_.FreeMsg(msg_id); + } + + bool OnCommand(ShmSocket &socket, const int64_t val) + { + assert(IsCmd(val)); + int cmd = DecodeCmd(val); + switch (cmd) { + case eCmdNodeInit: OnNodeInit(socket, val); break; + case eCmdAllocRequest0: OnAlloc(socket, val); break; + case eCmdFree: OnFree(socket, val); break; + default: return false; + } + return true; + } + + MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) + { + LOG_DEBUG() << "center got proc init."; + auto index = procs_.Put(head.proc_id(), head.ssn_id()); + auto reply(MakeReply<MsgProcInitReply>(eSuccess)); + reply.set_proc_index(index); + return reply; } MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) @@ -160,14 +348,13 @@ }; auto pos = nodes_.find(ssn); - if (pos != nodes_.end()) { // update - Node &node = pos->second; - UpdateRegInfo(node); - } else { - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; + if (pos == nodes_.end()) { + return MakeReply(eInvalidInput, "invalid session."); } + + // update proc info + Node &node = pos->second; + UpdateRegInfo(node); LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; auto old = online_node_addr_map_.find(head.proc_id()); @@ -376,13 +563,14 @@ void OnTimer() { CheckNodes(); + msgs_.AutoRemove(); } private: void CheckNodes() { auto now = NowSec(); - if (now - last_check_time_ < 1) { return; } + if (now <= last_check_time_) { return; } last_check_time_ = now; auto it = nodes_.begin(); @@ -396,6 +584,7 @@ ++it; } } + msgs_.DebugPrint(); } bool CanHeartbeat(const NodeInfo &node) { @@ -448,7 +637,10 @@ std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<Address, Node> nodes_; - std::unordered_map<std::string, Address> online_node_addr_map_; + std::unordered_map<ProcId, Address> online_node_addr_map_; + ProcRecords procs_; // To get a short index for msg alloc. + MsgRecords msgs_; // record all msgs alloced. + Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; @@ -483,25 +675,28 @@ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; -auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) +auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er) { return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); + auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); auto remote = head.route(0).mq_id(); - socket.Send(remote, reply_head, rep_body); + MsgI msg; + if (msg.Make(reply_head, rep_body)) { + DEFER1(msg.Release();); + center->SendAllocMsg(socket, remote, msg); + } }; } bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) { - auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { + // command + auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { auto ¢er = *center_ptr; - center->OnNodeInit(socket.shm(), msg.Offset()); + return IsCmd(cmd) && center->OnCommand(socket, cmd); }; - auto Nothing = [](ShmSocket &socket) {}; - BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); - + // now we can talk. auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; center->OnTimer(); @@ -509,8 +704,9 @@ auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); switch (head.type()) { + CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); CASE_ON_MSG_TYPE(Unregister); @@ -520,12 +716,13 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; + auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -561,7 +758,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); return true; } @@ -576,14 +773,9 @@ return rec; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) +bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; - return true; -} -bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) -{ - Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; return true; } @@ -609,11 +801,7 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - if (info.handler_) { - sockets_[info.name_]->Start(info.handler_, info.idle_); - } else { - sockets_[info.name_]->Start(info.raw_handler_, info.idle_); - } + sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } return true; -- Gitblit v1.8.0