From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 30 六月 2021 11:15:53 +0800
Subject: [PATCH] support tcp pub/sub.
---
box/tcp_connection.cpp | 2
utest/api_test.cpp | 91 ++++++++++++-----
box/center_topic_node.cpp | 29 +++++
src/bh_api.cc | 4
box/center.cpp | 2
api/bhsgo/bhome_node.go | 5 +
src/topic_node.h | 10 +
box/node_center.h | 32 ++++++
src/bh_api.h | 5 +
box/node_center.cpp | 82 ++++++++++++---
src/exported_symbols | 1
src/topic_node.cpp | 3
12 files changed, 210 insertions(+), 56 deletions(-)
diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index 9d66814..09d571b 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -47,6 +47,11 @@
return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
}
+func SubscribeNet(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ data, _ := topics.Marshal()
+ return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeNetTopics), data, reply, timeout_ms)
+}
+
func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
data, _ := proc.Marshal()
return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
diff --git a/box/center.cpp b/box/center.cpp
index 78135d1..e0abbb3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -146,7 +146,7 @@
replyer(reply);
auto hosts = center->FindRemoteSubClients(pub.topic());
for (auto &host : hosts) {
- tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
+ tcp_proxy.Publish(host, kBHCenterPort, msg.content());
}
}
};
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 3c4f369..1f4103f 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -30,6 +30,7 @@
namespace
{
const std::string &kTopicQueryProc = "#center_query_procs";
+const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center";
std::string ToJson(const MsgQueryProcReply &qpr)
{
@@ -92,10 +93,16 @@
throw std::runtime_error("center node register failed.");
}
- MsgTopicList topics;
- topics.add_topic_list(kTopicQueryProc);
- if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
+ MsgTopicList services;
+ services.add_topic_list(kTopicQueryProc);
+ if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) {
throw std::runtime_error("center node register topics failed.");
+ }
+ MsgTopicList subs;
+
+ subs.add_topic_list(kTopicNotifyRemoteInfo);
+ if (!pnode_->Subscribe(subs, reply, timeout)) {
+ throw std::runtime_error("center node subscribe topics failed.");
}
auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
@@ -117,6 +124,20 @@
pnode_->ServerSendReply(src_info, reply);
};
+ auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) {
+ if (data.topic() == kTopicNotifyRemoteInfo) {
+ // parse other data.
+ // LOG_DEBUG() << "center got net info.";
+ ssjson::Json js;
+ if (js.parse(data.data())) {
+ if (js.is_array()) {
+ auto ¢er = *pscenter_;
+ center->ParseNetInfo(js);
+ }
+ }
+ }
+ };
+
bool cur = false;
if (run_.compare_exchange_strong(cur, true)) {
auto heartbeat = [this]() {
@@ -126,7 +147,7 @@
}
};
std::thread(heartbeat).swap(worker_);
- return pnode_->ServerStart(onRequest);
+ return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv);
} else {
return false;
}
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 068aa00..77bfac1 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -270,6 +270,7 @@
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
+ // LOG_FUNCTION;
auto &topic = head.topic();
auto clients = DoFindClients(topic, true);
if (clients.empty()) { return true; }
@@ -288,9 +289,10 @@
}
}
MsgI msg(shm);
- if (msg.Make(body_content)) {
+ if (msg.Make(head, body_content)) {
RecordMsg(msg);
msgs.push_back(msg);
+ // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
}
};
@@ -554,22 +556,43 @@
typedef MsgQueryTopicReply Reply;
auto query = [&](Node self) -> Reply {
- auto pos = service_map_.find(req.topic());
- if (pos != service_map_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- Reply reply = MakeReply<Reply>(eSuccess);
- for (auto &dest : clients) {
- Node dest_node(dest.weak_node_.lock());
- if (dest_node && Valid(*dest_node)) {
- auto node_addr = reply.add_node_address();
- node_addr->set_proc_id(dest_node->proc_.proc_id());
- node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
- node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+ Reply reply = MakeReply<Reply>(eSuccess);
+ auto local = [&]() {
+ auto pos = service_map_.find(req.topic());
+ if (pos != service_map_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &dest : clients) {
+ Node dest_node(dest.weak_node_.lock());
+ if (dest_node && Valid(*dest_node)) {
+ auto node_addr = reply.add_node_address();
+ node_addr->set_proc_id(dest_node->proc_.proc_id());
+ node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+ node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+ }
}
+ return true;
+ } else {
+ return false;
}
- return reply;
- } else {
+ };
+ auto net = [&]() {
+ auto hosts(FindRemoteRPCServers(req.topic()));
+ if (hosts.empty()) {
+ return false;
+ } else {
+ for (auto &ip : hosts) {
+ auto node_addr = reply.add_node_address();
+ node_addr->mutable_addr()->set_ip(ip);
+ }
+ return true;
+ }
+ };
+ local();
+ net();
+ if (reply.node_address_size() == 0) {
return MakeReply<Reply>(eNotFound, "topic server not found.");
+ } else {
+ return reply;
}
};
@@ -587,7 +610,6 @@
sub_map[topic].insert(dest);
}
};
- LOG_DEBUG() << "subscribe net : " << msg.network();
if (msg.network()) {
Sub(net_sub_, center_.net_sub_map_);
center_.Notify(kTopicNodeSub, *this);
@@ -651,6 +673,7 @@
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
+ // LOG_FUNCTION;
Clients dests;
auto Find1 = [&](const std::string &exact) {
auto FindIn = [&](auto &sub_map) {
@@ -666,8 +689,11 @@
};
if (!from_remote) {
FindIn(local_sub_map_);
+ // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
}
+ // net subscripitions also work in local mode.
FindIn(net_sub_map_);
+ // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
};
Find1(topic);
@@ -793,8 +819,28 @@
}
}
-std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
- //TODO search synced full list;
- return std::vector<std::string>();
+ // LOG_FUNCTION;
+ sub_hosts_.clear();
+ rpc_hosts_.clear();
+ for (auto &host : info.array()) {
+ if (host.get("isLocal", false)) {
+ host_id_ = host.get("serverId", "");
+ ip_ = host.get("ip", "");
+ } else {
+ auto ip = host.get("ip", "");
+ auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
+ for (auto &topic : lot) {
+ auto t = topic.get_value<std::string>();
+ rec[t].insert(ip);
+ // LOG_DEBUG() << "net topic: " << t << ", " << ip;
+ }
+ };
+ // LOG_DEBUG() << "serives:";
+ UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
+ // LOG_DEBUG() << "net sub:";
+ UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
+ }
+ }
}
\ No newline at end of file
diff --git a/box/node_center.h b/box/node_center.h
index ae5b075..b6ceab5 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -18,6 +18,7 @@
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q
+#include "json.h"
#include "shm_socket.h"
#include <unordered_map>
@@ -188,7 +189,9 @@
void OnTimer();
// remote hosts records
- std::vector<std::string> FindRemoteSubClients(const Topic &topic);
+ std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
+ std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
+ void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
private:
void CheckNodes();
@@ -219,6 +222,33 @@
int64_t offline_time_;
int64_t kill_time_;
int64_t last_check_time_;
+
+ // net hosts info
+ class NetRecords
+ {
+ public:
+ typedef std::set<std::string> Hosts;
+ void ParseData(const ssjson::Json &input);
+ Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
+ Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
+
+ private:
+ typedef std::unordered_map<Topic, Hosts> TopicMap;
+ TopicMap sub_hosts_;
+ TopicMap rpc_hosts_;
+ Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
+ {
+ auto pos = tmap.find(topic);
+ if (pos != tmap.end()) {
+ return pos->second;
+ } else {
+ return Hosts();
+ }
+ }
+ std::string host_id_;
+ std::string ip_;
+ };
+ NetRecords net_records_;
};
#endif // end of include guard: NODE_CENTER_KY67RJ1Q
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 6506369..8f0fe86 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -63,7 +63,7 @@
if (4 > len) { return false; }
uint32_t head_len = Get32(p);
if (head_len > 1024 * 4) {
- throw std::runtime_error("unexpected tcp reply data.");
+ throw std::runtime_error("unexpected tcp data head.");
}
auto before_body = 4 + head_len + 4;
if (before_body > len) {
diff --git a/src/bh_api.cc b/src/bh_api.cc
index 3dafe7a..6bcf45c 100644
--- a/src/bh_api.cc
+++ b/src/bh_api.cc
@@ -226,6 +226,10 @@
{
return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
+int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms);
+}
int BHPublish(const void *msgpub,
const int msgpub_len,
diff --git a/src/bh_api.h b/src/bh_api.h
index 3b77da5..8178e55 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -57,6 +57,11 @@
void **reply,
int *reply_len,
const int timeout_ms);
+int BHSubscribeNetTopics(const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
int proc_id_len,
diff --git a/src/exported_symbols b/src/exported_symbols
index addfadc..4867769 100644
--- a/src/exported_symbols
+++ b/src/exported_symbols
@@ -7,6 +7,7 @@
BHQueryTopicAddress;
BHQueryProcs;
BHSubscribeTopics;
+ BHSubscribeNetTopics;
BHStartWorker;
BHHeartbeatEasy;
BHHeartbeat;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 6f98694..6096fbb 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -649,7 +649,7 @@
// subscribe
-bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -659,6 +659,7 @@
try {
auto &sock = SockSub();
MsgSubscribe sub;
+ sub.set_network(net);
sub.mutable_topics()->Swap(&topics);
BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
diff --git a/src/topic_node.h b/src/topic_node.h
index 1425844..9e7eed2 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -73,7 +73,15 @@
// subscribe
typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
- bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
+ bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+ {
+ return DoSubscribe(topics, false, reply_body, timeout_ms);
+ }
+ bool SubscribeNet(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+ {
+ return DoSubscribe(topics, true, reply_body, timeout_ms);
+ }
+ bool DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms);
bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 239ea8b..13a552d 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,6 +16,7 @@
* =====================================================================================
*/
#include "bh_api.h"
+#include "json.h"
#include "robust.h"
#include "util.h"
#include <atomic>
@@ -330,17 +331,27 @@
}
{ // Subscribe
- MsgTopicList topics;
- topics.add_topic_list("#center.node");
- for (int i = 0; i < 10; ++i) {
- topics.add_topic_list(topic_ + std::to_string(i * 2));
- }
- std::string s = topics.SerializeAsString();
- void *reply = 0;
- int reply_len = 0;
- bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
- BHFree(reply, reply_len);
- printf("subscribe topic : %s\n", r ? "ok" : "failed");
+ auto Subscribe = [&](std::string topic, bool net) {
+ MsgTopicList topics;
+ topics.add_topic_list(topic);
+ for (int i = 0; i < 10; ++i) {
+ topics.add_topic_list(topic_ + std::to_string(i * 2));
+ }
+ std::string s = topics.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = false;
+ if (net) {
+ r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ } else {
+ r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ }
+ BHFree(reply, reply_len);
+ printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed"));
+ };
+ Subscribe("#center.node", false);
+ Subscribe("local0", false);
+ Subscribe("net0", true);
}
auto ServerLoop = [&](std::atomic<bool> *run) {
@@ -368,14 +379,47 @@
}
};
+ std::atomic<bool> run(true);
+ ThreadManager threads;
+#if 1
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+#else
+ BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
+ threads.Launch(ServerLoop, &run);
+#endif
+
+ auto Publish = [&](const std::string &topic, const std::string &data) {
+ MsgPublish pub;
+ pub.set_topic(topic);
+ pub.set_data(data);
+ std::string s(pub.SerializeAsString());
+ BHPublish(s.data(), s.size(), 0);
+ };
+
{
+ // publish
+ Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a'));
for (int i = 0; i < 1; ++i) {
- MsgPublish pub;
- pub.set_topic(topic_ + std::to_string(i));
- pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
- std::string s(pub.SerializeAsString());
- BHPublish(s.data(), s.size(), 0);
- // Sleep(1s);
+
+ ssjson::Json net = ssjson::Json::Array();
+ ssjson::Json host;
+ host.put("serverId", "test_host");
+ host.put("ip", "127.0.0.1");
+ ssjson::Json topics = ssjson::Json::Array();
+ topics.push_back("aaaaa");
+ topics.push_back("bbbbb");
+ host.put("pubTopics", topics);
+ topics.push_back("net0");
+ topics.push_back("net1");
+ host.put("netSubTopics", topics);
+ net.push_back(host);
+
+ Publish("pub-allRegisterInfo-to-center", net.dump());
+ Sleep(1s);
+ Publish("local0", "local-abcd0");
+ Publish("net0", "net-abcd0");
+ Publish("local0", "local-abcd1");
+ Sleep(1s);
}
}
@@ -428,22 +472,11 @@
}
};
- std::atomic<bool> run(true);
-
- ThreadManager threads;
-
-#if 1
- BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
-#else
- BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
- threads.Launch(ServerLoop, &run);
-#endif
-
boost::timer::auto_cpu_timer timer;
threads.Launch(hb, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const int64_t nreq = 1000 * 100;
+ const int64_t nreq = 1000; //* 100;
for (int i = 0; i < 10; ++i) {
SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));
--
Gitblit v1.8.0