tcp proxy requests, need more test.
| | |
| | | */ |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "io_service.h" |
| | | #include "node_center.h" |
| | | #include "tcp_proxy.h" |
| | | #include "tcp_server.h" |
| | | #include <chrono> |
| | | |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | |
| | | if (!head.dest().ip().empty()) { // other host, proxy |
| | | auto valid = [&]() { return head.route_size() == 1; }; |
| | | if (!valid()) { return false; } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | typedef MsgRequestTopicReply Reply; |
| | | Reply reply; |
| | | if (!center->CheckMsg(head, reply)) { |
| | | replyer(reply); |
| | | } else { |
| | | auto onResult = [¢er](BHMsgHead &head, std::string body_content) { |
| | | if (head.route_size() > 0) { |
| | | auto &back = head.route(head.route_size() - 1); |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); |
| | | } |
| | | }; |
| | | if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) { |
| | | replyer(MakeReply<Reply>(eError, "send request failed.")); |
| | | } else { |
| | | // success |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | // ignore other msgs for now. |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr, shm); |
| | | io_service_.reset(new IoService); |
| | | tcp_proxy_.reset(new TcpProxy(io_service_->io())); |
| | | |
| | | AddCenter(center_ptr, shm, *tcp_proxy_); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); |
| | | tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } |
| | |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | topic_node_->Start(); |
| | | tcp_server_->Start(); |
| | | return true; |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | tcp_server_->Stop(); |
| | | tcp_proxy_.reset(); |
| | | tcp_server_.reset(); |
| | | io_service_.reset(); |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |
| | |
| | | #include <memory> |
| | | class CenterTopicNode; |
| | | class TcpServer; |
| | | class TcpProxy; |
| | | class IoService; |
| | | |
| | | class BHCenter |
| | | { |
| | |
| | | |
| | | std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; |
| | | std::unique_ptr<CenterTopicNode> topic_node_; |
| | | |
| | | std::unique_ptr<IoService> io_service_; |
| | | std::unique_ptr<TcpServer> tcp_server_; |
| | | std::unique_ptr<TcpProxy> tcp_proxy_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TM9OUQTG |
| | |
| | | *reply.mutable_errmsg() = data.errmsg(); |
| | | reply.set_data(ToJson(data)); |
| | | } else { |
| | | SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic()); |
| | | SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic()); |
| | | } |
| | | pnode_->ServerSendReply(src_info, reply); |
| | | }; |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "io_service.h" |
| | | #include <chrono> |
| | | using namespace std::chrono_literals; |
| | | |
| | | bool IoService::Start() |
| | | IoService::IoService() : |
| | | guard_(io_.get_executor()) |
| | | { |
| | | Stop(); |
| | | bool cur = false; |
| | | if (!run_.compare_exchange_strong(cur, true)) { |
| | | return false; |
| | | } |
| | | |
| | | auto proc = [this]() { |
| | | while (run_) { |
| | | io_.run_one_for(100ms); |
| | | } |
| | | OnStop(); |
| | | }; |
| | | std::thread(proc).swap(worker_); |
| | | return true; |
| | | std::thread([this]() { io_.run(); }).swap(worker_); |
| | | } |
| | | |
| | | void IoService::Stop() |
| | | IoService::~IoService() |
| | | { |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false)) { |
| | | if (worker_.joinable()) { |
| | | worker_.join(); |
| | | } |
| | | } |
| | | guard_.reset(); |
| | | io_.stop(); // normally not needed, but make sure run() exits. |
| | | if (worker_.joinable()) |
| | | worker_.join(); |
| | | } |
| | |
| | | class IoService |
| | | { |
| | | public: |
| | | IoService() : |
| | | run_(false) {} |
| | | bool Start(); |
| | | void Stop(); |
| | | IoService(); |
| | | ~IoService(); |
| | | |
| | | typedef boost::asio::io_context io_service_t; |
| | | io_service_t &io() { return io_; } |
| | | |
| | | private: |
| | | virtual void OnStop() {} |
| | | io_service_t io_; |
| | | typedef boost::asio::executor_work_guard<io_service_t::executor_type> guard_t; |
| | | guard_t guard_; |
| | | std::thread worker_; |
| | | std::atomic<bool> run_; |
| | | }; |
| | | |
| | | #endif // end of include guard: IO_SERVICE_ODKKJG3D |
| | |
| | | return; |
| | | } |
| | | // LOG_FUNCTION; |
| | | const size_t total = msgs_.size(); |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t limit = std::max(10000ul, total / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | |
| | | ++n; |
| | | }; |
| | | int n = now - msg.timestamp(); |
| | | if (n < 10) { |
| | | if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > NodeTimeoutSec()) { |
| | | Free(); |
| | | } else { |
| | | ++it; |
| | | } else if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > 60) { |
| | | Free(); |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; |
| | | } |
| | | } |
| | | |
| | |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | |
| | | NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) |
| | | { |
| | | auto ssn = dest.id_ - (dest.id_ % 10); |
| | | LOG_DEBUG() << "prox ssn " << ssn; |
| | | Node node; |
| | | auto ssn = mq_id - (mq_id % 10); |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos == nodes_.end()) { |
| | | LOG_ERROR() << "proxy msg, ssn not found."; |
| | | if (pos != nodes_.end()) { |
| | | node = pos->second; |
| | | } |
| | | return node; |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | return false; |
| | | } |
| | | auto &node = pos->second; |
| | | if (!Valid(*node)) { return false; } |
| | | |
| | | ShmSocket &sender(DefaultSender(node->shm_)); |
| | | auto route = head.add_route(); |
| | |
| | | return sender.Send(dest, msg, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | | if (!node) { |
| | | LOG_ERROR() << id() << " pass remote reply , ssn not found."; |
| | | return false; |
| | | } |
| | | auto offset = node->addrs_[dest.id_]; |
| | | if (offset != dest.offset_) { |
| | | LOG_ERROR() << id() << " pass remote reply, dest address not match"; |
| | | return false; |
| | | } |
| | | |
| | | ShmMsg msg(node->shm_); |
| | | if (!msg.Make(head, body_content)) { return false; } |
| | | DEFER1(msg.Release();); |
| | | RecordMsg(msg); |
| | | return DefaultSender(node->shm_).Send(dest, msg); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |
| | | // LOG_FUNCTION; |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | | bool OnCommand(ShmSocket &socket, const int64_t val); |
| | |
| | | { |
| | | return HandleMsg<MsgCommonReply, Func>(head, op); |
| | | } |
| | | template <class Reply> |
| | | bool CheckMsg(const BHMsgHead &head, Reply &reply) |
| | | { |
| | | bool r = false; |
| | | auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); }; |
| | | reply = HandleMsg<Reply>(head, onOk); |
| | | return r; |
| | | } |
| | | |
| | | MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg); |
| | | MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); |
| | |
| | | return node && Valid(*node); |
| | | } |
| | | void RemoveNode(Node &node); |
| | | Node GetNode(const MQId mq); |
| | | |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | |
| | | #include <boost/asio.hpp> |
| | | #include <boost/uuid/string_generator.hpp> |
| | | #include <boost/uuid/uuid.hpp> |
| | | #include <functional> |
| | | #include <string> |
| | | |
| | | namespace ip = boost::asio::ip; |
| | | using boost::asio::ip::tcp; |
| | | typedef boost::system::error_code bserror_t; |
| | | |
| | | const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39"); |
| | | namespace bhome_msg |
| | | { |
| | | class BHMsgHead; |
| | | } |
| | | typedef std::function<void(bhome_msg::BHMsgHead &head, std::string body_content)> ReplyCB; |
| | | |
| | | #endif // end of include guard: TCP_COMMON_8S8O7OV |
| | |
| | | #include "log.h" |
| | | #include "msg.h" |
| | | #include "node_center.h" |
| | | #include "proto.h" |
| | | #include "shm_socket.h" |
| | | |
| | | namespace |
| | |
| | | bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content) |
| | | { |
| | | const char *p = buffer.data(); |
| | | LOG_DEBUG() << "msg len " << len; |
| | | if (4 > len) { return false; } |
| | | uint32_t head_len = Get32(p); |
| | | LOG_DEBUG() << "head_len " << head_len; |
| | | if (head_len > 1024 * 4) { |
| | | throw std::runtime_error("unexpected tcp reply data."); |
| | | } |
| | |
| | | |
| | | /// request ----------------------------------------------------------- |
| | | |
| | | void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content) |
| | | { |
| | | if (reply_cb_) { |
| | | reply_cb_(head, std::move(body_content)); |
| | | } |
| | | } |
| | | |
| | | void TcpRequest1::OnError(bserror_t ec) |
| | | { |
| | | LOG_ERROR() << "tcp client error: " << ec; |
| | | // LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message(); |
| | | BHMsgHead head; |
| | | std::string body_content; |
| | | try { |
| | | std::vector<char> req(request_.begin(), request_.end()); |
| | | if (CheckData(req, req.size(), head, body_content)) { |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString()); |
| | | } |
| | | } |
| | | } catch (std::exception &e) { |
| | | } |
| | | Close(); |
| | | } |
| | | |
| | | void TcpRequest1::Start() |
| | | { |
| | | auto readReply = [this]() { |
| | | // if (!reply_cb_) { return; } // no reply needed, maybe safe to close? |
| | | recv_buffer_.resize(1000); |
| | | recv_len_ = 0; |
| | | socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | |
| | | |
| | | socket_.async_connect(remote_, TcpCB(*this, request)); |
| | | } |
| | | void TcpRequest1::Close() |
| | | { |
| | | LOG_DEBUG() << "client close"; |
| | | socket_.close(); |
| | | } |
| | | void TcpRequest1::Close() { socket_.close(); } |
| | | void TcpRequest1::OnRead(size_t size) |
| | | { |
| | | LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_; |
| | | |
| | | recv_len_ += size; |
| | | BHMsgHead head; |
| | | std::string body_content; |
| | | bool recv_done = false; |
| | | try { |
| | | recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); |
| | | if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply. |
| | | Close(); |
| | | SendReply(head, std::move(body_content)); |
| | | } else { // not complete, read again |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << e.what(); |
| | | Close(); |
| | | return; |
| | | } |
| | | |
| | | if (recv_done) { |
| | | // just pass to client, no check, client will check it anyway. |
| | | LOG_DEBUG() << "route size: " << head.route_size(); |
| | | if (head.route_size() < 1) { return; } |
| | | auto &back = head.route(head.route_size() - 1); |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | |
| | | LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_; |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromString(body_content)) { |
| | | LOG_DEBUG() << "err msg: " << reply.errmsg().errstring(); |
| | | LOG_DEBUG() << "content : " << reply.data(); |
| | | } |
| | | Close(); |
| | | return; |
| | | shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size())); |
| | | } else { // read again |
| | | LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | } |
| | | |
| | |
| | | void TcpReply1::OnError(bserror_t ec) { Close(); } |
| | | void TcpReply1::Close() |
| | | { |
| | | LOG_DEBUG() << "server close."; |
| | | LOG_TRACE() << "server close."; |
| | | socket_.close(); |
| | | } |
| | | |
| | |
| | | return; |
| | | } |
| | | |
| | | auto ParseBody = [&](auto &req) { |
| | | const char *p = recv_buffer_.data(); |
| | | uint32_t size = Get32(p); |
| | | p += 4; |
| | | p += size; |
| | | size = Get32(p); |
| | | p += 4; |
| | | return req.ParseFromArray(p, size); |
| | | }; |
| | | |
| | | if (recv_done) { |
| | | LOG_DEBUG() << "request data: " << size; |
| | | auto self(shared_from_this()); |
| | | LOG_TRACE() << "tcp server recv request data, size: " << size; |
| | | MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; |
| | | if (remote.id_ && remote.offset_) { |
| | | auto self(shared_from_this()); |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | auto &scenter = *pscenter_; |
| | | if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) { |
| | | send_buffer_ = "fake reply"; |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } |
| | | } else { |
| | | LOG_DEBUG() << "no address"; |
| | | send_buffer_ = "no address"; |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | } |
| | | Close(); |
| | | |
| | | } else { // read again |
| | | LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); |
| | | } else { // not complete, read again |
| | | LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | }; |
| | |
| | | #include <memory> |
| | | |
| | | class ShmSocket; |
| | | class NodeCenter; |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | |
| | | class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1> |
| | | { |
| | | public: |
| | | static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) |
| | | static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) |
| | | { |
| | | std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start(); |
| | | std::make_shared<TcpRequest1>(io, addr, std::move(request), cb)->Start(); |
| | | } |
| | | |
| | | TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) : |
| | | socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {} |
| | | TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) : |
| | | socket_(io), reply_cb_(cb), remote_(addr), request_(std::move(request)) {} |
| | | void OnError(bserror_t ec); |
| | | |
| | | private: |
| | | void Start(); |
| | | void Close(); |
| | | void OnRead(size_t size); |
| | | void SendReply(BHMsgHead &head, std::string body_content); |
| | | |
| | | tcp::socket socket_; |
| | | ShmSocket &shm_socket_; // send reply |
| | | ReplyCB reply_cb_; |
| | | tcp::endpoint remote_; |
| | | std::string request_; |
| | | std::vector<char> recv_buffer_; |
| | |
| | | class TcpReply1 : public std::enable_shared_from_this<TcpReply1> |
| | | { |
| | | public: |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | static void Create(tcp::socket sock, CenterPtr pscenter) |
| | | { |
| | | std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start(); |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "tcp_proxy.h" |
| | | #include "defs.h" |
| | | #include "shm_socket.h" |
| | | #include "tcp_connection.h" |
| | | |
| | | TcpProxy::TcpProxy() : |
| | | run_(false) {} |
| | | |
| | | TcpProxy::~TcpProxy() {} |
| | | |
| | | bool TcpProxy::Start(bhome_shm::SharedMemory &shm) |
| | | { |
| | | Stop(); |
| | | bool cur = false; |
| | | if (!run_.compare_exchange_strong(cur, true)) { return false; } |
| | | |
| | | auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_; |
| | | local_.reset(new ShmSocket(mq.offset_, shm, mq.id_)); |
| | | auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | | auto &dest = head.dest(); |
| | | if (dest.ip().empty() || dest.port() == 0) { return; } |
| | | Request(dest.ip(), dest.port(), msg.content()); |
| | | }; |
| | | local_->Start(1, localProc); |
| | | |
| | | auto proxyProc = [this]() { |
| | | while (run_) { |
| | | io_context_.run_one_for(std::chrono::milliseconds(100)); |
| | | } |
| | | }; |
| | | std::thread(proxyProc).swap(worker_); |
| | | return true; |
| | | } |
| | | |
| | | void TcpProxy::Stop() |
| | | { |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false)) { |
| | | if (worker_.joinable()) { |
| | | worker_.join(); |
| | | } |
| | | local_.reset(); |
| | | } |
| | | } |
| | | |
| | | bool TcpProxy::Request(const std::string &ip, int port, std::string &&content) |
| | | bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb) |
| | | { |
| | | if (content.empty()) { return false; } |
| | | |
| | | tcp::endpoint dest(ip::address::from_string(ip), port); |
| | | TcpRequest1::Create(io_context_, dest, std::move(content), *local_); |
| | | try { |
| | | tcp::endpoint dest(ip::address::from_string(ip), port); |
| | | TcpRequest1::Create(io_, dest, std::move(content), cb); |
| | | LOG_TRACE() << "tcp request start " << ip << ':' << port; |
| | | return true; |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "proxy request exception: " << e.what(); |
| | | return false; |
| | | } |
| | | } |
| | |
| | | #ifndef TCP_PROXY_E1YJ92U5 |
| | | #define TCP_PROXY_E1YJ92U5 |
| | | |
| | | #include "shm.h" |
| | | #include "bh_util.h" |
| | | #include "io_service.h" |
| | | #include "tcp_common.h" |
| | | #include <atomic> |
| | | #include <thread> |
| | | #include <memory> |
| | | |
| | | class ShmSocket; |
| | | class NodeCenter; |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | |
| | | class TcpProxy |
| | | { |
| | | public: |
| | | TcpProxy(); |
| | | ~TcpProxy(); |
| | | bool Start(bhome_shm::SharedMemory &shm); |
| | | void Stop(); |
| | | typedef IoService::io_service_t io_service_t; |
| | | TcpProxy(io_service_t &io) : |
| | | io_(io) {} |
| | | bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb); |
| | | |
| | | private: |
| | | bool Request(const std::string &ip, int port, std::string &&content); |
| | | std::unique_ptr<ShmSocket> local_; |
| | | |
| | | boost::asio::io_context io_context_; |
| | | std::thread worker_; |
| | | std::atomic<bool> run_; |
| | | io_service_t &io_; |
| | | }; |
| | | |
| | | #endif // end of include guard: TCP_PROXY_E1YJ92U5 |
| | |
| | | |
| | | using namespace std::chrono_literals; |
| | | |
| | | TcpServer::TcpServer(int port, CenterPtr pscenter) : |
| | | listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) |
| | | { |
| | | Accept(); |
| | | } |
| | | |
| | | TcpServer::~TcpServer() { Stop(); } |
| | | |
| | | void TcpServer::OnStop() |
| | | { |
| | | listener_.close(); |
| | | } |
| | | |
| | | void TcpServer::Accept() |
| | | { |
| | | listener_.async_accept([this](bserror_t ec, tcp::socket sock) { |
| | | if (!ec) { |
| | | LOG_INFO() << "server accept client"; |
| | | LOG_TRACE() << "server accept client"; |
| | | TcpReply1::Create(std::move(sock), pscenter_); |
| | | Accept(); |
| | | } else { |
| | | // this is already destructed by now. |
| | | if (ec.value() != ECANCELED) { |
| | | LOG_WARNING() << "tcp server accept error: " << ec; |
| | | Accept(); |
| | | } |
| | | } |
| | | Accept(); |
| | | }); |
| | | } |
| | |
| | | #include "tcp_common.h" |
| | | |
| | | class NodeCenter; |
| | | class TcpServer : public IoService |
| | | class TcpServer |
| | | { |
| | | public: |
| | | typedef IoService::io_service_t io_service_t; |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | TcpServer(int port, CenterPtr pscenter); |
| | | ~TcpServer(); |
| | | TcpServer(io_service_t &io, int port, CenterPtr pscenter) : |
| | | io_(io), listener_(io_, tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); } |
| | | ~TcpServer() { listener_.close(); } |
| | | |
| | | private: |
| | | virtual void OnStop(); |
| | | io_service_t &io_; |
| | | void Accept(); |
| | | tcp::acceptor listener_; |
| | | CenterPtr pscenter_; |
| | |
| | | Free(); |
| | | } |
| | | } else if (n < 0) { |
| | | LOG_FATAL() << "error double release data."; |
| | | // ns_log::GetTrace(); |
| | | LOG_FATAL() << "double release msg."; |
| | | throw std::runtime_error("double release msg."); |
| | | } |
| | | return n; |
| | |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | SendImpl(remote, msg, onExpireRemoveCB); |
| | | return true; |
| | | return SendImpl(remote, msg, onExpireRemoveCB); |
| | | } catch (std::exception &e) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | |
| | | |
| | | out_msg_id = msg_id; |
| | | |
| | | auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); |
| | | *head.mutable_dest() = remote_addr; |
| | | AddRoute(head, sock); |
| | | head.set_topic(req.topic()); |
| | | |
| | |
| | | }; |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); |
| | | if (remote_addr.ip().empty()) { |
| | | BHAddress addr; |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); |
| | | } else { |
| | | return SendTo(CenterAddr(), req, cb); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg(shm()); |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | MQInfo dest; |
| | | if (!remote_addr.ip().empty()) { |
| | | dest = CenterAddr(); |
| | | } else { |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | dest.offset_ = addr.abs_addr(); |
| | | dest.id_ = addr.mq_id(); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | *head.mutable_dest() = remote_addr; |
| | | AddRoute(head, sock); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg(shm()); |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, __func__ + std::string(" internal errer.")); |
| | | } |
| | |
| | | } |
| | | printf("\n"); |
| | | }; |
| | | { |
| | | if (0) { |
| | | // query procs |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | | MsgQueryProc query; |
| | |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | | { |
| | | for (int i = 0; i < 3; ++i) { |
| | | // query procs with normal topic request |
| | | MsgRequestTopic req; |
| | | req.set_topic("#center_query_procs"); |
| | | // req.set_data("{\"proc_id\":\"#center.node\"}"); |
| | | std::string s(req.SerializeAsString()); |
| | | // Sleep(10ms, false); |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | | BHAddress host; |
| | | printf("query with ip set\n"); |
| | | host.set_ip("127.0.0.1"); |
| | | host.set_port(kBHCenterPort); |
| | | host.set_mq_id(1000011); |
| | | host.set_abs_addr(10296); |
| | | |
| | | std::string dest(host.SerializeAsString()); |
| | | void *proc_id = 0; |
| | | int proc_id_len = 0; |
| | | DEFER1(BHFree(proc_id, proc_id_len);); |
| | |
| | | } else { |
| | | MsgRequestTopicReply ret; |
| | | ret.ParseFromArray(reply, reply_len); |
| | | printf("topic query proc : %s\n", ret.data().c_str()); |
| | | printf("\ntopic query proc : %s\n", ret.data().c_str()); |
| | | // MsgQueryProcReply result; |
| | | // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) { |
| | | // PrintProcs(result); |
| | |
| | | for (int i = 0; i < 1; ++i) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic_ + std::to_string(i)); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1, 'a')); |
| | | pub.set_data("pub_data_" + std::string(104 * 1, 'a')); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | // Sleep(1s); |
| | |
| | | #include "defs.h" |
| | | #include "node_center.h" |
| | | #include "tcp_connection.h" |
| | | #include "tcp_proxy.h" |
| | | #include "tcp_server.h" |
| | | #include "util.h" |
| | | #include <sys/ioctl.h> |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(TcpTest) |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | |
| | | const std::string connect_addr = "127.0.0.1"; |
| | | const uint16_t port = kBHCenterPort; |
| | | |
| | | boost::asio::io_context io; |
| | | IoService io; |
| | | |
| | | tcp::endpoint dest(ip::address::from_string(connect_addr), port); |
| | | MsgRequestTopic req; |
| | | req.set_topic("#center_query_procs"); |
| | | req.set_data(""); |
| | | auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); |
| | | auto route = head.add_route(); |
| | | route->set_mq_id(12345); |
| | | route->set_abs_addr(67890); |
| | | |
| | | head.mutable_dest()->set_ip(connect_addr); |
| | | head.mutable_dest()->set_port(port); |
| | | head.mutable_dest()->set_mq_id(1000011); |
| | | head.mutable_dest()->set_abs_addr(10296); |
| | | auto NewRequest = [&]() { |
| | | MsgRequestTopic req; |
| | | req.set_topic("#center_query_procs"); |
| | | req.set_data(""); |
| | | auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); |
| | | auto route = head.add_route(); |
| | | route->set_mq_id(12345); |
| | | route->set_abs_addr(67890); |
| | | |
| | | auto request(MsgI::Serialize(head, req)); |
| | | for (int i = 0; i < 1; ++i) { |
| | | LOG_DEBUG() << "request size: " << request.size(); |
| | | TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm())); |
| | | head.mutable_dest()->set_ip(connect_addr); |
| | | head.mutable_dest()->set_port(port); |
| | | head.mutable_dest()->set_mq_id(1000011); |
| | | head.mutable_dest()->set_abs_addr(10296); |
| | | |
| | | return (MsgI::Serialize(head, req)); |
| | | }; |
| | | auto onReply = [](BHMsgHead &head, std::string body_content) { |
| | | static int n = 0; |
| | | printf("reply %d: ", ++n); |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromString(body_content)) { |
| | | if (IsSuccess(reply.errmsg().errcode())) { |
| | | printf("\ncontent: %s\n", reply.data().c_str()); |
| | | } else { |
| | | printf("error: %s\n", reply.errmsg().errstring().c_str()); |
| | | } |
| | | } else { |
| | | printf("parse error\n"); |
| | | } |
| | | }; |
| | | for (int i = 0; i < 100; ++i) { |
| | | auto request = NewRequest(); |
| | | TcpRequest1::Create(io.io(), dest, request, onReply); |
| | | } |
| | | |
| | | io.run(); |
| | | Sleep(2s); |
| | | printf("-------------------------------------------------------\n"); |
| | | for (int i = 0; i < 3; ++i) { |
| | | auto request = NewRequest(); |
| | | TcpRequest1::Create(io.io(), dest, request, onReply); |
| | | } |
| | | Sleep(2s); |
| | | |
| | | printf("TcpTest\n"); |
| | | } |