/*
 * =====================================================================================
 *
 *       Filename:  node_center.cpp
 *
 *    Description:  Center-side bookkeeping of nodes (sessions), processes, RPC
 *                  services, topic subscriptions and shared-memory messages.
 *
 *        Version:  1.0
 *        Created:  2021-05-20 11:32:55
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "node_center.h"
#include "json.h"
#include "log.h"

#include <algorithm>
#include <string>
#include <vector>

using ssjson::Json;

namespace
{
// Joins two topic segments with the topic separator.
std::string Join(const std::string &parent, const std::string &child) { return parent + kTopicSep + child; }

// Topics on which the center publishes node events.
const std::string kTopicCenterRoot = "#center";
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
const std::string kTopicNodeService = Join(kTopicNode, "service");
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace

// Records proc_id as bound to session ssn and returns its slot index.
// A proc that is already known keeps its slot and only has its ssn updated,
// even when the table is full; a new proc is rejected with -1 once kMaxProcs
// slots are in use.
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
{
	auto pos = proc_index_.find(proc_id);
	if (pos != proc_index_.end()) {
		int index = pos->second;
		procs_[index].ssn_ = ssn; // update ssn
		return index;
	}
	if (procs_.size() >= kMaxProcs) {
		return -1;
	}
	int index = procs_.size();
	proc_index_.emplace(proc_id, procs_.size());
	procs_.emplace_back(ProcRec{proc_id, ssn});
	return index;
}

// Returns the record at index, or a shared empty record when index is out of range.
const ProcRecords::ProcRec &ProcRecords::Get(const ProcIndex index) const
{
	static ProcRec empty_rec;
	return (index < procs_.size()) ? procs_[index] : empty_rec;
}

// Frees the recorded message with the given id and forgets it.
// Unknown ids are ignored (logged at trace level as a late free request).
void MsgRecords::FreeMsg(MsgId id)
{
	auto pos = msgs_.find(id);
	if (pos != msgs_.end()) {
		pos->second.Free();
		msgs_.erase(pos);
	} else {
		LOG_TRACE() << "ignore late free request.";
	}
}

// Runs at most once per second (time_to_clean_): frees recorded messages whose
// Count() is 0 or whose age exceeds NodeTimeoutSec(). The number of records
// visited per pass is bounded by about max(10000, total / 10).
void MsgRecords::AutoRemove()
{
	auto now = NowSec();
	if (now < time_to_clean_) {
		return;
	}
	// LOG_FUNCTION;
	const size_t total = msgs_.size();
	time_to_clean_ = now + 1;
	// size_t(10000) instead of 10000ul: std::max needs both arguments to have
	// the same type, and size_t is not unsigned long on every platform.
	int64_t limit = std::max(size_t(10000), total / 10);
	int64_t n = 0; // number of messages released in this pass
	auto it = msgs_.begin();
	while (it != msgs_.end() && --limit > 0) {
		ShmMsg msg(it->second);
		const int64_t age = now - msg.timestamp();
		if (msg.Count() == 0 || age > NodeTimeoutSec()) {
			msg.Free();
			it = msgs_.erase(it);
			++n;
		} else {
			++it;
		}
	}
	if (n > 0) {
		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
	}
}

// Logs every recorded message and the summed Count() at trace level.
void MsgRecords::DebugPrint() const
{
	LOG_TRACE() << "msgs : " << size();
	int i = 0;
	int total_count = 0;
	for (auto &kv : msgs_) {
		auto &msg = kv.second;
		total_count += msg.Count();
		LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
	}
	LOG_TRACE() << "total count: " << total_count;
}

// NodeCenter::NodeInfo

// Marks the node offline: its timestamp is back-dated by offline_time and
// kTopicNodeOffline is published.
void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time)
{
	state_.timestamp_ = NowSec() - offline_time;
	state_.flag_ = kStateOffline;
	center_.Notify(kTopicNodeOffline, *this);
}

// Publishes {"proc_id": ...} for the node on topic. Nodes without a proc id
// (not registered yet) are skipped.
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
{
	if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
	Json json;
	json.put("proc_id", node.proc_.proc_id());
	Publish(node.shm_, topic, json.dump());
}

// Recomputes the node state from the time elapsed since its last timestamp:
// below offline_time -> normal, below kill_time -> offline, otherwise killme.
// Transitions into normal/offline are published.
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
	auto old = state_.flag_;
	auto diff = now - state_.timestamp_;
	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
	if (diff < offline_time) {
		state_.flag_ = kStateNormal;
		if (old != state_.flag_) {
			center_.Notify(kTopicNodeOnline, *this);
		}
	} else if (diff < kill_time) {
		state_.flag_ = kStateOffline;
		if (old != state_.flag_) {
			center_.Notify(kTopicNodeOffline, *this);
		}
	} else {
		state_.flag_ = kStateKillme;
	}
}

// Handles a node-init command. The session id (ssn) is taken from val >> 4,
// masked with MaskBits(56). For a new ssn this creates the session's mq, sends
// it a MsgProcInit and stores the node. The returned reply command carries the
// mq's absolute address from bit 4 upward. An ssn that already exists gets the
// bare reply command.
int64_t NodeCenter::OnNodeInit(ShmSocket &socket, const int64_t val)
{
	LOG_FUNCTION;
	SharedMemory &shm = socket.shm();
	MQId ssn = (val >> 4) & MaskBits(56);
	// Must be 64-bit: the mq's absolute address is OR-ed in below (<< 4), and an
	// int would silently truncate it.
	int64_t reply = EncodeCmd(eCmdNodeInitReply);

	if (nodes_.find(ssn) != nodes_.end()) {
		return reply; // ignore if exists.
	}

	auto UpdateRegInfo = [&](Node &node) {
		node->state_.timestamp_ = NowSec() - offline_time_;
		node->UpdateState(NowSec(), offline_time_, kill_time_);

		// create sockets.
		try {
			ShmSocket tmp(shm, ssn, eCreate);
			node->addrs_.emplace(ssn, tmp.AbsAddr());
			return true;
		} catch (...) {
			return false;
		}
	};

	auto PrepareProcInit = [&](Node &node) {
		ShmMsg init_msg(shm);
		DEFER1(init_msg.Release());
		MsgProcInit body;
		auto head = InitMsgHead(GetType(body), id(), ssn);
		return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
		       init_msg.Fill(ShmMsg::Serialize(head, body)) &&
		       SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
	};

	Node node(new NodeInfo(*this, shm));
	if (UpdateRegInfo(node) && PrepareProcInit(node)) {
		reply |= (node->addrs_[ssn] << 4);
		nodes_[ssn] = node;
		LOG_INFO() << "new node ssn (" << ssn << ") init";
	} else {
		ShmSocket::Remove(shm, ssn);
	}
	return reply;
}

// Marks msg as managed and records it in msgs_ (freed via FreeMsg/AutoRemove).
void NodeCenter::RecordMsg(const MsgI &msg)
{
	msg.reset_managed(true);
	msgs_.RecordMsg(msg);
}

// Records msg and sends the alloc reply to dest; if the queued send expires,
// the message is freed.
bool NodeCenter::SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{
	RecordMsg(msg);
	auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
	return socket.Send(dest, reply, onExpireFree);
}

// Records msg and sends it to dest.
bool NodeCenter::SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
{
	RecordMsg(msg);
	return socket.Send(dest, msg);
}

// Returns the node owning mq_id (nodes are keyed by mq_id rounded down to a
// multiple of 10), or an empty Node if none.
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
	Node node;
	auto ssn = mq_id - (mq_id % 10);
	auto pos = nodes_.find(ssn);
	if (pos != nodes_.end()) {
		node = pos->second;
	}
	return node;
}

// Forwards a request received from the network to a local mq. When dest.id_
// is 0 the first valid server of head.topic() is used. The center's default
// sender is appended to head's route, and cb is passed to the send.
bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
	Node node;

	auto FindDest = [&]() {
		auto pos = service_map_.find(head.topic());
		if (pos != service_map_.end() && !pos->second.empty()) {
			auto &clients = pos->second;
			for (auto &cli : clients) {
				node = cli.weak_node_.lock();
				if (node && Valid(*node)) {
					dest.id_ = cli.mq_id_;
					dest.offset_ = cli.mq_abs_addr_;
					return true;
				}
			}
		}
		return false;
	};

	if (dest.id_ == 0) {
		if (!FindDest()) {
			LOG_ERROR() << id() << " pass remote request, topic dest not found.";
			return false;
		}
	} else {
		node = GetNode(dest.id_);
		if (!node || !Valid(*node)) {
			LOG_ERROR() << id() << " pass remote request, dest not found.";
			return false;
		}
	}

	ShmSocket &sender(DefaultSender(node->shm_));
	auto route = head.add_route();
	route->set_mq_id(sender.id());
	route->set_abs_addr(sender.AbsAddr());

	ShmMsg msg(node->shm_);
	if (!msg.Make(head, body_content)) {
		return false;
	}
	DEFER1(msg.Release(););
	RecordMsg(msg);
	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}

// Delivers a publish received from the network to local network subscribers
// of head.topic(). One message is made per shared-memory segment and reused for
// every subscriber in that segment; all made messages are released on return.
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
	// LOG_FUNCTION;
	auto &topic = head.topic();
	auto clients = DoFindClients(topic, true);
	if (clients.empty()) { return true; }

	std::vector<MsgI> msgs;
	auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
	DEFER1(ReleaseAll(););

	for (auto &cli : clients) {
		auto Send1 = [&](Node node) {
			auto &shm = node->shm_;
			for (auto &msg : msgs) {
				if (msg.shm().name() == shm.name()) {
					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
					return;
				}
			}
			MsgI msg(shm);
			if (msg.Make(head, body_content)) {
				RecordMsg(msg);
				msgs.push_back(msg);
				// LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
				DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
			}
		};

		auto node = cli.weak_node_.lock();
		if (node) {
			Send1(node);
			// should also make sure that mq is not killed before msg expires.
			// it would be ok if (kill_time - offline_time) is longer than expire time.
		}
	}
	return true;
}

// Delivers a reply received from the network to a local mq. dest must be an
// mq of a known node and dest.offset_ must match that mq's recorded address.
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
	Node node(GetNode(dest.id_));
	if (!node) {
		LOG_ERROR() << id() << " pass remote reply , ssn not found.";
		return false;
	}
	// Use find(): operator[] would insert a bogus (id -> 0) entry into the
	// node's address table for an unknown mq id.
	auto addr = node->addrs_.find(dest.id_);
	if (addr == node->addrs_.end() || addr->second != dest.offset_) {
		LOG_ERROR() << id() << " pass remote reply, dest address not match";
		return false;
	}

	ShmMsg msg(node->shm_);
	if (!msg.Make(head, body_content)) {
		return false;
	}
	DEFER1(msg.Release(););
	RecordMsg(msg);
	return DefaultSender(node->shm_).Send(dest, msg);
}

// Handles an alloc request: allocates a message of the requested size in the
// socket's shared memory and replies to the requesting proc's mq.
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
	// LOG_FUNCTION;
	// val layout (high to low): 8bit size index, 4bit socket index, 16bit proc index, 28bit msg id, 4bit cmd+flag.
	int64_t msg_id = (val >> 4) & MaskBits(28);
	int proc_index = (val >> 32) & MaskBits(16);
	int socket_index = ((val) >> 48) & MaskBits(4);
	const auto &proc_rec = procs_.Get(proc_index);
	if (proc_rec.proc_.empty()) {
		return;
	}

	MQInfo dest = {proc_rec.ssn_ + socket_index, 0};
	auto FindMq = [&]() {
		auto pos = nodes_.find(proc_rec.ssn_);
		if (pos != nodes_.end()) {
			for (auto &&mq : pos->second->addrs_) {
				if (mq.first == dest.id_) {
					dest.offset_ = mq.second;
					return true;
				}
			}
		}
		return false;
	};
	if (!FindMq()) { return; }

	auto size = GetAllocSize((val >> 52) & MaskBits(8));
	MsgI new_msg(socket.shm());
	if (new_msg.Make(size)) {
		// reply layout: message offset from bit 32, 28bit msg id, 4bit cmd+flag.
		int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
		SendAllocReply(socket, dest, reply, new_msg);
	} else {
		int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure.
		socket.Send(dest, reply);
	}
}

// Handles a free command: frees the recorded message whose id is in val >> 4
// (masked with MaskBits(31)).
void NodeCenter::OnFree(ShmSocket &socket, const int64_t val)
{
	int64_t msg_id = (val >> 4) & MaskBits(31);
	msgs_.FreeMsg(msg_id);
}

// Dispatches alloc/free commands. Returns false for any other command.
bool NodeCenter::OnCommand(ShmSocket &socket, const int64_t val)
{
	assert(IsCmd(val));
	int cmd = DecodeCmd(val);
	switch (cmd) {
	case eCmdAllocRequest0: OnAlloc(socket, val); break;
	case eCmdFree: OnFree(socket, val); break;
	default: return false;
	}
	return true;
}

// Handles proc init for an already initialised session: records the proc and
// creates msg.extra_mq_num() extra mqs with ids ssn+1, ssn+2, ...
MsgProcInitReply NodeCenter::ProcInit(const BHMsgHead &head, MsgProcInit &msg)
{
	LOG_DEBUG() << "center got proc init.";
	auto pos = nodes_.find(head.ssn_id());
	if (pos == nodes_.end()) {
		return MakeReply(eNotFound, "Node Not Initialised");
	}
	auto index = procs_.Put(head.proc_id(), head.ssn_id());
	auto reply(MakeReply(eSuccess));
	reply.set_proc_index(index);

	auto &node = pos->second;
	try {
		for (int i = 0; i < msg.extra_mq_num(); ++i) {
			ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
			node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
			auto addr = reply.add_extra_mqs();
			addr->set_mq_id(tmp.id());
			addr->set_abs_addr(tmp.AbsAddr());
		}
		return reply;
	} catch (...) {
		LOG_ERROR() << "proc init create mq error";
		return MakeReply(eError, "Create mq failed.");
	}
}

// Registers proc info for an initialised session. If the same proc id was
// online under a different ssn, that old node is put offline and the proc is
// re-mapped to the new ssn.
MsgCommonReply NodeCenter::Register(const BHMsgHead &head, MsgRegister &msg)
{
	if (msg.proc().proc_id() != head.proc_id()) {
		return MakeReply(eInvalidInput, "invalid proc id.");
	}

	try {
		MQId ssn = head.ssn_id();
		// when node restart, ssn will change,
		// and old node will be removed after timeout.
		auto pos = nodes_.find(ssn);
		if (pos == nodes_.end()) {
			return MakeReply(eInvalidInput, "invalid session.");
		}

		// try to remove old session
		auto old = online_node_addr_map_.find(head.proc_id());
		if (old != online_node_addr_map_.end()) { // old session
			auto &old_ssn = old->second;
			if (old_ssn != ssn) {
				// Use find(): the old session may no longer be in nodes_, and
				// operator[] would insert and then dereference a null node.
				auto old_node = nodes_.find(old_ssn);
				if (old_node != nodes_.end()) {
					old_node->second->PutOffline(offline_time_);
					LOG_DEBUG() << "put node (" << old_node->second->proc_.proc_id() << ") ssn (" << old->second << ") offline";
				}
				old_ssn = ssn;
			}
		} else {
			online_node_addr_map_.emplace(head.proc_id(), ssn);
		}

		// update proc info
		Node &node = pos->second;
		node->proc_.Swap(msg.mutable_proc());
		node->state_.timestamp_ = head.timestamp();
		node->UpdateState(NowSec(), offline_time_, kill_time_);

		LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";

		return MakeReply(eSuccess);
	} catch (...) {
		return MakeReply(eError, "register node error.");
	}
}

// Puts the requesting node offline.
MsgCommonReply NodeCenter::Unregister(const BHMsgHead &head, MsgUnregister &msg)
{
	return HandleMsg(
	    head, [&](Node node) -> MsgCommonReply {
		    NodeInfo &ni = *node;
		    ni.PutOffline(offline_time_);
		    return MakeReply(eSuccess);
	    });
}

// Records the topics served by the requesting mq in the node and in
// service_map_, then publishes kTopicNodeService.
MsgCommonReply NodeCenter::RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
{
	return HandleMsg(
	    head, [&](Node node) -> MsgCommonReply {
		    auto src = SrcAddr(head);
		    auto &topics = msg.topics().topic_list();
		    node->services_[src].insert(topics.begin(), topics.end());
		    TopicDest dest = {src, SrcAbsAddr(head), node};
		    for (auto &topic : topics) {
			    service_map_[topic].insert(dest);
		    }
		    LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n";
		    for (auto &topic : topics) {
			    LOG_DEBUG() << "\t" << topic;
		    }
		    Notify(kTopicNodeService, *node);
		    return MakeReply(eSuccess);
	    });
}

// Refreshes the node's timestamp/state and updates its public/private info
// when the heartbeat carries non-empty values.
MsgCommonReply NodeCenter::Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
{
	return HandleMsg(head, [&](Node node) {
		NodeInfo &ni = *node;
		ni.state_.timestamp_ = head.timestamp();
		ni.UpdateState(NowSec(), offline_time_, kill_time_);

		auto &info = msg.proc();
		if (!info.public_info().empty()) {
			ni.proc_.set_public_info(info.public_info());
		}
		if (!info.private_info().empty()) {
			ni.proc_.set_private_info(info.private_info());
		}
		return MakeReply(eSuccess);
	});
}

// Describes one proc (by id) or, for an empty id, every node: proc info with
// private info cleared, online flag, served topics and subscriptions.
MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
{
	typedef MsgQueryProcReply Reply;
	auto Add1 = [](Reply &reply, Node node) {
		auto info = reply.add_proc_list();
		*info->mutable_proc() = node->proc_;
		info->mutable_proc()->clear_private_info();
		info->set_online(node->state_.flag_ == kStateNormal);
		auto AddTopics = [](auto &dst, auto &src) {
			for (auto &addr_topics : src) {
				for (auto &topic : addr_topics.second) {
					dst.add_topic_list(topic);
				}
			}
		};
		AddTopics(*info->mutable_service(), node->services_);
		AddTopics(*info->mutable_local_sub(), node->local_sub_);
		AddTopics(*info->mutable_net_sub(), node->net_sub_);
	};

	if (!proc_id.empty()) {
		auto pos = online_node_addr_map_.find(proc_id);
		if (pos == online_node_addr_map_.end()) {
			return MakeReply(eNotFound, "proc not found.");
		} else {
			auto node_pos = nodes_.find(pos->second);
			if (node_pos == nodes_.end()) {
				return MakeReply(eNotFound, "proc node not found.");
			} else {
				auto reply = MakeReply(eSuccess);
				Add1(reply, node_pos->second);
				return reply;
			}
		}
	} else {
		Reply reply(MakeReply(eSuccess));
		for (auto &kv : nodes_) {
			Add1(reply, kv.second);
		}
		return reply;
	}
}

// Message handler wrapper around QueryProc(req.proc_id()).
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
{
	typedef MsgQueryProcReply Reply;
	auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
	return HandleMsg(head, query);
}

// Lists addresses serving req.topic(): valid local servers (proc id + mq
// address) followed by remote hosts (ip only). Returns eNotFound if none.
MsgQueryTopicReply NodeCenter::QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
{
	typedef MsgQueryTopicReply Reply;

	auto query = [&](Node self) -> Reply {
		Reply reply = MakeReply(eSuccess);
		auto local = [&]() {
			auto pos = service_map_.find(req.topic());
			if (pos != service_map_.end() && !pos->second.empty()) {
				auto &clients = pos->second;
				for (auto &dest : clients) {
					Node dest_node(dest.weak_node_.lock());
					if (dest_node && Valid(*dest_node)) {
						auto node_addr = reply.add_node_address();
						node_addr->set_proc_id(dest_node->proc_.proc_id());
						node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
						node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
					}
				}
				return true;
			} else {
				return false;
			}
		};
		auto net = [&]() {
			auto hosts(FindRemoteRPCServers(req.topic()));
			if (hosts.empty()) {
				return false;
			} else {
				for (auto &ip : hosts) {
					auto node_addr = reply.add_node_address();
					node_addr->mutable_addr()->set_ip(ip);
				}
				return true;
			}
		};
		local();
		net();
		if (reply.node_address_size() == 0) {
			return MakeReply(eNotFound, "topic server not found.");
		} else {
			return reply;
		}
	};

	return HandleMsg(head, query);
}

// Records the requesting mq as a subscriber of msg's topics, in the node's
// net or local subscription table and in the matching center-wide map.
// Network subscriptions also publish kTopicNodeSub.
void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
{
	auto src = SrcAddr(head);
	auto Sub = [&](auto &sub, auto &sub_map) {
		auto &topics = msg.topics().topic_list();
		sub[src].insert(topics.begin(), topics.end());
		const TopicDest &dest = {src, SrcAbsAddr(head), node};
		for (auto &topic : topics) {
			sub_map[topic].insert(dest);
		}
	};
	if (msg.network()) {
		Sub(net_sub_, center_.net_sub_map_);
		center_.Notify(kTopicNodeSub, *this);
	} else {
		Sub(local_sub_, center_.local_sub_map_);
	}
}

// Message handler wrapper around NodeInfo::Subscribe.
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
	return HandleMsg(head, [&](Node node) {
		node->Subscribe(head, msg, node);
		return MakeReply(eSuccess);
	});
}

// Removes msg's topics from the requesting mq's subscriptions, dropping empty
// entries from both the node table and the center-wide map. Network
// unsubscriptions also publish kTopicNodeUnsub.
void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
{
	auto src = SrcAddr(head);
	auto Unsub = [&](auto &sub, auto &sub_map) {
		auto pos = sub.find(src);

		auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
			auto pos = sub_map.find(topic);
			if (pos != sub_map.end() &&
			    pos->second.erase(dest) != 0 &&
			    pos->second.empty()) {
				sub_map.erase(pos);
			}
		};

		if (pos != sub.end()) {
			const TopicDest &dest = {src, SrcAbsAddr(head), node};
			auto &topics = msg.topics().topic_list();
			// clear node sub records;
			for (auto &topic : topics) {
				pos->second.erase(topic);
				RemoveSubTopicDestRecord(topic, dest);
			}
			if (pos->second.empty()) {
				sub.erase(pos);
			}
		}
	};
	if (msg.network()) {
		Unsub(net_sub_, center_.net_sub_map_);
		center_.Notify(kTopicNodeUnsub, *this);
	} else {
		Unsub(local_sub_, center_.local_sub_map_);
	}
}

// Message handler wrapper around NodeInfo::Unsubscribe.
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
	return HandleMsg(head, [&](Node node) {
		node->Unsubscribe(head, msg, node);
		return MakeReply(eSuccess);
	});
}

// Collects valid subscribers of topic and of each kTopicSep-delimited prefix
// of it. Network subscribers are always included; local subscribers only when
// from_remote is false.
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
	// LOG_FUNCTION;
	Clients dests;
	auto Find1 = [&](const std::string &exact) {
		auto FindIn = [&](auto &sub_map) {
			auto pos = sub_map.find(exact);
			if (pos != sub_map.end()) {
				auto &clients = pos->second;
				for (auto &cli : clients) {
					if (Valid(cli.weak_node_)) {
						dests.insert(cli);
					}
				}
			}
		};
		if (!from_remote) {
			FindIn(local_sub_map_);
			// LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
		}
		// net subscriptions also work in local mode.
		FindIn(net_sub_map_);
		// LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
	};
	Find1(topic);

	size_t pos = 0;
	while (true) {
		pos = topic.find(kTopicSep, pos);
		if (pos == topic.npos || ++pos == topic.size()) {
			// Find1(std::string()); // sub all.
			break;
		} else {
			Find1(topic.substr(0, pos - 1));
		}
	}
	return dests;
}

// Message handler: publishes msg on topic through the node's default sender.
MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
	return HandleMsg(head, [&](Node node) {
		DoPublish(DefaultSender(node->shm_), topic, msg);
		return MakeReply(eSuccess);
	});
}

// Sends msg through sock to every live subscriber of topic (local and net).
// Errors are logged, not propagated.
void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
{
	try {
		auto clients = DoFindClients(topic, false);
		if (clients.empty()) { return; }

		for (auto &cli : clients) {
			auto node = cli.weak_node_.lock();
			if (node) {
				// should also make sure that mq is not killed before msg expires.
				// it would be ok if (kill_time - offline_time) is longer than expire time.
				sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
			}
		}
	} catch (...) {
		LOG_ERROR() << "DoPublish error.";
	}
}

// Periodic maintenance: node state checks and message auto-removal.
void NodeCenter::OnTimer()
{
	CheckNodes();
	msgs_.AutoRemove();
}

// At most once per second: updates every node's state and removes nodes that
// reached kStateKillme.
void NodeCenter::CheckNodes()
{
	auto now = NowSec();
	if (now <= last_check_time_) { return; }
	last_check_time_ = now;

	auto it = nodes_.begin();
	while (it != nodes_.end()) {
		auto &cli = *it->second;
		cli.UpdateState(now, offline_time_, kill_time_);
		if (cli.state_.flag_ == kStateKillme) {
			RemoveNode(it->second);
			it = nodes_.erase(it);
		} else {
			++it;
		}
	}
	msgs_.DebugPrint();
}

// Drops all service/subscription records of node, its online record (if it
// still points at one of node's mqs), and removes node's mqs.
void NodeCenter::RemoveNode(Node &node)
{
	auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
		for (auto &addr_topics : node_rec) {
			TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
			for (auto &topic : addr_topics.second) {
				auto pos = rec_map.find(topic);
				if (pos != rec_map.end()) {
					pos->second.erase(dest);
					if (pos->second.empty()) {
						rec_map.erase(pos);
					}
				}
			}
		}
	};
	EraseMapRec(service_map_, node->services_);
	EraseMapRec(local_sub_map_, node->local_sub_);
	EraseMapRec(net_sub_map_, node->net_sub_);

	// remove online record.
	auto pos = online_node_addr_map_.find(node->proc_.proc_id());
	if (pos != online_node_addr_map_.end()) {
		if (node->addrs_.find(pos->second) != node->addrs_.end()) {
			online_node_addr_map_.erase(pos);
		}
	}

	for (auto &addr : node->addrs_) {
		auto &id = addr.first;
		auto r = ShmSocket::Remove(node->shm_, id);
		LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
	}

	node->addrs_.clear();
}

// Publishes content on topic as a message from the center, allocated in shm.
// Errors are logged, not propagated.
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
	try {
		MsgPublish pub;
		pub.set_topic(topic);
		pub.set_data(content);
		BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
		MsgI msg(shm);
		if (msg.Make(head, pub)) {
			DEFER1(msg.Release());
			RecordMsg(msg);
			DoPublish(DefaultSender(shm), topic, msg);
		}
	} catch (...) {
		LOG_ERROR() << "center publish error.";
	}
}

// Rebuilds the network records from a JSON array of hosts. The entry with
// "isLocal" set provides this host's id and ip; every other host contributes
// its ip to rpc_hosts_ (per "pubTopics" entry) and sub_hosts_ (per
// "netSubTopics" entry).
void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
	// LOG_FUNCTION;
	sub_hosts_.clear();
	rpc_hosts_.clear();
	for (auto &host : info.array()) {
		if (host.get("isLocal", false)) {
			host_id_ = host.get("serverId", "");
			ip_ = host.get("ip", "");
		} else {
			auto ip = host.get("ip", "");
			auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
				for (auto &topic : lot) {
					auto t = topic.get_value<std::string>();
					rec[t].insert(ip);
					// LOG_DEBUG() << "net topic: " << t << ", " << ip;
				}
			};
			// LOG_DEBUG() << "serives:";
			UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
			// LOG_DEBUG() << "net sub:";
			UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
		}
	}
}