box/center.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/center_topic_node.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/node_center.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/node_center.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/tcp_connection.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/tcp_proxy.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/tcp_proxy.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
proto/source/bhome_msg.proto | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
proto/source/bhome_msg_api.proto | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/topic_node.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/api_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
box/center.cpp
@@ -135,27 +135,18 @@ auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto &center = *center_ptr; auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; MsgCommonReply reply; if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { if (head.route_size() == 1 && msg.ParseBody(pub)) { // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); } else { replyer(MakeReply(eSuccess)); if (clients.empty()) { return; } for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } auto hosts = center->FindRemoteSubClients(pub.topic()); for (auto &host : hosts) { tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); } } }; box/center_topic_node.cpp
@@ -43,11 +43,17 @@ proc.put("name", info.proc().name()); proc.put("publicInfo", info.proc().public_info()); proc.put("online", info.online()); Json topics = Json::Array(); for (auto &t : info.topics().topic_list()) { topics.push_back(t); } proc.put("topics", topics); auto AddTopics = [&](auto &name, auto &topic_list) { Json topics = Json::Array(); for (auto &t : topic_list) { topics.push_back(t); } proc.put(name, topics); }; AddTopics("service", info.service().topic_list()); AddTopics("local_sub", info.local_sub().topic_list()); AddTopics("net_sub", info.net_sub().topic_list()); list.push_back(proc); } return json.dump(0); box/node_center.cpp
@@ -267,6 +267,43 @@ return sender.Send(dest, msg, head.msg_id(), std::move(cb)); } bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) { auto &topic = head.topic(); auto clients = DoFindClients(topic, true); if (clients.empty()) { return true; } std::vector<MsgI> msgs; auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } }; DEFER1(ReleaseAll();); for (auto &cli : clients) { auto Send1 = [&](Node node) { auto &shm = node->shm_; for (auto &msg : msgs) { if (msg.shm().name() == shm.name()) { DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); return; } } MsgI msg(shm); if (msg.Make(body_content)) { RecordMsg(msg); msgs.push_back(msg); DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } }; auto node = cli.weak_node_.lock(); if (node) { Send1(node); // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. } } return true; } bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) { Node node(GetNode(dest.id_)); @@ -469,11 +506,16 @@ *info->mutable_proc() = node->proc_; info->mutable_proc()->clear_private_info(); info->set_online(node->state_.flag_ == kStateNormal); for (auto &addr_topics : node->services_) { for (auto &topic : addr_topics.second) { info->mutable_topics()->add_topic_list(topic); auto AddTopics = [](auto &dst, auto &src) { for (auto &addr_topics : src) { for (auto &topic : addr_topics.second) { dst.add_topic_list(topic); } } } }; AddTopics(*info->mutable_service(), node->services_); AddTopics(*info->mutable_local_sub(), node->local_sub_); AddTopics(*info->mutable_net_sub(), node->net_sub_); }; if (!proc_id.empty()) { @@ -532,35 +574,50 @@ return HandleMsg<Reply>(head, query); } void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) { auto src = SrcAddr(head); auto Sub = [&](auto &sub, auto &sub_map) { auto &topics 
= msg.topics().topic_list(); sub[src].insert(topics.begin(), topics.end()); const TopicDest &dest = {src, SrcAbsAddr(head), node}; for (auto &topic : topics) { sub_map[topic].insert(dest); } }; LOG_DEBUG() << "subscribe net : " << msg.network(); if (msg.network()) { Sub(net_sub_, center_.net_sub_map_); } else { Sub(local_sub_, center_.local_sub_map_); } } MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { auto src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->subscriptions_[src].insert(topics.begin(), topics.end()); TopicDest dest = {src, SrcAbsAddr(head), node}; for (auto &topic : topics) { subscribe_map_[topic].insert(dest); } node->Subscribe(head, msg, node); return MakeReply(eSuccess); }); } MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) { return HandleMsg(head, [&](Node node) { auto src = SrcAddr(head); auto pos = node->subscriptions_.find(src); auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { auto pos = subscribe_map_.find(topic); if (pos != subscribe_map_.end() && void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) { auto src = SrcAddr(head); auto Unsub = [&](auto &sub, auto &sub_map) { auto pos = sub.find(src); auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { auto pos = sub_map.find(topic); if (pos != sub_map.end() && pos->second.erase(dest) != 0 && pos->second.empty()) { subscribe_map_.erase(pos); sub_map.erase(pos); } }; if (pos != node->subscriptions_.end()) { if (pos != sub.end()) { const TopicDest &dest = {src, SrcAbsAddr(head), node}; auto &topics = msg.topics().topic_list(); // clear node sub records; @@ -569,26 +626,44 @@ RemoveSubTopicDestRecord(topic, dest); } if (pos->second.empty()) { node->subscriptions_.erase(pos); sub.erase(pos); } } }; if (msg.network()) { Unsub(net_sub_, 
center_.net_sub_map_); } else { Unsub(local_sub_, center_.local_sub_map_); } } MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) { return HandleMsg(head, [&](Node node) { node->Unsubscribe(head, msg, node); return MakeReply(eSuccess); }); } NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) { Clients dests; auto Find1 = [&](const std::string &exact) { auto pos = subscribe_map_.find(exact); if (pos != subscribe_map_.end()) { auto &clients = pos->second; for (auto &cli : clients) { if (Valid(cli.weak_node_)) { dests.insert(cli); auto FindIn = [&](auto &sub_map) { auto pos = sub_map.find(exact); if (pos != sub_map.end()) { auto &clients = pos->second; for (auto &cli : clients) { if (Valid(cli.weak_node_)) { dests.insert(cli); } } } }; if (!from_remote) { FindIn(local_sub_map_); } FindIn(net_sub_map_); }; Find1(topic); @@ -605,15 +680,31 @@ return dests; } bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) { bool ret = false; HandleMsg(head, [&](Node node) { DoFindClients(msg.topic()).swap(out); ret = true; return HandleMsg(head, [&](Node node) { DoPublish(DefaultSender(node->shm_), topic, msg); return MakeReply(eSuccess); }).Swap(&reply); return ret; }); } void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) { try { auto clients = DoFindClients(topic, false); if (clients.empty()) { return; } for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } } } catch (...) 
{ LOG_ERROR() << "DoPublish error."; } } void NodeCenter::OnTimer() @@ -659,7 +750,8 @@ } }; EraseMapRec(service_map_, node->services_); EraseMapRec(subscribe_map_, node->subscriptions_); EraseMapRec(local_sub_map_, node->local_sub_); EraseMapRec(net_sub_map_, node->net_sub_); // remove online record. auto pos = online_node_addr_map_.find(node->proc_.proc_id()); @@ -681,10 +773,6 @@ void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) { try { // LOG_DEBUG() << "center publish: " << topic << ": " << content; Clients clients(DoFindClients(topic)); if (clients.empty()) { return; } MsgPublish pub; pub.set_topic(topic); pub.set_data(content); @@ -693,16 +781,16 @@ if (msg.Make(head, pub)) { DEFER1(msg.Release()); RecordMsg(msg); for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node && node->state_.flag_ == kStateNormal) { DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } } DoPublish(DefaultSender(shm), topic, msg); } } catch (...) { LOG_ERROR() << "center publish error."; } } std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) { //TODO search synced full list; return std::vector<std::string>(); } box/node_center.h
@@ -82,6 +82,10 @@ }; typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; struct NodeInfo; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; struct NodeInfo { NodeCenter &center_; SharedMemory &shm_; @@ -89,14 +93,15 @@ std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics AddressTopics subscriptions_; // address: topics AddressTopics local_sub_; // address: topics AddressTopics net_sub_; // address: topics NodeInfo(NodeCenter &center, SharedMemory &shm) : center_(center), shm_(shm) {} void PutOffline(const int64_t offline_time); void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; struct TopicDest { MQId mq_id_; @@ -121,7 +126,9 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); bool RemotePublish(BHMsgHead &head, const std::string &body_content); bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); @@ -176,15 +183,19 @@ MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); Clients DoFindClients(const std::string &topic); bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, 
MsgCommonReply &reply); MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); void OnTimer(); // remote hosts records std::vector<std::string> FindRemoteSubClients(const Topic &topic); private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); Clients DoFindClients(const std::string &topic, bool from_remote); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { @@ -197,7 +208,8 @@ std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<Topic, Clients> local_sub_map_; std::unordered_map<Topic, Clients> net_sub_map_; std::unordered_map<Address, Node> nodes_; std::unordered_map<ProcId, Address> online_node_addr_map_; ProcRecords procs_; // To get a short index for msg alloc. box/tcp_connection.cpp
@@ -178,8 +178,17 @@ send_buffer_ = imsg.content(); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); }; auto &scenter = *pscenter_; if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { if (head.type() == kMsgTypePublish) { auto reply = MakeReply(eSuccess); auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); send_buffer_ = MsgI::Serialize(rep_head, reply); async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); scenter->RemotePublish(head, body_content); return; } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { return; } else { Close(); box/tcp_proxy.cpp
@@ -31,3 +31,8 @@ return false; } } bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content) { return Request(ip, port, std::move(content), ReplyCB()); } box/tcp_proxy.h
@@ -33,6 +33,7 @@ TcpProxy(io_service_t &io) : io_(io) {} bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb); bool Publish(const std::string &ip, int port, std::string &&content); private: io_service_t &io_; proto/source/bhome_msg.proto
@@ -57,9 +57,11 @@ message MsgSubscribe { MsgTopicList topics = 1; bool network = 2; } message MsgUnsubscribe { MsgTopicList topics = 1; bool network = 2; } message MsgRegisterRPC { MsgTopicList topics = 1; proto/source/bhome_msg_api.proto
@@ -84,7 +84,9 @@ message Info { ProcInfo proc = 1; bool online = 2; MsgTopicList topics = 3; MsgTopicList service = 3; MsgTopicList local_sub = 4; MsgTopicList net_sub = 5; } repeated Info proc_list = 2; } src/topic_node.cpp
@@ -628,6 +628,7 @@ auto &sock = SockPub(); BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); AddRoute(head, sock); head.set_topic(pub.topic()); if (timeout_ms == 0) { return sock.Send(BusAddr(), head, pub); utest/api_test.cpp
@@ -256,14 +256,21 @@ printf("proc [%d] %s, %s, %s\n\ttopics\n", i, (info.online() ? "online" : "offline"), info.proc().proc_id().c_str(), info.proc().name().c_str()); for (auto &t : info.topics().topic_list()) { printf("\t\t %s\n", t.c_str()); } auto PrintTopics = [](std::string const &name, auto &topic_list) { printf("%s:[", name.c_str()); for (auto &t : topic_list) { printf("%s,", t.c_str()); } printf("]\n"); }; PrintTopics("service", info.service().topic_list()); PrintTopics("local_sub", info.local_sub().topic_list()); PrintTopics("net_sub", info.net_sub().topic_list()); printf("\n"); } printf("\n"); }; if (0) { if (1) { // query procs std::string dest(BHAddress().SerializeAsString()); MsgQueryProc query;