| | |
| | | #include "log.h" |
| | | #include "shm.h" |
| | | #include <chrono> |
| | | #include <set> |
| | | #include <unordered_map> |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | |
| | | namespace |
| | | { |
| | | |
| | | typedef std::string ProcId; |
| | | typedef size_t ProcIndex; // max local procs. |
| | | const int kMaxProcs = 65536; |
| | | |
| | | // record all procs ever registered, always grow, never remove. |
| | | // mainly for node to request msg allocation. |
| | | // use index instead of MQId to save some bits. |
| | | class ProcRecords |
| | | { |
| | | public: |
| | | struct ProcRec { |
| | | ProcId proc_; |
| | | MQId ssn_ = 0; |
| | | }; |
| | | |
| | | ProcRecords() { procs_.reserve(kMaxProcs); } |
| | | |
| | | ProcIndex Put(const ProcId &proc_id, const MQId ssn) |
| | | { |
| | | if (procs_.size() >= kMaxProcs) { |
| | | return -1; |
| | | } |
| | | auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); |
| | | int index = pos_isnew.first->second; |
| | | if (pos_isnew.second) { |
| | | procs_.emplace_back(ProcRec{proc_id, ssn}); |
| | | } else { // update ssn |
| | | procs_[index].ssn_ = ssn; |
| | | } |
| | | return index; |
| | | } |
| | | const ProcRec &Get(const ProcIndex index) const |
| | | { |
| | | static ProcRec empty_rec; |
| | | return (index < procs_.size()) ? procs_[index] : empty_rec; |
| | | } |
| | | |
| | | private: |
| | | std::unordered_map<ProcId, size_t> proc_index_; |
| | | std::vector<ProcRec> procs_; |
| | | }; |
| | | |
| | | class MsgRecords |
| | | { |
| | | typedef int64_t MsgId; |
| | | typedef int64_t Offset; |
| | | |
| | | public: |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } |
| | | void FreeMsg(MsgId id) |
| | | { |
| | | auto pos = msgs_.find(id); |
| | | if (pos != msgs_.end()) { |
| | | ShmMsg(pos->second).Free(); |
| | | msgs_.erase(pos); |
| | | } else { |
| | | LOG_TRACE() << "ignore late free request."; |
| | | } |
| | | } |
| | | void AutoRemove() |
| | | { |
| | | auto now = NowSec(); |
| | | if (now < time_to_clean_) { |
| | | return; |
| | | } |
| | | LOG_FUNCTION; |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | | ShmMsg msg(it->second); |
| | | if (msg.Count() == 0) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | } else if (msg.timestamp() + 10 < NowSec()) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | // LOG_DEBUG() << "release timeout msg, someone crashed."; |
| | | } else { |
| | | ++it; |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | } |
| | | } |
| | | size_t size() const { return msgs_.size(); } |
| | | void DebugPrint() const |
| | | { |
| | | LOG_DEBUG() << "msgs : " << size(); |
| | | int i = 0; |
| | | int total_count = 0; |
| | | for (auto &kv : msgs_) { |
| | | MsgI msg(kv.second); |
| | | total_count += msg.Count(); |
| | | LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); |
| | | } |
| | | LOG_DEBUG() << "total count: " << total_count; |
| | | } |
| | | |
| | | private: |
| | | std::unordered_map<MsgId, Offset> msgs_; |
| | | int64_t time_to_clean_ = 0; |
| | | }; |
| | | |
| | | //TODO check proc_id |
| | | class NodeCenter |
| | | { |
| | | public: |
| | | typedef std::string ProcId; |
| | | typedef MQId Address; |
| | | typedef bhome_msg::ProcInfo ProcInfo; |
| | | typedef std::function<void(Address const)> Cleaner; |
| | |
| | | |
| | | // center name, no relative to shm. |
| | | const std::string &id() const { return id_; } |
| | | void OnNodeInit(SharedMemory &shm, const int64_t msg) |
| | | void OnNodeInit(ShmSocket &socket, const int64_t val) |
| | | { |
| | | MQId ssn = msg; |
| | | LOG_FUNCTION; |
| | | SharedMemory &shm = socket.shm(); |
| | | MQId ssn = (val >> 4) & MaskBits(60); |
| | | if (nodes_.find(ssn) != nodes_.end()) { |
| | | return; // ignore in exists. |
| | | } |
| | | |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | for (int i = 0; i < 10; ++i) { |
| | | node->addrs_.insert(ssn + i); |
| | |
| | | |
| | | // create sockets. |
| | | try { |
| | | auto CreateSocket = [](SharedMemory &shm, const MQId id) { |
| | | ShmSocket tmp(shm, true, id, 16); |
| | | }; |
| | | auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; |
| | | // alloc(-1), node, server, sub, request, |
| | | for (int i = -1; i < 4; ++i) { |
| | | CreateSocket(shm, ssn + i); |
| | | for (int i = 0; i < 4; ++i) { |
| | | CreateSocket(ssn + i); |
| | | node->addrs_.insert(ssn + i); |
| | | } |
| | | return true; |
| | |
| | | } |
| | | }; |
| | | |
| | | auto PrepareProcInit = [&]() { |
| | | bool r = false; |
| | | ShmMsg init_msg; |
| | | if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { |
| | | // 31bit pointer, 4bit cmd+flag |
| | | int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); |
| | | r = SendAllocReply(socket, ssn, reply, init_msg); |
| | | } |
| | | return r; |
| | | }; |
| | | |
| | | Node node(new NodeInfo); |
| | | if (UpdateRegInfo(node)) { |
| | | if (UpdateRegInfo(node) && PrepareProcInit()) { |
| | | nodes_[ssn] = node; |
| | | LOG_INFO() << "new node ssn (" << ssn << ") init"; |
| | | } else { |
| | | for (int i = 0; i < 10; ++i) { |
| | | ShmSocket::Remove(shm, ssn + i); |
| | | } |
| | | } |
| | | } |
| | | void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } |
| | | |
| | | bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; |
| | | return socket.Send(dest, reply, onExpireFree); |
| | | } |
| | | bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; |
| | | return socket.Send(dest, msg, onExpireFree); |
| | | } |
| | | |
| | | void OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |
| | | // LOG_FUNCTION; |
| | | // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | int64_t msg_id = (val >> 4) & MaskBits(28); |
| | | int proc_index = (val >> 32) & MaskBits(16); |
| | | int socket_index = ((val) >> 48) & MaskBits(4); |
| | | auto proc_rec(procs_.Get(proc_index)); |
| | | if (proc_rec.proc_.empty()) { |
| | | return; |
| | | } |
| | | Address dest = proc_rec.ssn_ + socket_index; |
| | | |
| | | auto size = GetAllocSize((val >> 52) & MaskBits(8)); |
| | | MsgI new_msg; |
| | | if (new_msg.Make(size)) { |
| | | // 31bit proc index, 28bit id, ,4bit cmd+flag |
| | | int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); |
| | | SendAllocReply(socket, dest, reply, new_msg); |
| | | } else { |
| | | int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. |
| | | socket.Send(dest, reply); |
| | | } |
| | | } |
| | | |
| | | void OnFree(ShmSocket &socket, const int64_t val) |
| | | { |
| | | int64_t msg_id = (val >> 4) & MaskBits(31); |
| | | msgs_.FreeMsg(msg_id); |
| | | } |
| | | |
| | | bool OnCommand(ShmSocket &socket, const int64_t val) |
| | | { |
| | | assert(IsCmd(val)); |
| | | int cmd = DecodeCmd(val); |
| | | switch (cmd) { |
| | | case eCmdNodeInit: OnNodeInit(socket, val); break; |
| | | case eCmdAllocRequest0: OnAlloc(socket, val); break; |
| | | case eCmdFree: OnFree(socket, val); break; |
| | | default: return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) |
| | | { |
| | | LOG_DEBUG() << "center got proc init."; |
| | | auto index = procs_.Put(head.proc_id(), head.ssn_id()); |
| | | auto reply(MakeReply<MsgProcInitReply>(eSuccess)); |
| | | reply.set_proc_index(index); |
| | | return reply; |
| | | } |
| | | |
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) |
| | |
| | | }; |
| | | |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos != nodes_.end()) { // update |
| | | Node &node = pos->second; |
| | | UpdateRegInfo(node); |
| | | } else { |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[ssn] = node; |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply(eInvalidInput, "invalid session."); |
| | | } |
| | | |
| | | // update proc info |
| | | Node &node = pos->second; |
| | | UpdateRegInfo(node); |
| | | LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; |
| | | |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | |
| | | void OnTimer() |
| | | { |
| | | CheckNodes(); |
| | | msgs_.AutoRemove(); |
| | | } |
| | | |
| | | private: |
| | | void CheckNodes() |
| | | { |
| | | auto now = NowSec(); |
| | | if (now - last_check_time_ < 1) { return; } |
| | | if (now <= last_check_time_) { return; } |
| | | last_check_time_ = now; |
| | | |
| | | auto it = nodes_.begin(); |
| | |
| | | ++it; |
| | | } |
| | | } |
| | | msgs_.DebugPrint(); |
| | | } |
| | | bool CanHeartbeat(const NodeInfo &node) |
| | | { |
| | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<std::string, Address> online_node_addr_map_; |
| | | std::unordered_map<ProcId, Address> online_node_addr_map_; |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | | MsgRecords msgs_; // record all msgs alloced. |
| | | |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | |
| | | msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ |
| | | return true; |
| | | |
| | | auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) |
| | | auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er) |
| | | { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | MsgI msg; |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | | } |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | { |
| | | auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | center->OnNodeInit(socket.shm(), msg.Offset()); |
| | | return IsCmd(cmd) && center->OnCommand(socket, cmd); |
| | | }; |
| | | auto Nothing = [](ShmSocket &socket) {}; |
| | | |
| | | BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); |
| | | |
| | | // now we can talk. |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | | center->OnTimer(); |
| | |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | | CASE_ON_MSG_TYPE(Unregister); |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | return rec; |
| | | } |
| | | |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; |
| | | Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | |
| | |
| | | { |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | if (info.handler_) { |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | } else { |
| | | sockets_[info.name_]->Start(info.raw_handler_, info.idle_); |
| | | } |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | |
| | | return true; |