From 056f71f24cefaf88f2a93714c6678c03ed5f1e0e Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 02 七月 2021 16:54:33 +0800
Subject: [PATCH] fixed to adapt gcc-5.4 & glibc-2.25
---
box/center.cpp | 91 +++++++++++++++++++++++++++------------------
1 files changed, 55 insertions(+), 36 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 8d24315..e0abbb3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,9 @@
*/
#include "center.h"
#include "center_topic_node.h"
+#include "io_service.h"
#include "node_center.h"
+#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>
@@ -31,8 +33,6 @@
namespace
{
-//TODO check proc_id
-
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
@@ -41,18 +41,6 @@
if (msg.ParseBody(body)) {
replyer(onmsg(body));
}
-}
-
-Handler Combine(const Handler &h1, const Handler &h2)
-{
- return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
- return h1(socket, msg, head) || h2(socket, msg, head);
- };
-}
-template <class... H>
-Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
-{
- return Combine(Combine(h0, h1), h2, rest...);
}
#define CASE_ON_MSG_TYPE(MsgTag) \
@@ -74,7 +62,7 @@
};
}
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
// command
auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +80,45 @@
center->OnTimer();
};
- auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center);
+
+ if (!head.dest().ip().empty()) { // other host, proxy
+ auto valid = [&]() { return head.route_size() == 1; };
+ if (!valid()) { return false; }
+
+ if (head.type() == kMsgTypeRequestTopic) {
+ typedef MsgRequestTopicReply Reply;
+ Reply reply;
+ if (!center->CheckMsg(head, reply)) {
+ replyer(reply);
+ } else {
+ auto onResult = [¢er](BHMsgHead &head, std::string body_content) {
+ if (head.route_size() > 0) {
+ auto &back = head.route(head.route_size() - 1);
+ MQInfo dest = {back.mq_id(), back.abs_addr()};
+ head.mutable_route()->RemoveLast();
+ center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+ }
+ };
+ uint16_t port = head.dest().port();
+ if (port == 0) {
+ port = kBHCenterPort;
+ }
+ if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) {
+ replyer(MakeReply<Reply>(eError, "send request failed."));
+ } else {
+ // success
+ }
+ }
+ return true;
+ } else {
+ // ignore other msgs for now.
+ }
+ return false;
+ }
+
switch (head.type()) {
CASE_ON_MSG_TYPE(ProcInit);
CASE_ON_MSG_TYPE(Register);
@@ -111,27 +135,18 @@
auto OnBusIdle = [=](ShmSocket &socket) {};
auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
- auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center);
auto OnPublish = [&]() {
MsgPublish pub;
- NodeCenter::Clients clients;
- MsgCommonReply reply;
- if (head.route_size() != 1 || !msg.ParseBody(pub)) {
- return;
- } else if (!center->FindClients(head, pub, clients, reply)) {
+ if (head.route_size() == 1 && msg.ParseBody(pub)) {
+ // replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
+ auto reply(center->Publish(head, pub.topic(), msg));
replyer(reply);
- } else {
- replyer(MakeReply(eSuccess));
- if (clients.empty()) { return; }
- for (auto &cli : clients) {
- auto node = cli.weak_node_.lock();
- if (node) {
- // should also make sure that mq is not killed before msg expires.
- // it would be ok if (kill_time - offline_time) is longer than expire time.
- socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
- }
+ auto hosts = center->FindRemoteSubClients(pub.topic());
+ for (auto &host : hosts) {
+ tcp_proxy.Publish(host, kBHCenterPort, msg.content());
}
}
};
@@ -168,7 +183,10 @@
{
auto nsec = NodeTimeoutSec();
auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
- AddCenter(center_ptr, shm);
+ io_service_.reset(new IoService);
+ tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+ AddCenter(center_ptr, shm, *tcp_proxy_);
for (auto &kv : Centers()) {
auto &info = kv.second;
@@ -176,7 +194,7 @@
}
topic_node_.reset(new CenterTopicNode(center_ptr, shm));
- tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
+ tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +206,14 @@
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
}
topic_node_->Start();
- tcp_server_->Start();
return true;
}
bool BHCenter::Stop()
{
- tcp_server_->Stop();
+ tcp_proxy_.reset();
+ tcp_server_.reset();
+ io_service_.reset();
topic_node_->Stop();
for (auto &kv : sockets_) {
kv.second->Stop();
--
Gitblit v1.8.0