From 903b27f875e5f2a872c1b309f354b18c0450f35a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 04 六月 2021 11:46:18 +0800
Subject: [PATCH] allow tcp request with no dest, auto query topic.
---
box/tcp_connection.cpp | 22 ++++------
utest/tcp_test.cpp | 7 ++-
box/node_center.h | 2
box/node_center.cpp | 33 ++++++++++++++--
4 files changed, 43 insertions(+), 21 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index b285c9f..5c24409 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -222,12 +222,37 @@
return node;
}
-bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
- Node node(GetNode(dest.id_));
- if (!node || !Valid(*node)) {
- LOG_ERROR() << id() << " pass remote request, dest not found.";
+ Node node;
+
+ auto FindDest = [&]() {
+ auto pos = service_map_.find(head.topic());
+ if (pos != service_map_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ node = cli.weak_node_.lock();
+ if (node && Valid(*node)) {
+ dest.id_ = cli.mq_id_;
+ dest.offset_ = cli.mq_abs_addr_;
+ return true;
+ }
+ }
+ }
return false;
+ };
+
+ if (dest.id_ == 0) {
+ if (!FindDest()) {
+ LOG_ERROR() << id() << " pass remote request, topic dest not found.";
+ return false;
+ }
+ } else {
+ node = GetNode(dest.id_);
+ if (!node || !Valid(*node)) {
+ LOG_ERROR() << id() << " pass remote request, dest not found.";
+ return false;
+ }
}
ShmSocket &sender(DefaultSender(node->shm_));
diff --git a/box/node_center.h b/box/node_center.h
index 461a354..74dd52f 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -121,7 +121,7 @@
void RecordMsg(const MsgI &msg);
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
- bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+ bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
void OnAlloc(ShmSocket &socket, const int64_t val);
void OnFree(ShmSocket &socket, const int64_t val);
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 02001bb..85ed4ed 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -173,21 +173,17 @@
if (recv_done) {
LOG_TRACE() << "tcp server recv request data, size: " << size;
MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
- if (remote.id_ && remote.offset_) {
- auto self(shared_from_this());
- auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
- send_buffer_ = imsg.content();
- async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
- };
- auto &scenter = *pscenter_;
- if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
- return;
- }
+ auto self(shared_from_this());
+ auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ send_buffer_ = imsg.content();
+ async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ };
+ auto &scenter = *pscenter_;
+ if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+ return;
} else {
- LOG_DEBUG() << "no address";
+ Close();
}
- Close();
-
} else { // not complete, read again
LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index 2aead7e..0ead665 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -49,11 +49,12 @@
auto route = head.add_route();
route->set_mq_id(12345);
route->set_abs_addr(67890);
+ head.set_topic(req.topic());
head.mutable_dest()->set_ip(connect_addr);
- head.mutable_dest()->set_port(port);
- head.mutable_dest()->set_mq_id(201);
- head.mutable_dest()->set_abs_addr(10072);
+ // head.mutable_dest()->set_port(port);
+ // head.mutable_dest()->set_mq_id(201);
+ // head.mutable_dest()->set_abs_addr(10072);
return (MsgI::Serialize(head, req));
};
--
Gitblit v1.8.0