From 056f71f24cefaf88f2a93714c6678c03ed5f1e0e Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 02 七月 2021 16:54:33 +0800
Subject: [PATCH] fixed to adapt gcc-5.4 & glibc-2.25
---
box/node_center.cpp | 92 ++++++++++++++++++++++++++++++++++++----------
1 files changed, 72 insertions(+), 20 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 068aa00..76407a8 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -270,6 +270,7 @@
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
+ // LOG_FUNCTION;
auto &topic = head.topic();
auto clients = DoFindClients(topic, true);
if (clients.empty()) { return true; }
@@ -288,9 +289,10 @@
}
}
MsgI msg(shm);
- if (msg.Make(body_content)) {
+ if (msg.Make(head, body_content)) {
RecordMsg(msg);
msgs.push_back(msg);
+ // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
}
};
@@ -554,22 +556,43 @@
typedef MsgQueryTopicReply Reply;
auto query = [&](Node self) -> Reply {
- auto pos = service_map_.find(req.topic());
- if (pos != service_map_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- Reply reply = MakeReply<Reply>(eSuccess);
- for (auto &dest : clients) {
- Node dest_node(dest.weak_node_.lock());
- if (dest_node && Valid(*dest_node)) {
- auto node_addr = reply.add_node_address();
- node_addr->set_proc_id(dest_node->proc_.proc_id());
- node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
- node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+ Reply reply = MakeReply<Reply>(eSuccess);
+ auto local = [&]() {
+ auto pos = service_map_.find(req.topic());
+ if (pos != service_map_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &dest : clients) {
+ Node dest_node(dest.weak_node_.lock());
+ if (dest_node && Valid(*dest_node)) {
+ auto node_addr = reply.add_node_address();
+ node_addr->set_proc_id(dest_node->proc_.proc_id());
+ node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+ node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+ }
}
+ return true;
+ } else {
+ return false;
}
- return reply;
- } else {
+ };
+ auto net = [&]() {
+ auto hosts(FindRemoteRPCServers(req.topic()));
+ if (hosts.empty()) {
+ return false;
+ } else {
+ for (auto &ip : hosts) {
+ auto node_addr = reply.add_node_address();
+ node_addr->mutable_addr()->set_ip(ip);
+ }
+ return true;
+ }
+ };
+ local();
+ net();
+ if (reply.node_address_size() == 0) {
return MakeReply<Reply>(eNotFound, "topic server not found.");
+ } else {
+ return reply;
}
};
@@ -587,7 +610,6 @@
sub_map[topic].insert(dest);
}
};
- LOG_DEBUG() << "subscribe net : " << msg.network();
if (msg.network()) {
Sub(net_sub_, center_.net_sub_map_);
center_.Notify(kTopicNodeSub, *this);
@@ -651,6 +673,7 @@
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
+ // LOG_FUNCTION;
Clients dests;
auto Find1 = [&](const std::string &exact) {
auto FindIn = [&](auto &sub_map) {
@@ -658,16 +681,25 @@
if (pos != sub_map.end()) {
auto &clients = pos->second;
for (auto &cli : clients) {
- if (Valid(cli.weak_node_)) {
- dests.insert(cli);
+ auto node = cli.weak_node_.lock();
+ if (node) {
+ if (node->state_.flag_ == kStateNormal)
+ dests.insert(cli);
}
+
+ // if (Valid(cli.weak_node_)) {
+ // dests.insert(cli);
+ // }
}
}
};
if (!from_remote) {
FindIn(local_sub_map_);
+ // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
}
+ // net subscripitions also work in local mode.
FindIn(net_sub_map_);
+ // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
};
Find1(topic);
@@ -793,8 +825,28 @@
}
}
-std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
- //TODO search synced full list;
- return std::vector<std::string>();
+ // LOG_FUNCTION;
+ sub_hosts_.clear();
+ rpc_hosts_.clear();
+ for (auto &host : info.array()) {
+ if (host.get("isLocal", false)) {
+ host_id_ = host.get("serverId", "");
+ ip_ = host.get("ip", "");
+ } else {
+ auto ip = host.get("ip", "");
+ auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
+ for (auto &topic : lot) {
+ auto t = topic.get_value<std::string>();
+ rec[t].insert(ip);
+ // LOG_DEBUG() << "net topic: " << t << ", " << ip;
+ }
+ };
+ // LOG_DEBUG() << "serives:";
+ UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
+ // LOG_DEBUG() << "net sub:";
+ UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
+ }
+ }
}
\ No newline at end of file
--
Gitblit v1.8.0