From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 01 六月 2021 11:19:22 +0800
Subject: [PATCH] refactor, clean up useless code.
---
box/center.cpp | 66 +++++++++++++++++++++++++-------
1 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 3f565b1..0fdfa33 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,10 @@
*/
#include "center.h"
#include "center_topic_node.h"
+#include "io_service.h"
#include "node_center.h"
+#include "tcp_proxy.h"
+#include "tcp_server.h"
#include <chrono>
using namespace std::chrono;
@@ -65,7 +68,7 @@
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
- MsgI msg;
+ MsgI msg(socket.shm());
if (msg.Make(reply_head, rep_body)) {
DEFER1(msg.Release(););
center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +76,7 @@
};
}
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
// command
auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,13 +90,45 @@
auto onInit = [&](const int64_t request) {
return center->OnNodeInit(socket, request);
};
- BHCenterHandleInit(onInit);
+ BHCenterHandleInit(socket.shm(), onInit);
center->OnTimer();
};
- auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center);
+
+ if (!head.dest().ip().empty()) { // other host, proxy
+ auto valid = [&]() { return head.route_size() == 1; };
+ if (!valid()) { return false; }
+
+ if (head.type() == kMsgTypeRequestTopic) {
+ typedef MsgRequestTopicReply Reply;
+ Reply reply;
+ if (!center->CheckMsg(head, reply)) {
+ replyer(reply);
+ } else {
+ auto onResult = [¢er](BHMsgHead &head, std::string body_content) {
+ if (head.route_size() > 0) {
+ auto &back = head.route(head.route_size() - 1);
+ MQInfo dest = {back.mq_id(), back.abs_addr()};
+ head.mutable_route()->RemoveLast();
+ center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+ }
+ };
+ if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
+ replyer(MakeReply<Reply>(eError, "send request failed."));
+ } else {
+ // success
+ }
+ }
+ return true;
+ } else {
+ // ignore other msgs for now.
+ }
+ return false;
+ }
+
switch (head.type()) {
CASE_ON_MSG_TYPE(ProcInit);
CASE_ON_MSG_TYPE(Register);
@@ -106,7 +141,7 @@
default: return false;
}
};
- BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
+ BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
auto OnBusIdle = [=](ShmSocket &socket) {};
auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +177,7 @@
}
};
- BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
+ BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
return true;
}
@@ -165,16 +200,12 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
- auto gc = [&](const MQId id) {
- auto r = ShmSocket::Remove(shm, id);
- if (r) {
- LOG_DEBUG() << "remove mq " << id << " ok\n";
- }
- };
-
auto nsec = NodeTimeoutSec();
- auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
- AddCenter(center_ptr);
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+ io_service_.reset(new IoService);
+ tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+ AddCenter(center_ptr, shm, *tcp_proxy_);
for (auto &kv : Centers()) {
auto &info = kv.second;
@@ -182,7 +213,9 @@
}
topic_node_.reset(new CenterTopicNode(center_ptr, shm));
+ tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
+
BHCenter::~BHCenter() { Stop(); }
bool BHCenter::Start()
@@ -197,6 +230,9 @@
bool BHCenter::Stop()
{
+ tcp_proxy_.reset();
+ tcp_server_.reset();
+ io_service_.reset();
topic_node_->Stop();
for (auto &kv : sockets_) {
kv.second->Stop();
--
Gitblit v1.8.0