From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 23 六月 2021 19:43:29 +0800 Subject: [PATCH] refactor, start tcp pub/sub. --- box/tcp_connection.cpp | 11 ++ proto/source/bhome_msg_api.proto | 4 utest/api_test.cpp | 15 ++ box/center_topic_node.cpp | 16 ++- box/center.cpp | 23 +--- box/tcp_proxy.h | 1 proto/source/bhome_msg.proto | 2 box/node_center.h | 24 +++- box/tcp_proxy.cpp | 5 + box/node_center.cpp | 184 +++++++++++++++++++++++++++--------- src/topic_node.cpp | 1 11 files changed, 205 insertions(+), 81 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0e4c40b..78135d1 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -135,27 +135,18 @@ auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; - auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; - NodeCenter::Clients clients; - MsgCommonReply reply; - if (head.route_size() != 1 || !msg.ParseBody(pub)) { - return; - } else if (!center->FindClients(head, pub, clients, reply)) { + if (head.route_size() == 1 && msg.ParseBody(pub)) { + // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? + auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); - } else { - replyer(MakeReply(eSuccess)); - if (clients.empty()) { return; } - for (auto &cli : clients) { - auto node = cli.weak_node_.lock(); - if (node) { - // should also make sure that mq is not killed before msg expires. - // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - } + auto hosts = center->FindRemoteSubClients(pub.topic()); + for (auto &host : hosts) { + tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); } } }; diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp index 8228992..3c4f369 100644 --- a/box/center_topic_node.cpp +++ b/box/center_topic_node.cpp @@ -43,11 +43,17 @@ proc.put("name", info.proc().name()); proc.put("publicInfo", info.proc().public_info()); proc.put("online", info.online()); - Json topics = Json::Array(); - for (auto &t : info.topics().topic_list()) { - topics.push_back(t); - } - proc.put("topics", topics); + auto AddTopics = [&](auto &name, auto &topic_list) { + Json topics = Json::Array(); + for (auto &t : topic_list) { + topics.push_back(t); + } + proc.put(name, topics); + }; + AddTopics("service", info.service().topic_list()); + AddTopics("local_sub", info.local_sub().topic_list()); + AddTopics("net_sub", info.net_sub().topic_list()); + list.push_back(proc); } return json.dump(0); diff --git a/box/node_center.cpp b/box/node_center.cpp index 5c24409..c32b197 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -267,6 +267,43 @@ return sender.Send(dest, msg, head.msg_id(), std::move(cb)); } +bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) +{ + auto &topic = head.topic(); + auto clients = DoFindClients(topic, true); + if (clients.empty()) { return true; } + + std::vector<MsgI> msgs; + auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } }; + DEFER1(ReleaseAll();); + + for (auto &cli : clients) { + auto Send1 = [&](Node node) { + auto &shm = node->shm_; + for (auto &msg : msgs) { + if (msg.shm().name() == shm.name()) { + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + return; + } + } + MsgI msg(shm); + if (msg.Make(body_content)) { + RecordMsg(msg); + msgs.push_back(msg); + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + }; + auto node = cli.weak_node_.lock(); + if (node) { + Send1(node); + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + } + } + + return true; +} + bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) { Node node(GetNode(dest.id_)); @@ -469,11 +506,16 @@ *info->mutable_proc() = node->proc_; info->mutable_proc()->clear_private_info(); info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { - for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); + auto AddTopics = [](auto &dst, auto &src) { + for (auto &addr_topics : src) { + for (auto &topic : addr_topics.second) { + dst.add_topic_list(topic); + } } - } + }; + AddTopics(*info->mutable_service(), node->services_); + AddTopics(*info->mutable_local_sub(), node->local_sub_); + AddTopics(*info->mutable_net_sub(), node->net_sub_); }; if (!proc_id.empty()) { @@ -532,35 +574,50 @@ return HandleMsg<Reply>(head, query); } +void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + auto Sub = [&](auto &sub, auto &sub_map) { + auto &topics = msg.topics().topic_list(); + sub[src].insert(topics.begin(), topics.end()); + const TopicDest &dest = {src, SrcAbsAddr(head), node}; + for (auto &topic : topics) { + sub_map[topic].insert(dest); + } + }; + LOG_DEBUG() << "subscribe net : " << msg.network(); + if (msg.network()) { + Sub(net_sub_, center_.net_sub_map_); + } else { + Sub(local_sub_, center_.local_sub_map_); + } +} + MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } + node->Subscribe(head, msg, node); return MakeReply(eSuccess); }); } -MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) -{ - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && +void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) +{ + auto src = SrcAddr(head); + + auto Unsub = [&](auto &sub, auto &sub_map) { + auto pos = sub.find(src); + + auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { + auto pos = sub_map.find(topic); + if (pos != sub_map.end() && pos->second.erase(dest) != 0 && pos->second.empty()) { - subscribe_map_.erase(pos); + sub_map.erase(pos); } }; - if (pos != node->subscriptions_.end()) { + if (pos != sub.end()) { const TopicDest &dest = {src, SrcAbsAddr(head), node}; auto &topics = msg.topics().topic_list(); // clear node sub records; @@ -569,26 +626,44 @@ RemoveSubTopicDestRecord(topic, dest); } if (pos->second.empty()) { - node->subscriptions_.erase(pos); + sub.erase(pos); } } + }; + if (msg.network()) { + Unsub(net_sub_, center_.net_sub_map_); + } else { + Unsub(local_sub_, center_.local_sub_map_); + } +} + +MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) +{ + return HandleMsg(head, [&](Node node) { + node->Unsubscribe(head, msg, node); return MakeReply(eSuccess); }); } -NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) +NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { Clients dests; auto Find1 = [&](const std::string &exact) { - auto pos = subscribe_map_.find(exact); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); + auto FindIn = [&](auto &sub_map) { + auto pos = sub_map.find(exact); + if (pos != sub_map.end()) { + auto &clients = pos->second; + for (auto &cli : clients) { + if (Valid(cli.weak_node_)) { + dests.insert(cli); + } } } + }; + if (!from_remote) { + FindIn(local_sub_map_); } + FindIn(net_sub_map_); }; Find1(topic); @@ -605,15 +680,31 @@ return dests; } -bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) +MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; + return HandleMsg(head, [&](Node node) { + DoPublish(DefaultSender(node->shm_), topic, msg); return MakeReply(eSuccess); - }).Swap(&reply); - return ret; + }); +} + +void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) +{ + try { + auto clients = DoFindClients(topic, false); + if (clients.empty()) { return; } + + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node) { + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + } + } + } catch (...) { + LOG_ERROR() << "DoPublish error."; + } } void NodeCenter::OnTimer() @@ -659,7 +750,8 @@ } }; EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); + EraseMapRec(local_sub_map_, node->local_sub_); + EraseMapRec(net_sub_map_, node->net_sub_); // remove online record. auto pos = online_node_addr_map_.find(node->proc_.proc_id()); @@ -681,10 +773,6 @@ void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) { try { - // LOG_DEBUG() << "center publish: " << topic << ": " << content; - Clients clients(DoFindClients(topic)); - if (clients.empty()) { return; } - MsgPublish pub; pub.set_topic(topic); pub.set_data(content); @@ -693,16 +781,16 @@ if (msg.Make(head, pub)) { DEFER1(msg.Release()); RecordMsg(msg); - - for (auto &cli : clients) { - auto node = cli.weak_node_.lock(); - if (node && node->state_.flag_ == kStateNormal) { - DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - } - } + DoPublish(DefaultSender(shm), topic, msg); } } catch (...) { LOG_ERROR() << "center publish error."; } +} + +std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) +{ + //TODO search synced full list; + return std::vector<std::string>(); } \ No newline at end of file diff --git a/box/node_center.h b/box/node_center.h index 74dd52f..54f84c0 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -82,6 +82,10 @@ }; typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; + struct NodeInfo; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + struct NodeInfo { NodeCenter ¢er_; SharedMemory &shm_; @@ -89,14 +93,15 @@ std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics + AddressTopics local_sub_; // address: topics + AddressTopics net_sub_; // address: topics NodeInfo(NodeCenter ¢er, SharedMemory &shm) : center_(center), shm_(shm) {} void PutOffline(const int64_t offline_time); void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); + void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); + void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; struct TopicDest { MQId mq_id_; @@ -121,7 +126,9 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); + bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool RemotePublish(BHMsgHead &head, const std::string &body_content); bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); @@ -176,15 +183,19 @@ MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); - Clients DoFindClients(const std::string &topic); - bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); + MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); void OnTimer(); + + // remote hosts records + std::vector<std::string> FindRemoteSubClients(const Topic &topic); private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); + void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); + Clients DoFindClients(const std::string &topic, bool from_remote); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { @@ -197,7 +208,8 @@ std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; - std::unordered_map<Topic, Clients> subscribe_map_; + std::unordered_map<Topic, Clients> local_sub_map_; + std::unordered_map<Topic, Clients> net_sub_map_; std::unordered_map<Address, Node> nodes_; std::unordered_map<ProcId, Address> online_node_addr_map_; ProcRecords procs_; // To get a short index for msg alloc. diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 85ed4ed..6506369 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -178,8 +178,17 @@ send_buffer_ = imsg.content(); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); }; + auto &scenter = *pscenter_; - if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + if (head.type() == kMsgTypePublish) { + auto reply = MakeReply(eSuccess); + auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); + send_buffer_ = MsgI::Serialize(rep_head, reply); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + + scenter->RemotePublish(head, body_content); + return; + } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { return; } else { Close(); diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp index b4ec497..803874d 100644 --- a/box/tcp_proxy.cpp +++ b/box/tcp_proxy.cpp @@ -31,3 +31,8 @@ return false; } } + +bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content) +{ + return Request(ip, port, std::move(content), ReplyCB()); +} \ No newline at end of file diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h index 69c3f03..09febe5 100644 --- a/box/tcp_proxy.h +++ b/box/tcp_proxy.h @@ -33,6 +33,7 @@ TcpProxy(io_service_t &io) : io_(io) {} bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb); + bool Publish(const std::string &ip, int port, std::string &&content); private: io_service_t &io_; diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index f34aebb..b1f9772 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -57,9 +57,11 @@ message MsgSubscribe { MsgTopicList topics = 1; + bool network = 2; } message MsgUnsubscribe { MsgTopicList topics = 1; + bool network = 2; } message MsgRegisterRPC { MsgTopicList topics = 1; diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto index 6a20aa7..0ef3451 100644 --- a/proto/source/bhome_msg_api.proto +++ b/proto/source/bhome_msg_api.proto @@ -84,7 +84,9 @@ message Info { ProcInfo proc = 1; bool online = 2; - MsgTopicList topics = 3; + MsgTopicList service = 3; + MsgTopicList local_sub = 4; + MsgTopicList net_sub = 5; } repeated Info proc_list = 2; } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 5362318..6f98694 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -628,6 +628,7 @@ auto &sock = SockPub(); BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); AddRoute(head, sock); + head.set_topic(pub.topic()); if (timeout_ms == 0) { return sock.Send(BusAddr(), head, pub); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 5363d6e..239ea8b 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -256,14 +256,21 @@ printf("proc [%d] %s, %s, %s\n\ttopics\n", i, (info.online() ? "online" : "offline"), info.proc().proc_id().c_str(), info.proc().name().c_str()); - for (auto &t : info.topics().topic_list()) { - printf("\t\t %s\n", t.c_str()); - } + auto PrintTopics = [](std::string const &name, auto &topic_list) { + printf("%s:[", name.c_str()); + for (auto &t : topic_list) { + printf("%s,", t.c_str()); + } + printf("]\n"); + }; + PrintTopics("service", info.service().topic_list()); + PrintTopics("local_sub", info.local_sub().topic_list()); + PrintTopics("net_sub", info.net_sub().topic_list()); printf("\n"); } printf("\n"); }; - if (0) { + if (1) { // query procs std::string dest(BHAddress().SerializeAsString()); MsgQueryProc query; -- Gitblit v1.8.0