From d4a1e59e1dac399a1e0117fc5184732507b212c6 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期四, 24 六月 2021 18:15:26 +0800
Subject: [PATCH] rebuild bhome_msg.proto,bhome_msg_api.proto
---
box/center.cpp | 23 +++++++----------------
1 files changed, 7 insertions(+), 16 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 0e4c40b..78135d1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -135,27 +135,18 @@
auto OnBusIdle = [=](ShmSocket &socket) {};
auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
- auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center);
auto OnPublish = [&]() {
MsgPublish pub;
- NodeCenter::Clients clients;
- MsgCommonReply reply;
- if (head.route_size() != 1 || !msg.ParseBody(pub)) {
- return;
- } else if (!center->FindClients(head, pub, clients, reply)) {
+ if (head.route_size() == 1 && msg.ParseBody(pub)) {
+ // replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
+ auto reply(center->Publish(head, pub.topic(), msg));
replyer(reply);
- } else {
- replyer(MakeReply(eSuccess));
- if (clients.empty()) { return; }
- for (auto &cli : clients) {
- auto node = cli.weak_node_.lock();
- if (node) {
- // should also make sure that mq is not killed before msg expires.
- // it would be ok if (kill_time - offline_time) is longer than expire time.
- socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
- }
+ auto hosts = center->FindRemoteSubClients(pub.topic());
+ for (auto &host : hosts) {
+ tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
}
}
};
--
Gitblit v1.8.0