From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 23 六月 2021 19:43:29 +0800 Subject: [PATCH] refactor, start tcp pub/sub. --- box/center.cpp | 23 +++++++---------------- 1 files changed, 7 insertions(+), 16 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0e4c40b..78135d1 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -135,27 +135,18 @@ auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; - auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; - NodeCenter::Clients clients; - MsgCommonReply reply; - if (head.route_size() != 1 || !msg.ParseBody(pub)) { - return; - } else if (!center->FindClients(head, pub, clients, reply)) { + if (head.route_size() == 1 && msg.ParseBody(pub)) { + // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? + auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); - } else { - replyer(MakeReply(eSuccess)); - if (clients.empty()) { return; } - for (auto &cli : clients) { - auto node = cli.weak_node_.lock(); - if (node) { - // should also make sure that mq is not killed before msg expires. - // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - } + auto hosts = center->FindRemoteSubClients(pub.topic()); + for (auto &host : hosts) { + tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); } } }; -- Gitblit v1.8.0