From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- box/node_center.cpp | 531 +++++++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 410 insertions(+), 121 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index b970d44..76407a8 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -16,7 +16,25 @@ * ===================================================================================== */ #include "node_center.h" +#include "json.h" #include "log.h" + +using ssjson::Json; + +namespace +{ +std::string Join(const std::string &parent, const std::string &child) +{ + return parent + kTopicSep + child; +} +const std::string kTopicCenterRoot = "#center"; +const std::string kTopicNode = Join(kTopicCenterRoot, "node"); +const std::string kTopicNodeOnline = Join(kTopicNode, "online"); +const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); +const std::string kTopicNodeService = Join(kTopicNode, "service"); +const std::string kTopicNodeSub = Join(kTopicNode, "subscribe"); +const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe"); +} // namespace ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) { @@ -42,7 +60,7 @@ { auto pos = msgs_.find(id); if (pos != msgs_.end()) { - ShmMsg(pos->second).Free(); + pos->second.Free(); msgs_.erase(pos); } else { LOG_TRACE() << "ignore late free request."; @@ -55,8 +73,9 @@ return; } // LOG_FUNCTION; + const size_t total = msgs_.size(); time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t limit = std::max(10000ul, total / 10); int64_t n = 0; auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { @@ -67,49 +86,65 @@ ++n; }; int n = now - msg.timestamp(); - if (n < 10) { + if (msg.Count() == 0) { + Free(); + } else if (n > NodeTimeoutSec()) { + Free(); + } else { ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); } } if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; } } void MsgRecords::DebugPrint() const { - LOG_DEBUG() << "msgs : " << size(); + LOG_TRACE() << "msgs : " << size(); int i = 0; int total_count = 0; for (auto &kv : msgs_) { - MsgI msg(kv.second); + auto &msg = kv.second; total_count += msg.Count(); - LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size(); } - LOG_DEBUG() << "total count: " << total_count; + LOG_TRACE() << "total count: " << total_count; } // NodeCenter::ProcState -void NodeCenter::ProcState::PutOffline(const int64_t offline_time) +void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time) { - timestamp_ = NowSec() - offline_time; - flag_ = kStateOffline; + state_.timestamp_ = NowSec() - offline_time; + state_.flag_ = kStateOffline; + center_.Notify(kTopicNodeOffline, *this); } -void NodeCenter::ProcState::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) +void NodeCenter::Notify(const Topic &topic, NodeInfo &node) { - auto diff = now - timestamp_; - LOG_DEBUG() << "state " << this << " diff: " << diff; + if (node.proc_.proc_id().empty()) { return; } // node init, ignore. + Json json; + json.put("proc_id", node.proc_.proc_id()); + Publish(node.shm_, topic, json.dump()); +} +void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) +{ + auto old = state_.flag_; + auto diff = now - state_.timestamp_; + + LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; if (diff < offline_time) { - flag_ = kStateNormal; + state_.flag_ = kStateNormal; + if (old != state_.flag_) { + center_.Notify(kTopicNodeOnline, *this); + } } else if (diff < kill_time) { - flag_ = kStateOffline; + state_.flag_ = kStateOffline; + if (old != state_.flag_) { + center_.Notify(kTopicNodeOffline, *this); + } } else { - flag_ = kStateKillme; + state_.flag_ = kStateKillme; } } @@ -126,11 +161,11 @@ auto UpdateRegInfo = [&](Node &node) { node->state_.timestamp_ = NowSec() - offline_time_; - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + node->UpdateState(NowSec(), offline_time_, kill_time_); // create sockets. try { - ShmSocket tmp(shm, true, ssn, 16); + ShmSocket tmp(shm, ssn, eCreate); node->addrs_.emplace(ssn, tmp.AbsAddr()); return true; } catch (...) { @@ -140,7 +175,7 @@ auto PrepareProcInit = [&](Node &node) { bool r = false; - ShmMsg init_msg; + ShmMsg init_msg(shm); DEFER1(init_msg.Release()); MsgProcInit body; auto head = InitMsgHead(GetType(body), id(), ssn); @@ -149,7 +184,7 @@ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); }; - Node node(new NodeInfo); + Node node(new NodeInfo(*this, shm)); if (UpdateRegInfo(node) && PrepareProcInit(node)) { reply |= (node->addrs_[ssn] << 4); nodes_[ssn] = node; @@ -175,6 +210,121 @@ { RecordMsg(msg); return socket.Send(dest, msg); +} + +NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) +{ + Node node; + auto ssn = mq_id - (mq_id % 10); + auto pos = nodes_.find(ssn); + if (pos != nodes_.end()) { + node = pos->second; + } + return node; +} + +bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) +{ + Node node; + + auto FindDest = [&]() { + auto pos = service_map_.find(head.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + node = cli.weak_node_.lock(); + if (node && Valid(*node)) { + dest.id_ = cli.mq_id_; + dest.offset_ = cli.mq_abs_addr_; + return true; + } + } + } + return false; + }; + + if (dest.id_ == 0) { + if (!FindDest()) { + LOG_ERROR() << id() << " pass remote request, topic dest not found."; + return false; + } + } else { + node = GetNode(dest.id_); + if (!node || !Valid(*node)) { + LOG_ERROR() << id() << " pass remote request, dest not found."; + return false; + } + } + + ShmSocket &sender(DefaultSender(node->shm_)); + auto route = head.add_route(); + route->set_mq_id(sender.id()); + route->set_abs_addr(sender.AbsAddr()); + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return sender.Send(dest, msg, head.msg_id(), std::move(cb)); +} + +bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) +{ + // LOG_FUNCTION; + auto &topic = head.topic(); + auto clients = DoFindClients(topic, true); + if (clients.empty()) { return true; } + + std::vector<MsgI> msgs; + auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } }; + DEFER1(ReleaseAll();); + + for (auto &cli : clients) { + auto Send1 = [&](Node node) { + auto &shm = node->shm_; + for (auto &msg : msgs) { + if (msg.shm().name() == shm.name()) { + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + return; + } + } + MsgI msg(shm); + if (msg.Make(head, body_content)) { + RecordMsg(msg); + msgs.push_back(msg); + // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_; + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + }; + auto node = cli.weak_node_.lock(); + if (node) { + Send1(node); + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + } + } + + return true; +} + +bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) +{ + Node node(GetNode(dest.id_)); + if (!node) { + LOG_ERROR() << id() << " pass remote reply , ssn not found."; + return false; + } + auto offset = node->addrs_[dest.id_]; + if (offset != dest.offset_) { + LOG_ERROR() << id() << " pass remote reply, dest address not match"; + return false; + } + + ShmMsg msg(node->shm_); + if (!msg.Make(head, body_content)) { return false; } + DEFER1(msg.Release();); + RecordMsg(msg); + return DefaultSender(node->shm_).Send(dest, msg); } void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) @@ -205,7 +355,7 @@ if (!FindMq()) { return; } auto size = GetAllocSize((val >> 52) & MaskBits(8)); - MsgI new_msg; + MsgI new_msg(socket.shm()); if (new_msg.Make(size)) { // 31bit proc index, 28bit id, ,4bit cmd+flag int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); @@ -248,7 +398,7 @@ auto &node = pos->second; try { for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); + ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate); node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); auto addr = reply.add_extra_mqs(); addr->set_mq_id(tmp.id()); @@ -271,33 +421,33 @@ MQId ssn = head.ssn_id(); // when node restart, ssn will change, // and old node will be removed after timeout. - auto UpdateRegInfo = [&](Node &node) { - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - auto pos = nodes_.find(ssn); if (pos == nodes_.end()) { return MakeReply(eInvalidInput, "invalid session."); } - // update proc info - Node &node = pos->second; - UpdateRegInfo(node); - LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; - + // try to remove old session auto old = online_node_addr_map_.find(head.proc_id()); if (old != online_node_addr_map_.end()) { // old session auto &old_ssn = old->second; if (old_ssn != ssn) { - nodes_[old_ssn]->state_.PutOffline(offline_time_); + nodes_[old_ssn]->PutOffline(offline_time_); + LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; old_ssn = ssn; } } else { online_node_addr_map_.emplace(head.proc_id(), ssn); } + + // update proc info + Node &node = pos->second; + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = head.timestamp(); + node->UpdateState(NowSec(), offline_time_, kill_time_); + + LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; + return MakeReply(eSuccess); } catch (...) { return MakeReply(eError, "register node error."); @@ -309,7 +459,7 @@ return HandleMsg( head, [&](Node node) -> MsgCommonReply { NodeInfo &ni = *node; - ni.state_.PutOffline(offline_time_); + ni.PutOffline(offline_time_); return MakeReply(eSuccess); }); } @@ -329,6 +479,7 @@ for (auto &topic : topics) { LOG_DEBUG() << "\t" << topic; } + Notify(kTopicNodeService, *node); return MakeReply(eSuccess); }); } @@ -338,7 +489,7 @@ return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; ni.state_.timestamp_ = head.timestamp(); - ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); + ni.UpdateState(NowSec(), offline_time_, kill_time_); auto &info = msg.proc(); if (!info.public_info().empty()) { @@ -350,44 +501,53 @@ return MakeReply(eSuccess); }); } -MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) + +MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id) { typedef MsgQueryProcReply Reply; - auto query = [&](Node self) -> Reply { - auto Add1 = [](Reply &reply, Node node) { - auto info = reply.add_proc_list(); - *info->mutable_proc() = node->proc_; - info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { + auto Add1 = [](Reply &reply, Node node) { + auto info = reply.add_proc_list(); + *info->mutable_proc() = node->proc_; + info->mutable_proc()->clear_private_info(); + info->set_online(node->state_.flag_ == kStateNormal); + auto AddTopics = [](auto &dst, auto &src) { + for (auto &addr_topics : src) { for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); + dst.add_topic_list(topic); } } }; - - if (!req.proc_id().empty()) { - auto pos = online_node_addr_map_.find(req.proc_id()); - if (pos == online_node_addr_map_.end()) { - return MakeReply<Reply>(eNotFound, "proc not found."); - } else { - auto node_pos = nodes_.find(pos->second); - if (node_pos == nodes_.end()) { - return MakeReply<Reply>(eNotFound, "proc node not found."); - } else { - auto reply = MakeReply<Reply>(eSuccess); - Add1(reply, node_pos->second); - return reply; - } - } - } else { - Reply reply(MakeReply<Reply>(eSuccess)); - for (auto &kv : nodes_) { - Add1(reply, kv.second); - } - return reply; - } + AddTopics(*info->mutable_service(), node->services_); + AddTopics(*info->mutable_local_sub(), node->local_sub_); + AddTopics(*info->mutable_net_sub(), node->net_sub_); }; + if (!proc_id.empty()) { + auto pos = online_node_addr_map_.find(proc_id); + if (pos == online_node_addr_map_.end()) { + return MakeReply<Reply>(eNotFound, "proc not found."); + } else { + auto node_pos = nodes_.find(pos->second); + if (node_pos == nodes_.end()) { + return MakeReply<Reply>(eNotFound, "proc node not found."); + } else { + auto reply = MakeReply<Reply>(eSuccess); + Add1(reply, node_pos->second); + return reply; + } + } + } else { + Reply reply(MakeReply<Reply>(eSuccess)); + for (auto &kv : nodes_) { + Add1(reply, kv.second); + } + return reply; + } +} +MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) +{ + typedef MsgQueryProcReply Reply; + auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); }; return HandleMsg<Reply>(head, query); } @@ -396,57 +556,93 @@ typedef MsgQueryTopicReply Reply; auto query = [&](Node self) -> Reply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_id_); - node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + Reply reply = MakeReply<Reply>(eSuccess); + auto local = [&]() { + auto pos = service_map_.find(req.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_id_); + node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + } } + return true; + } else { + return false; } - return reply; - } else { + }; + auto net = [&]() { + auto hosts(FindRemoteRPCServers(req.topic())); + if (hosts.empty()) { + return false; + } else { + for (auto &ip : hosts) { + auto node_addr = reply.add_node_address(); + node_addr->mutable_addr()->set_ip(ip); + } + return true; + } + }; + local(); + net(); + if (reply.node_address_size() == 0) { return MakeReply<Reply>(eNotFound, "topic server not found."); + } else { + return reply; } }; return HandleMsg<Reply>(head, query); } +void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + auto Sub = [&](auto &sub, auto &sub_map) { + auto &topics = msg.topics().topic_list(); + sub[src].insert(topics.begin(), topics.end()); + const TopicDest &dest = {src, SrcAbsAddr(head), node}; + for (auto &topic : topics) { + sub_map[topic].insert(dest); + } + }; + if (msg.network()) { + Sub(net_sub_, center_.net_sub_map_); + center_.Notify(kTopicNodeSub, *this); + } else { + Sub(local_sub_, center_.local_sub_map_); + } +} + MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } + node->Subscribe(head, msg, node); return MakeReply(eSuccess); }); } -MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) -{ - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && +void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + + auto Unsub = [&](auto &sub, auto &sub_map) { + auto pos = sub.find(src); + + auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { + auto pos = sub_map.find(topic); + if (pos != sub_map.end() && pos->second.erase(dest) != 0 && pos->second.empty()) { - subscribe_map_.erase(pos); + sub_map.erase(pos); } }; - if (pos != node->subscriptions_.end()) { + if (pos != sub.end()) { const TopicDest &dest = {src, SrcAbsAddr(head), node}; auto &topics = msg.topics().topic_list(); // clear node sub records; @@ -455,26 +651,55 @@ RemoveSubTopicDestRecord(topic, dest); } if (pos->second.empty()) { - node->subscriptions_.erase(pos); + sub.erase(pos); } } + }; + if (msg.network()) { + Unsub(net_sub_, center_.net_sub_map_); + center_.Notify(kTopicNodeUnsub, *this); + } else { + Unsub(local_sub_, center_.local_sub_map_); + } +} + +MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) +{ + return HandleMsg(head, [&](Node node) { + node->Unsubscribe(head, msg, node); return MakeReply(eSuccess); }); } -NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) +NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { + // LOG_FUNCTION; Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); + auto Find1 = [&](const std::string &exact) { + auto FindIn = [&](auto &sub_map) { + auto pos = sub_map.find(exact); + if (pos != sub_map.end()) { + auto &clients = pos->second; + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node) { + if (node->state_.flag_ == kStateNormal) + dests.insert(cli); + } + + // if (Valid(cli.weak_node_)) { + // dests.insert(cli); + // } } } + }; + if (!from_remote) { + FindIn(local_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); } + // net subscripitions also work in local mode. + FindIn(net_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); }; Find1(topic); @@ -485,21 +710,37 @@ // Find1(std::string()); // sub all. break; } else { - Find1(topic.substr(0, pos)); + Find1(topic.substr(0, pos - 1)); } } return dests; } -bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) +MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; + return HandleMsg(head, [&](Node node) { + DoPublish(DefaultSender(node->shm_), topic, msg); return MakeReply(eSuccess); - }).Swap(&reply); - return ret; + }); +} + +void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) +{ + try { + auto clients = DoFindClients(topic, false); + if (clients.empty()) { return; } + + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node) { + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + } + } catch (...) { + LOG_ERROR() << "DoPublish error."; + } } void NodeCenter::OnTimer() @@ -517,7 +758,7 @@ auto it = nodes_.begin(); while (it != nodes_.end()) { auto &cli = *it->second; - cli.state_.UpdateState(now, offline_time_, kill_time_); + cli.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { RemoveNode(it->second); it = nodes_.erase(it); @@ -545,7 +786,8 @@ } }; EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); + EraseMapRec(local_sub_map_, node->local_sub_); + EraseMapRec(net_sub_map_, node->net_sub_); // remove online record. auto pos = online_node_addr_map_.find(node->proc_.proc_id()); @@ -556,8 +798,55 @@ } for (auto &addr : node->addrs_) { - cleaner_(addr.first); + auto &id = addr.first; + auto r = ShmSocket::Remove(node->shm_, id); + LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); } node->addrs_.clear(); +} + +void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) +{ + try { + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(content); + BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); + MsgI msg(shm); + if (msg.Make(head, pub)) { + DEFER1(msg.Release()); + RecordMsg(msg); + DoPublish(DefaultSender(shm), topic, msg); + } + + } catch (...) { + LOG_ERROR() << "center publish error."; + } +} + +void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) +{ + // LOG_FUNCTION; + sub_hosts_.clear(); + rpc_hosts_.clear(); + for (auto &host : info.array()) { + if (host.get("isLocal", false)) { + host_id_ = host.get("serverId", ""); + ip_ = host.get("ip", ""); + } else { + auto ip = host.get("ip", ""); + auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) { + for (auto &topic : lot) { + auto t = topic.get_value<std::string>(); + rec[t].insert(ip); + // LOG_DEBUG() << "net topic: " << t << ", " << ip; + } + }; + // LOG_DEBUG() << "serives:"; + UpdateRec(host.child("pubTopics").array(), rpc_hosts_); + // LOG_DEBUG() << "net sub:"; + UpdateRec(host.child("netSubTopics").array(), sub_hosts_); + } + } } \ No newline at end of file -- Gitblit v1.8.0