From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- box/tcp_connection.cpp | 2 utest/api_test.cpp | 91 ++++++++++++----- box/center_topic_node.cpp | 29 +++++ src/bh_api.cc | 4 box/center.cpp | 2 api/bhsgo/bhome_node.go | 5 + src/topic_node.h | 10 + box/node_center.h | 32 ++++++ src/bh_api.h | 5 + box/node_center.cpp | 82 ++++++++++++--- src/exported_symbols | 1 src/topic_node.cpp | 3 12 files changed, 210 insertions(+), 56 deletions(-) diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go index 9d66814..09d571b 100644 --- a/api/bhsgo/bhome_node.go +++ b/api/bhsgo/bhome_node.go @@ -47,6 +47,11 @@ return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) } +func SubscribeNet(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := topics.Marshal() + return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeNetTopics), data, reply, timeout_ms) +} + func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { data, _ := proc.Marshal() return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) diff --git a/box/center.cpp b/box/center.cpp index 78135d1..e0abbb3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -146,7 +146,7 @@ replyer(reply); auto hosts = center->FindRemoteSubClients(pub.topic()); for (auto &host : hosts) { - tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); + tcp_proxy.Publish(host, kBHCenterPort, msg.content()); } } }; diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp index 3c4f369..1f4103f 100644 --- a/box/center_topic_node.cpp +++ b/box/center_topic_node.cpp @@ -30,6 +30,7 @@ namespace { const std::string &kTopicQueryProc = "#center_query_procs"; +const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center"; std::string ToJson(const MsgQueryProcReply &qpr) { @@ -92,10 +93,16 @@ throw std::runtime_error("center node register failed."); } - MsgTopicList topics; - topics.add_topic_list(kTopicQueryProc); - if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) { + MsgTopicList services; + services.add_topic_list(kTopicQueryProc); + if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) { throw std::runtime_error("center node register topics failed."); + } + MsgTopicList subs; + + subs.add_topic_list(kTopicNotifyRemoteInfo); + if (!pnode_->Subscribe(subs, reply, timeout)) { + throw std::runtime_error("center node subscribe topics failed."); } auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) { @@ -117,6 +124,20 @@ pnode_->ServerSendReply(src_info, reply); }; + auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) { + if (data.topic() == kTopicNotifyRemoteInfo) { + // parse other data. + // LOG_DEBUG() << "center got net info."; + ssjson::Json js; + if (js.parse(data.data())) { + if (js.is_array()) { + auto ¢er = *pscenter_; + center->ParseNetInfo(js); + } + } + } + }; + bool cur = false; if (run_.compare_exchange_strong(cur, true)) { auto heartbeat = [this]() { @@ -126,7 +147,7 @@ } }; std::thread(heartbeat).swap(worker_); - return pnode_->ServerStart(onRequest); + return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv); } else { return false; } diff --git a/box/node_center.cpp b/box/node_center.cpp index 068aa00..77bfac1 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -270,6 +270,7 @@ bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) { + // LOG_FUNCTION; auto &topic = head.topic(); auto clients = DoFindClients(topic, true); if (clients.empty()) { return true; } @@ -288,9 +289,10 @@ } } MsgI msg(shm); - if (msg.Make(body_content)) { + if (msg.Make(head, body_content)) { RecordMsg(msg); msgs.push_back(msg); + // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_; DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } }; @@ -554,22 +556,43 @@ typedef MsgQueryTopicReply Reply; auto query = [&](Node self) -> Reply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_id_); - node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + Reply reply = MakeReply<Reply>(eSuccess); + auto local = [&]() { + auto pos = service_map_.find(req.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_id_); + node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); + } } + return true; + } else { + return false; } - return reply; - } else { + }; + auto net = [&]() { + auto hosts(FindRemoteRPCServers(req.topic())); + if (hosts.empty()) { + return false; + } else { + for (auto &ip : hosts) { + auto node_addr = reply.add_node_address(); + node_addr->mutable_addr()->set_ip(ip); + } + return true; + } + }; + local(); + net(); + if (reply.node_address_size() == 0) { return MakeReply<Reply>(eNotFound, "topic server not found."); + } else { + return reply; } }; @@ -587,7 +610,6 @@ sub_map[topic].insert(dest); } }; - LOG_DEBUG() << "subscribe net : " << msg.network(); if (msg.network()) { Sub(net_sub_, center_.net_sub_map_); center_.Notify(kTopicNodeSub, *this); @@ -651,6 +673,7 @@ NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { + // LOG_FUNCTION; Clients dests; auto Find1 = [&](const std::string &exact) { auto FindIn = [&](auto &sub_map) { @@ -666,8 +689,11 @@ }; if (!from_remote) { FindIn(local_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); } + // net subscripitions also work in local mode. FindIn(net_sub_map_); + // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); }; Find1(topic); @@ -793,8 +819,28 @@ } } -std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) +void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) { - //TODO search synced full list; - return std::vector<std::string>(); + // LOG_FUNCTION; + sub_hosts_.clear(); + rpc_hosts_.clear(); + for (auto &host : info.array()) { + if (host.get("isLocal", false)) { + host_id_ = host.get("serverId", ""); + ip_ = host.get("ip", ""); + } else { + auto ip = host.get("ip", ""); + auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) { + for (auto &topic : lot) { + auto t = topic.get_value<std::string>(); + rec[t].insert(ip); + // LOG_DEBUG() << "net topic: " << t << ", " << ip; + } + }; + // LOG_DEBUG() << "serives:"; + UpdateRec(host.child("pubTopics").array(), rpc_hosts_); + // LOG_DEBUG() << "net sub:"; + UpdateRec(host.child("netSubTopics").array(), sub_hosts_); + } + } } \ No newline at end of file diff --git a/box/node_center.h b/box/node_center.h index ae5b075..b6ceab5 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -18,6 +18,7 @@ #ifndef NODE_CENTER_KY67RJ1Q #define NODE_CENTER_KY67RJ1Q +#include "json.h" #include "shm_socket.h" #include <unordered_map> @@ -188,7 +189,9 @@ void OnTimer(); // remote hosts records - std::vector<std::string> FindRemoteSubClients(const Topic &topic); + std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); } + std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); } + void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); } private: void CheckNodes(); @@ -219,6 +222,33 @@ int64_t offline_time_; int64_t kill_time_; int64_t last_check_time_; + + // net hosts info + class NetRecords + { + public: + typedef std::set<std::string> Hosts; + void ParseData(const ssjson::Json &input); + Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); } + Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); } + + private: + typedef std::unordered_map<Topic, Hosts> TopicMap; + TopicMap sub_hosts_; + TopicMap rpc_hosts_; + Hosts FindHosts(const Topic &topic, const TopicMap &tmap) + { + auto pos = tmap.find(topic); + if (pos != tmap.end()) { + return pos->second; + } else { + return Hosts(); + } + } + std::string host_id_; + std::string ip_; + }; + NetRecords net_records_; }; #endif // end of include guard: NODE_CENTER_KY67RJ1Q diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 6506369..8f0fe86 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -63,7 +63,7 @@ if (4 > len) { return false; } uint32_t head_len = Get32(p); if (head_len > 1024 * 4) { - throw std::runtime_error("unexpected tcp reply data."); + throw std::runtime_error("unexpected tcp data head."); } auto before_body = 4 + head_len + 4; if (before_body > len) { diff --git a/src/bh_api.cc b/src/bh_api.cc index 3dafe7a..6bcf45c 100644 --- a/src/bh_api.cc +++ b/src/bh_api.cc @@ -226,6 +226,10 @@ { return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } +int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms); +} int BHPublish(const void *msgpub, const int msgpub_len, diff --git a/src/bh_api.h b/src/bh_api.h index 3b77da5..8178e55 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -57,6 +57,11 @@ void **reply, int *reply_len, const int timeout_ms); +int BHSubscribeNetTopics(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); typedef void (*FSubDataCallback)(const void *proc_id, int proc_id_len, diff --git a/src/exported_symbols b/src/exported_symbols index addfadc..4867769 100644 --- a/src/exported_symbols +++ b/src/exported_symbols @@ -7,6 +7,7 @@ BHQueryTopicAddress; BHQueryProcs; BHSubscribeTopics; + BHSubscribeNetTopics; BHStartWorker; BHHeartbeatEasy; BHHeartbeat; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 6f98694..6096fbb 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -649,7 +649,7 @@ // subscribe -bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) +bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { SetLastError(eNotRegistered, kErrMsgNotRegistered); @@ -659,6 +659,7 @@ try { auto &sock = SockSub(); MsgSubscribe sub; + sub.set_network(net); sub.mutable_topics()->Swap(&topics); BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); diff --git a/src/topic_node.h b/src/topic_node.h index 1425844..9e7eed2 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -73,7 +73,15 @@ // subscribe typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); - bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); + bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) + { + return DoSubscribe(topics, false, reply_body, timeout_ms); + } + bool SubscribeNet(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) + { + return DoSubscribe(topics, true, reply_body, timeout_ms); + } + bool DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms); bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 239ea8b..13a552d 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -16,6 +16,7 @@ * ===================================================================================== */ #include "bh_api.h" +#include "json.h" #include "robust.h" #include "util.h" #include <atomic> @@ -330,17 +331,27 @@ } { // Subscribe - MsgTopicList topics; - topics.add_topic_list("#center.node"); - for (int i = 0; i < 10; ++i) { - topics.add_topic_list(topic_ + std::to_string(i * 2)); - } - std::string s = topics.SerializeAsString(); - void *reply = 0; - int reply_len = 0; - bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); - BHFree(reply, reply_len); - printf("subscribe topic : %s\n", r ? "ok" : "failed"); + auto Subscribe = [&](std::string topic, bool net) { + MsgTopicList topics; + topics.add_topic_list(topic); + for (int i = 0; i < 10; ++i) { + topics.add_topic_list(topic_ + std::to_string(i * 2)); + } + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = false; + if (net) { + r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000); + } else { + r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); + } + BHFree(reply, reply_len); + printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed")); + }; + Subscribe("#center.node", false); + Subscribe("local0", false); + Subscribe("net0", true); } auto ServerLoop = [&](std::atomic<bool> *run) { @@ -368,14 +379,47 @@ } }; + std::atomic<bool> run(true); + ThreadManager threads; +#if 1 + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); +#else + BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); + threads.Launch(ServerLoop, &run); +#endif + + auto Publish = [&](const std::string &topic, const std::string &data) { + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data); + std::string s(pub.SerializeAsString()); + BHPublish(s.data(), s.size(), 0); + }; + { + // publish + Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a')); for (int i = 0; i < 1; ++i) { - MsgPublish pub; - pub.set_topic(topic_ + std::to_string(i)); - pub.set_data("pub_data_" + std::string(104 * 1, 'a')); - std::string s(pub.SerializeAsString()); - BHPublish(s.data(), s.size(), 0); - // Sleep(1s); + + ssjson::Json net = ssjson::Json::Array(); + ssjson::Json host; + host.put("serverId", "test_host"); + host.put("ip", "127.0.0.1"); + ssjson::Json topics = ssjson::Json::Array(); + topics.push_back("aaaaa"); + topics.push_back("bbbbb"); + host.put("pubTopics", topics); + topics.push_back("net0"); + topics.push_back("net1"); + host.put("netSubTopics", topics); + net.push_back(host); + + Publish("pub-allRegisterInfo-to-center", net.dump()); + Sleep(1s); + Publish("local0", "local-abcd0"); + Publish("net0", "net-abcd0"); + Publish("local0", "local-abcd1"); + Sleep(1s); } } @@ -428,22 +472,11 @@ } }; - std::atomic<bool> run(true); - - ThreadManager threads; - -#if 1 - BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); -#else - BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); - threads.Launch(ServerLoop, &run); -#endif - boost::timer::auto_cpu_timer timer; threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; - const int64_t nreq = 1000 * 100; + const int64_t nreq = 1000; //* 100; for (int i = 0; i < 10; ++i) { SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i)); -- Gitblit v1.8.0