From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.
---
box/tcp_connection.cpp | 91 ++++------
box/center_topic_node.cpp | 2
box/center.cpp | 50 ++++
box/io_service.h | 10
box/tcp_proxy.cpp | 56 +-----
box/center.h | 5
box/node_center.cpp | 57 ++++-
src/topic_node.cpp | 54 +++--
src/msg.cpp | 3
box/io_service.cpp | 33 --
src/shm_socket.cpp | 3
utest/api_test.cpp | 17 +
box/tcp_common.h | 8
box/tcp_proxy.h | 24 +-
box/tcp_server.h | 10
utest/tcp_test.cpp | 61 ++++--
box/node_center.h | 13 +
box/tcp_server.cpp | 23 -
box/tcp_connection.h | 16 +
19 files changed, 294 insertions(+), 242 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 8d24315..0fdfa33 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,9 @@
*/
#include "center.h"
#include "center_topic_node.h"
+#include "io_service.h"
#include "node_center.h"
+#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>
@@ -74,7 +76,7 @@
};
}
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
// command
auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +94,41 @@
center->OnTimer();
};
- auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center);
+
+ if (!head.dest().ip().empty()) { // other host, proxy
+ auto valid = [&]() { return head.route_size() == 1; };
+ if (!valid()) { return false; }
+
+ if (head.type() == kMsgTypeRequestTopic) {
+ typedef MsgRequestTopicReply Reply;
+ Reply reply;
+ if (!center->CheckMsg(head, reply)) {
+ replyer(reply);
+ } else {
+ auto onResult = [¢er](BHMsgHead &head, std::string body_content) {
+ if (head.route_size() > 0) {
+ auto &back = head.route(head.route_size() - 1);
+ MQInfo dest = {back.mq_id(), back.abs_addr()};
+ head.mutable_route()->RemoveLast();
+ center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+ }
+ };
+ if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
+ replyer(MakeReply<Reply>(eError, "send request failed."));
+ } else {
+ // success
+ }
+ }
+ return true;
+ } else {
+ // ignore other msgs for now.
+ }
+ return false;
+ }
+
switch (head.type()) {
CASE_ON_MSG_TYPE(ProcInit);
CASE_ON_MSG_TYPE(Register);
@@ -168,7 +202,10 @@
{
auto nsec = NodeTimeoutSec();
auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
- AddCenter(center_ptr, shm);
+ io_service_.reset(new IoService);
+ tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+ AddCenter(center_ptr, shm, *tcp_proxy_);
for (auto &kv : Centers()) {
auto &info = kv.second;
@@ -176,7 +213,7 @@
}
topic_node_.reset(new CenterTopicNode(center_ptr, shm));
- tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
+ tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +225,14 @@
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
}
topic_node_->Start();
- tcp_server_->Start();
return true;
}
bool BHCenter::Stop()
{
- tcp_server_->Stop();
+ tcp_proxy_.reset();
+ tcp_server_.reset();
+ io_service_.reset();
topic_node_->Stop();
for (auto &kv : sockets_) {
kv.second->Stop();
diff --git a/box/center.h b/box/center.h
index c4aa1ac..8850db4 100644
--- a/box/center.h
+++ b/box/center.h
@@ -24,6 +24,8 @@
#include <memory>
class CenterTopicNode;
class TcpServer;
+class TcpProxy;
+class IoService;
class BHCenter
{
@@ -54,7 +56,10 @@
std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
std::unique_ptr<CenterTopicNode> topic_node_;
+
+ std::unique_ptr<IoService> io_service_;
std::unique_ptr<TcpServer> tcp_server_;
+ std::unique_ptr<TcpProxy> tcp_proxy_;
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 5c8df7a..749f4e6 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -106,7 +106,7 @@
*reply.mutable_errmsg() = data.errmsg();
reply.set_data(ToJson(data));
} else {
- SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
+ SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic());
}
pnode_->ServerSendReply(src_info, reply);
};
diff --git a/box/io_service.cpp b/box/io_service.cpp
index 1d531e0..5640d50 100644
--- a/box/io_service.cpp
+++ b/box/io_service.cpp
@@ -16,33 +16,16 @@
* =====================================================================================
*/
#include "io_service.h"
-#include <chrono>
-using namespace std::chrono_literals;
-bool IoService::Start()
+IoService::IoService() :
+ guard_(io_.get_executor())
{
- Stop();
- bool cur = false;
- if (!run_.compare_exchange_strong(cur, true)) {
- return false;
- }
-
- auto proc = [this]() {
- while (run_) {
- io_.run_one_for(100ms);
- }
- OnStop();
- };
- std::thread(proc).swap(worker_);
- return true;
+ std::thread([this]() { io_.run(); }).swap(worker_);
}
-
-void IoService::Stop()
+IoService::~IoService()
{
- bool cur = true;
- if (run_.compare_exchange_strong(cur, false)) {
- if (worker_.joinable()) {
- worker_.join();
- }
- }
+ guard_.reset();
+ io_.stop(); // normally not needed, but make sure run() exits.
+ if (worker_.joinable())
+ worker_.join();
}
diff --git a/box/io_service.h b/box/io_service.h
index 000facc..f492e71 100644
--- a/box/io_service.h
+++ b/box/io_service.h
@@ -24,19 +24,17 @@
class IoService
{
public:
- IoService() :
- run_(false) {}
- bool Start();
- void Stop();
+ IoService();
+ ~IoService();
typedef boost::asio::io_context io_service_t;
io_service_t &io() { return io_; }
private:
- virtual void OnStop() {}
io_service_t io_;
+ typedef boost::asio::executor_work_guard<io_service_t::executor_type> guard_t;
+ guard_t guard_;
std::thread worker_;
- std::atomic<bool> run_;
};
#endif // end of include guard: IO_SERVICE_ODKKJG3D
diff --git a/box/node_center.cpp b/box/node_center.cpp
index ff199b2..662b2c0 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -70,8 +70,9 @@
return;
}
// LOG_FUNCTION;
+ const size_t total = msgs_.size();
time_to_clean_ = now + 1;
- int64_t limit = std::max(10000ul, msgs_.size() / 10);
+ int64_t limit = std::max(10000ul, total / 10);
int64_t n = 0;
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
++n;
};
int n = now - msg.timestamp();
- if (n < 10) {
+ if (msg.Count() == 0) {
+ Free();
+ } else if (n > NodeTimeoutSec()) {
+ Free();
+ } else {
++it;
- } else if (msg.Count() == 0) {
- Free();
- } else if (n > 60) {
- Free();
}
}
if (n > 0) {
- LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+ LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
}
}
@@ -209,17 +210,25 @@
RecordMsg(msg);
return socket.Send(dest, msg);
}
-bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
- auto ssn = dest.id_ - (dest.id_ % 10);
- LOG_DEBUG() << "prox ssn " << ssn;
+ Node node;
+ auto ssn = mq_id - (mq_id % 10);
auto pos = nodes_.find(ssn);
- if (pos == nodes_.end()) {
- LOG_ERROR() << "proxy msg, ssn not found.";
+ if (pos != nodes_.end()) {
+ node = pos->second;
+ }
+ return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+ Node node(GetNode(dest.id_));
+ if (!node || !Valid(*node)) {
+ LOG_ERROR() << id() << " pass remote request, dest not found.";
return false;
}
- auto &node = pos->second;
- if (!Valid(*node)) { return false; }
ShmSocket &sender(DefaultSender(node->shm_));
auto route = head.add_route();
@@ -233,6 +242,26 @@
return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+ Node node(GetNode(dest.id_));
+ if (!node) {
+ LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+ return false;
+ }
+ auto offset = node->addrs_[dest.id_];
+ if (offset != dest.offset_) {
+ LOG_ERROR() << id() << " pass remote reply, dest address not match";
+ return false;
+ }
+
+ ShmMsg msg(node->shm_);
+ if (!msg.Make(head, body_content)) { return false; }
+ DEFER1(msg.Release(););
+ RecordMsg(msg);
+ return DefaultSender(node->shm_).Send(dest, msg);
+}
+
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
// LOG_FUNCTION;
diff --git a/box/node_center.h b/box/node_center.h
index 1c79809..8bc2cf8 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -122,7 +122,8 @@
void RecordMsg(const MsgI &msg);
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
- bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+ bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+ bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
void OnAlloc(ShmSocket &socket, const int64_t val);
void OnFree(ShmSocket &socket, const int64_t val);
bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -159,6 +160,14 @@
{
return HandleMsg<MsgCommonReply, Func>(head, op);
}
+ template <class Reply>
+ bool CheckMsg(const BHMsgHead &head, Reply &reply)
+ {
+ bool r = false;
+ auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
+ reply = HandleMsg<Reply>(head, onOk);
+ return r;
+ }
MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
@@ -184,6 +193,8 @@
return node && Valid(*node);
}
void RemoveNode(Node &node);
+ Node GetNode(const MQId mq);
+
std::string id_; // center proc id;
std::unordered_map<Topic, Clients> service_map_;
diff --git a/box/tcp_common.h b/box/tcp_common.h
index 8c8b7ec..3d2b133 100644
--- a/box/tcp_common.h
+++ b/box/tcp_common.h
@@ -21,10 +21,18 @@
#include <boost/asio.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
+#include <functional>
+#include <string>
+
namespace ip = boost::asio::ip;
using boost::asio::ip::tcp;
typedef boost::system::error_code bserror_t;
const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
+namespace bhome_msg
+{
+class BHMsgHead;
+}
+typedef std::function<void(bhome_msg::BHMsgHead &head, std::string body_content)> ReplyCB;
#endif // end of include guard: TCP_COMMON_8S8O7OV
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 22162e5..02001bb 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -19,6 +19,7 @@
#include "log.h"
#include "msg.h"
#include "node_center.h"
+#include "proto.h"
#include "shm_socket.h"
namespace
@@ -59,10 +60,8 @@
bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
{
const char *p = buffer.data();
- LOG_DEBUG() << "msg len " << len;
if (4 > len) { return false; }
uint32_t head_len = Get32(p);
- LOG_DEBUG() << "head_len " << head_len;
if (head_len > 1024 * 4) {
throw std::runtime_error("unexpected tcp reply data.");
}
@@ -87,15 +86,34 @@
/// request -----------------------------------------------------------
+void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
+{
+ if (reply_cb_) {
+ reply_cb_(head, std::move(body_content));
+ }
+}
+
void TcpRequest1::OnError(bserror_t ec)
{
- LOG_ERROR() << "tcp client error: " << ec;
+ // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
+ BHMsgHead head;
+ std::string body_content;
+ try {
+ std::vector<char> req(request_.begin(), request_.end());
+ if (CheckData(req, req.size(), head, body_content)) {
+ if (head.type() == kMsgTypeRequestTopic) {
+ SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
+ }
+ }
+ } catch (std::exception &e) {
+ }
Close();
}
void TcpRequest1::Start()
{
auto readReply = [this]() {
+ // if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
recv_buffer_.resize(1000);
recv_len_ = 0;
socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
@@ -104,47 +122,22 @@
socket_.async_connect(remote_, TcpCB(*this, request));
}
-void TcpRequest1::Close()
-{
- LOG_DEBUG() << "client close";
- socket_.close();
-}
+void TcpRequest1::Close() { socket_.close(); }
void TcpRequest1::OnRead(size_t size)
{
- LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
-
recv_len_ += size;
BHMsgHead head;
std::string body_content;
- bool recv_done = false;
try {
- recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+ if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
+ Close();
+ SendReply(head, std::move(body_content));
+ } else { // not complete, read again
+ socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+ }
} catch (std::exception &e) {
LOG_ERROR() << e.what();
Close();
- return;
- }
-
- if (recv_done) {
- // just pass to client, no check, client will check it anyway.
- LOG_DEBUG() << "route size: " << head.route_size();
- if (head.route_size() < 1) { return; }
- auto &back = head.route(head.route_size() - 1);
- MQInfo dest = {back.mq_id(), back.abs_addr()};
- head.mutable_route()->RemoveLast();
-
- LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
- MsgRequestTopicReply reply;
- if (reply.ParseFromString(body_content)) {
- LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
- LOG_DEBUG() << "content : " << reply.data();
- }
- Close();
- return;
- shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
- } else { // read again
- LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
- socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
}
}
@@ -153,7 +146,7 @@
void TcpReply1::OnError(bserror_t ec) { Close(); }
void TcpReply1::Close()
{
- LOG_DEBUG() << "server close.";
+ LOG_TRACE() << "server close.";
socket_.close();
}
@@ -177,38 +170,26 @@
return;
}
- auto ParseBody = [&](auto &req) {
- const char *p = recv_buffer_.data();
- uint32_t size = Get32(p);
- p += 4;
- p += size;
- size = Get32(p);
- p += 4;
- return req.ParseFromArray(p, size);
- };
-
if (recv_done) {
- LOG_DEBUG() << "request data: " << size;
- auto self(shared_from_this());
+ LOG_TRACE() << "tcp server recv request data, size: " << size;
MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
if (remote.id_ && remote.offset_) {
+ auto self(shared_from_this());
auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
send_buffer_ = imsg.content();
async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
};
auto &scenter = *pscenter_;
- if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
- send_buffer_ = "fake reply";
- async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+ if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+ return;
}
} else {
LOG_DEBUG() << "no address";
- send_buffer_ = "no address";
- async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
}
+ Close();
- } else { // read again
- LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+ } else { // not complete, read again
+ LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
}
};
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
index 5aa93a4..6e1d0ad 100644
--- a/box/tcp_connection.h
+++ b/box/tcp_connection.h
@@ -25,25 +25,28 @@
#include <memory>
class ShmSocket;
+class NodeCenter;
+typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+
class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
{
public:
- static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
+ static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb)
{
- std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
+ std::make_shared<TcpRequest1>(io, addr, std::move(request), cb)->Start();
}
-
- TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
- socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
+ TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) :
+ socket_(io), reply_cb_(cb), remote_(addr), request_(std::move(request)) {}
void OnError(bserror_t ec);
private:
void Start();
void Close();
void OnRead(size_t size);
+ void SendReply(BHMsgHead &head, std::string body_content);
tcp::socket socket_;
- ShmSocket &shm_socket_; // send reply
+ ReplyCB reply_cb_;
tcp::endpoint remote_;
std::string request_;
std::vector<char> recv_buffer_;
@@ -54,7 +57,6 @@
class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
{
public:
- typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
static void Create(tcp::socket sock, CenterPtr pscenter)
{
std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
index 298c6b6..b4ec497 100644
--- a/box/tcp_proxy.cpp
+++ b/box/tcp_proxy.cpp
@@ -16,54 +16,18 @@
* =====================================================================================
*/
#include "tcp_proxy.h"
-#include "defs.h"
-#include "shm_socket.h"
#include "tcp_connection.h"
-TcpProxy::TcpProxy() :
- run_(false) {}
-
-TcpProxy::~TcpProxy() {}
-
-bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
-{
- Stop();
- bool cur = false;
- if (!run_.compare_exchange_strong(cur, true)) { return false; }
-
- auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
- local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
- auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
- auto &dest = head.dest();
- if (dest.ip().empty() || dest.port() == 0) { return; }
- Request(dest.ip(), dest.port(), msg.content());
- };
- local_->Start(1, localProc);
-
- auto proxyProc = [this]() {
- while (run_) {
- io_context_.run_one_for(std::chrono::milliseconds(100));
- }
- };
- std::thread(proxyProc).swap(worker_);
- return true;
-}
-
-void TcpProxy::Stop()
-{
- bool cur = true;
- if (run_.compare_exchange_strong(cur, false)) {
- if (worker_.joinable()) {
- worker_.join();
- }
- local_.reset();
- }
-}
-
-bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
+bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb)
{
if (content.empty()) { return false; }
-
- tcp::endpoint dest(ip::address::from_string(ip), port);
- TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
+ try {
+ tcp::endpoint dest(ip::address::from_string(ip), port);
+ TcpRequest1::Create(io_, dest, std::move(content), cb);
+ LOG_TRACE() << "tcp request start " << ip << ':' << port;
+ return true;
+ } catch (std::exception &e) {
+ LOG_ERROR() << "proxy request exception: " << e.what();
+ return false;
+ }
}
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
index 9c74532..69c3f03 100644
--- a/box/tcp_proxy.h
+++ b/box/tcp_proxy.h
@@ -18,28 +18,24 @@
#ifndef TCP_PROXY_E1YJ92U5
#define TCP_PROXY_E1YJ92U5
-#include "shm.h"
+#include "bh_util.h"
+#include "io_service.h"
#include "tcp_common.h"
-#include <atomic>
-#include <thread>
+#include <memory>
-class ShmSocket;
+class NodeCenter;
+typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
class TcpProxy
{
public:
- TcpProxy();
- ~TcpProxy();
- bool Start(bhome_shm::SharedMemory &shm);
- void Stop();
+ typedef IoService::io_service_t io_service_t;
+ TcpProxy(io_service_t &io) :
+ io_(io) {}
+ bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb);
private:
- bool Request(const std::string &ip, int port, std::string &&content);
- std::unique_ptr<ShmSocket> local_;
-
- boost::asio::io_context io_context_;
- std::thread worker_;
- std::atomic<bool> run_;
+ io_service_t &io_;
};
#endif // end of include guard: TCP_PROXY_E1YJ92U5
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
index ea23106..5cd8743 100644
--- a/box/tcp_server.cpp
+++ b/box/tcp_server.cpp
@@ -23,26 +23,19 @@
using namespace std::chrono_literals;
-TcpServer::TcpServer(int port, CenterPtr pscenter) :
- listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
-{
- Accept();
-}
-
-TcpServer::~TcpServer() { Stop(); }
-
-void TcpServer::OnStop()
-{
- listener_.close();
-}
-
void TcpServer::Accept()
{
listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
if (!ec) {
- LOG_INFO() << "server accept client";
+ LOG_TRACE() << "server accept client";
TcpReply1::Create(std::move(sock), pscenter_);
+ Accept();
+ } else {
+ // this is already destructed by now.
+ if (ec.value() != ECANCELED) {
+ LOG_WARNING() << "tcp server accept error: " << ec;
+ Accept();
+ }
}
- Accept();
});
}
\ No newline at end of file
diff --git a/box/tcp_server.h b/box/tcp_server.h
index 2c9337c..4698196 100644
--- a/box/tcp_server.h
+++ b/box/tcp_server.h
@@ -23,15 +23,17 @@
#include "tcp_common.h"
class NodeCenter;
-class TcpServer : public IoService
+class TcpServer
{
public:
+ typedef IoService::io_service_t io_service_t;
typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
- TcpServer(int port, CenterPtr pscenter);
- ~TcpServer();
+ TcpServer(io_service_t &io, int port, CenterPtr pscenter) :
+ io_(io), listener_(io_, tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); }
+ ~TcpServer() { listener_.close(); }
private:
- virtual void OnStop();
+ io_service_t &io_;
void Accept();
tcp::acceptor listener_;
CenterPtr pscenter_;
diff --git a/src/msg.cpp b/src/msg.cpp
index 40a7b0d..3546424 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -37,7 +37,8 @@
Free();
}
} else if (n < 0) {
- LOG_FATAL() << "error double release data.";
+ // ns_log::GetTrace();
+ LOG_FATAL() << "double release msg.";
throw std::runtime_error("double release msg.");
}
return n;
diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp
index d54168d..2bdccc2 100644
--- a/src/shm_socket.cpp
+++ b/src/shm_socket.cpp
@@ -171,8 +171,7 @@
RecvCB cb_no_use;
per_msg_cbs_->Pick(msg_id, cb_no_use);
};
- SendImpl(remote, msg, onExpireRemoveCB);
- return true;
+ return SendImpl(remote, msg, onExpireRemoveCB);
} catch (std::exception &e) {
SetLastError(eError, "Send internal error.");
return false;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 3c38121..b21f7ef 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -497,9 +497,10 @@
out_msg_id = msg_id;
- auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
+ auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
auto &sock = SockClient();
BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
+ *head.mutable_dest() = remote_addr;
AddRoute(head, sock);
head.set_topic(req.topic());
@@ -519,8 +520,12 @@
};
try {
- BHAddress addr;
- return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+ if (remote_addr.ip().empty()) {
+ BHAddress addr;
+ return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+ } else {
+ return SendTo(CenterAddr(), req, cb);
+ }
} catch (...) {
SetLastError(eError, "internal error.");
return false;
@@ -536,25 +541,34 @@
try {
auto &sock = SockClient();
-
- BHAddress addr;
- if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
- LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
- BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
- AddRoute(head, sock);
- head.set_topic(request.topic());
-
- MsgI reply_msg(shm());
- DEFER1(reply_msg.Release(););
- BHMsgHead reply_head;
-
- if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
- reply_head.type() == kMsgTypeRequestTopicReply &&
- reply_msg.ParseBody(out_reply)) {
- reply_head.mutable_proc_id()->swap(out_proc_id);
- return true;
+ MQInfo dest;
+ if (!remote_addr.ip().empty()) {
+ dest = CenterAddr();
+ } else {
+ BHAddress addr;
+ if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
+ dest.offset_ = addr.abs_addr();
+ dest.id_ = addr.mq_id();
+ } else {
+ return false;
}
}
+
+ BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
+ *head.mutable_dest() = remote_addr;
+ AddRoute(head, sock);
+ head.set_topic(request.topic());
+
+ MsgI reply_msg(shm());
+ DEFER1(reply_msg.Release(););
+ BHMsgHead reply_head;
+
+ if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
+ reply_head.type() == kMsgTypeRequestTopicReply &&
+ reply_msg.ParseBody(out_reply)) {
+ reply_head.mutable_proc_id()->swap(out_proc_id);
+ return true;
+ }
} catch (...) {
SetLastError(eError, __func__ + std::string(" internal errer."));
}
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 3d842bf..bddcbf7 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -206,7 +206,7 @@
}
printf("\n");
};
- {
+ if (0) {
// query procs
std::string dest(BHAddress().SerializeAsString());
MsgQueryProc query;
@@ -224,14 +224,21 @@
// printf("register topic : %s\n", r ? "ok" : "failed");
// Sleep(1s);
}
- {
+ for (int i = 0; i < 3; ++i) {
// query procs with normal topic request
MsgRequestTopic req;
req.set_topic("#center_query_procs");
// req.set_data("{\"proc_id\":\"#center.node\"}");
std::string s(req.SerializeAsString());
// Sleep(10ms, false);
- std::string dest(BHAddress().SerializeAsString());
+ BHAddress host;
+ printf("query with ip set\n");
+ host.set_ip("127.0.0.1");
+ host.set_port(kBHCenterPort);
+ host.set_mq_id(1000011);
+ host.set_abs_addr(10296);
+
+ std::string dest(host.SerializeAsString());
void *proc_id = 0;
int proc_id_len = 0;
DEFER1(BHFree(proc_id, proc_id_len););
@@ -247,7 +254,7 @@
} else {
MsgRequestTopicReply ret;
ret.ParseFromArray(reply, reply_len);
- printf("topic query proc : %s\n", ret.data().c_str());
+ printf("\ntopic query proc : %s\n", ret.data().c_str());
// MsgQueryProcReply result;
// if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
// PrintProcs(result);
@@ -325,7 +332,7 @@
for (int i = 0; i < 1; ++i) {
MsgPublish pub;
pub.set_topic(topic_ + std::to_string(i));
- pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
+ pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
std::string s(pub.SerializeAsString());
BHPublish(s.data(), s.size(), 0);
// Sleep(1s);
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index a838252..86a0897 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -19,6 +19,7 @@
#include "defs.h"
#include "node_center.h"
#include "tcp_connection.h"
+#include "tcp_proxy.h"
#include "tcp_server.h"
#include "util.h"
#include <sys/ioctl.h>
@@ -33,34 +34,54 @@
BOOST_AUTO_TEST_CASE(TcpTest)
{
- SharedMemory &shm = TestShm();
-
const std::string connect_addr = "127.0.0.1";
const uint16_t port = kBHCenterPort;
- boost::asio::io_context io;
+ IoService io;
tcp::endpoint dest(ip::address::from_string(connect_addr), port);
- MsgRequestTopic req;
- req.set_topic("#center_query_procs");
- req.set_data("");
- auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
- auto route = head.add_route();
- route->set_mq_id(12345);
- route->set_abs_addr(67890);
- head.mutable_dest()->set_ip(connect_addr);
- head.mutable_dest()->set_port(port);
- head.mutable_dest()->set_mq_id(1000011);
- head.mutable_dest()->set_abs_addr(10296);
+ auto NewRequest = [&]() {
+ MsgRequestTopic req;
+ req.set_topic("#center_query_procs");
+ req.set_data("");
+ auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
+ auto route = head.add_route();
+ route->set_mq_id(12345);
+ route->set_abs_addr(67890);
- auto request(MsgI::Serialize(head, req));
- for (int i = 0; i < 1; ++i) {
- LOG_DEBUG() << "request size: " << request.size();
- TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
+ head.mutable_dest()->set_ip(connect_addr);
+ head.mutable_dest()->set_port(port);
+ head.mutable_dest()->set_mq_id(1000011);
+ head.mutable_dest()->set_abs_addr(10296);
+
+ return (MsgI::Serialize(head, req));
+ };
+ auto onReply = [](BHMsgHead &head, std::string body_content) {
+ static int n = 0;
+ printf("reply %d: ", ++n);
+ MsgRequestTopicReply reply;
+ if (reply.ParseFromString(body_content)) {
+ if (IsSuccess(reply.errmsg().errcode())) {
+ printf("\ncontent: %s\n", reply.data().c_str());
+ } else {
+ printf("error: %s\n", reply.errmsg().errstring().c_str());
+ }
+ } else {
+ printf("parse error\n");
+ }
+ };
+ for (int i = 0; i < 100; ++i) {
+ auto request = NewRequest();
+ TcpRequest1::Create(io.io(), dest, request, onReply);
}
-
- io.run();
+ Sleep(2s);
+ printf("-------------------------------------------------------\n");
+ for (int i = 0; i < 3; ++i) {
+ auto request = NewRequest();
+ TcpRequest1::Create(io.io(), dest, request, onReply);
+ }
+ Sleep(2s);
printf("TcpTest\n");
}
--
Gitblit v1.8.0