tcp server call center to send proxy requests.
| | |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "node_center.h" |
| | | #include "tcp_server.h" |
| | | #include <chrono> |
| | | |
| | | using namespace std::chrono; |
| | |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } |
| | |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | topic_node_->Start(); |
| | | tcp_server_->Start(); |
| | | return true; |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | tcp_server_->Stop(); |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |
| | |
| | | #include <map> |
| | | #include <memory> |
| | | class CenterTopicNode; |
| | | class TcpServer; |
| | | |
| | | class BHCenter |
| | | { |
| | |
| | | |
| | | std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; |
| | | std::unique_ptr<CenterTopicNode> topic_node_; |
| | | std::unique_ptr<TcpServer> tcp_server_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TM9OUQTG |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: io_service.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年05月27日 13时25分18秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "io_service.h" |
| | | #include <chrono> |
| | | using namespace std::chrono_literals; |
| | | |
| | | bool IoService::Start() |
| | | { |
| | | Stop(); |
| | | bool cur = false; |
| | | if (!run_.compare_exchange_strong(cur, true)) { |
| | | return false; |
| | | } |
| | | |
| | | auto proc = [this]() { |
| | | while (run_) { |
| | | io_.run_one_for(100ms); |
| | | } |
| | | OnStop(); |
| | | }; |
| | | std::thread(proc).swap(worker_); |
| | | return true; |
| | | } |
| | | |
| | | void IoService::Stop() |
| | | { |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false)) { |
| | | if (worker_.joinable()) { |
| | | worker_.join(); |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: io_service.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年05月27日 13时25分37秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef IO_SERVICE_ODKKJG3D |
| | | #define IO_SERVICE_ODKKJG3D |
| | | |
| | | #include <boost/asio.hpp> |
| | | #include <thread> |
| | | |
| | | class IoService |
| | | { |
| | | public: |
| | | IoService() : |
| | | run_(false) {} |
| | | bool Start(); |
| | | void Stop(); |
| | | |
| | | typedef boost::asio::io_context io_service_t; |
| | | io_service_t &io() { return io_; } |
| | | |
| | | private: |
| | | virtual void OnStop() {} |
| | | io_service_t io_; |
| | | std::thread worker_; |
| | | std::atomic<bool> run_; |
| | | }; |
| | | |
| | | #endif // end of include guard: IO_SERVICE_ODKKJG3D |
| | |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | | auto ssn = dest.id_ - (dest.id_ % 10); |
| | | LOG_DEBUG() << "prox ssn " << ssn; |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos == nodes_.end()) { |
| | | LOG_ERROR() << "proxy msg, ssn not found."; |
| | | return false; |
| | | } |
| | | auto &node = pos->second; |
| | | if (!Valid(*node)) { return false; } |
| | | |
| | | ShmSocket &sender(DefaultSender(node->shm_)); |
| | | auto route = head.add_route(); |
| | | route->set_mq_id(sender.id()); |
| | | route->set_abs_addr(sender.AbsAddr()); |
| | | |
| | | ShmMsg msg(node->shm_); |
| | | if (!msg.Make(head, body_content)) { return false; } |
| | | DEFER1(msg.Release();); |
| | | RecordMsg(msg); |
| | | return sender.Send(dest, msg, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | | bool OnCommand(ShmSocket &socket, const int64_t val); |
| | |
| | | */ |
| | | #include "tcp_connection.h" |
| | | #include "log.h" |
| | | #include "msg.h" |
| | | #include "node_center.h" |
| | | #include "shm_socket.h" |
| | | |
| | | namespace |
| | | { |
| | | template <class C> |
| | | auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); } |
| | | auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); } |
| | | using boost::asio::async_read; |
| | | using boost::asio::async_write; |
| | | |
| | | typedef std::function<void()> VoidHandler; |
| | | typedef std::function<void(size_t)> SizeHandler; |
| | | |
| | | template <class T, class... Param> |
| | | auto TcpCallback(T &conn, std::function<void(Param...)> const &func) |
| | | { |
| | | auto self(conn.shared_from_this()); |
| | | return [self, func](bserror_t ec, Param... size) { |
| | | if (!ec) { |
| | | func(size...); |
| | | } else { |
| | | self->OnError(ec); |
| | | } |
| | | }; |
| | | } |
| | | |
| | | template <class T> |
| | | auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); } |
| | | |
| | | template <class T> |
| | | auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); } |
| | | |
| | | template <class T> |
| | | auto TcpCBSize(T &conn, VoidHandler const &func) |
| | | { |
| | | return TcpCBSize(conn, [func](size_t) { func(); }); |
| | | } |
| | | |
| | | bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content) |
| | | { |
| | | const char *p = buffer.data(); |
| | | LOG_DEBUG() << "msg len " << len; |
| | | if (4 > len) { return false; } |
| | | uint32_t head_len = Get32(p); |
| | | LOG_DEBUG() << "head_len " << head_len; |
| | | if (head_len > 1024 * 4) { |
| | | throw std::runtime_error("unexpected tcp reply data."); |
| | | } |
| | | auto before_body = 4 + head_len + 4; |
| | | if (before_body > len) { |
| | | if (before_body > buffer.size()) { |
| | | buffer.resize(before_body); |
| | | } |
| | | return false; |
| | | } |
| | | if (!head.ParseFromArray(p + 4, head_len)) { |
| | | throw std::runtime_error("tcp recv invalid reply head."); |
| | | } |
| | | uint32_t body_len = Get32(p + 4 + head_len); |
| | | buffer.resize(before_body + body_len); |
| | | if (buffer.size() > len) { return false; } |
| | | body_content.assign(p + before_body, body_len); |
| | | return true; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) : |
| | | socket_(io), remote_(addr), request_(std::move(request)) {} |
| | | void TcpRequest1::Connect() |
| | | /// request ----------------------------------------------------------- |
| | | |
| | | void TcpRequest1::OnError(bserror_t ec) |
| | | { |
| | | auto self = shared_from_this(); |
| | | socket_.async_connect(remote_, [this, self](bserror_t ec) { |
| | | if (!ec) { |
| | | SendRequest(); |
| | | } else { |
| | | LOG_ERROR() << "connect error " << ec; |
| | | Close(); |
| | | } |
| | | }); |
| | | LOG_ERROR() << "tcp client error: " << ec; |
| | | Close(); |
| | | } |
| | | |
| | | void TcpRequest1::Start() |
| | | { |
| | | auto readReply = [this]() { |
| | | recv_buffer_.resize(1000); |
| | | recv_len_ = 0; |
| | | socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | }; |
| | | auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); }; |
| | | |
| | | socket_.async_connect(remote_, TcpCB(*this, request)); |
| | | } |
| | | void TcpRequest1::Close() |
| | | { |
| | | LOG_DEBUG() << "client close"; |
| | | socket_.close(); |
| | | } |
| | | void TcpRequest1::OnRead(size_t size) |
| | | { |
| | | LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_; |
| | | |
| | | recv_len_ += size; |
| | | BHMsgHead head; |
| | | std::string body_content; |
| | | bool recv_done = false; |
| | | try { |
| | | recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << e.what(); |
| | | Close(); |
| | | return; |
| | | } |
| | | |
| | | if (recv_done) { |
| | | // just pass to client, no check, client will check it anyway. |
| | | LOG_DEBUG() << "route size: " << head.route_size(); |
| | | if (head.route_size() < 1) { return; } |
| | | auto &back = head.route(head.route_size() - 1); |
| | | MQInfo dest = {back.mq_id(), back.abs_addr()}; |
| | | head.mutable_route()->RemoveLast(); |
| | | |
| | | LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_; |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromString(body_content)) { |
| | | LOG_DEBUG() << "err msg: " << reply.errmsg().errstring(); |
| | | LOG_DEBUG() << "content : " << reply.data(); |
| | | } |
| | | Close(); |
| | | return; |
| | | shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size())); |
| | | } else { // read again |
| | | LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | } |
| | | |
| | | /// reply -------------------------------------------------------------- |
| | | |
| | | void TcpReply1::OnError(bserror_t ec) { Close(); } |
| | | void TcpReply1::Close() |
| | | { |
| | | LOG_DEBUG() << "server close."; |
| | | socket_.close(); |
| | | } |
| | | |
| | | void TcpRequest1::SendRequest() |
| | | { |
| | | LOG_INFO() << "client sending request " << request_; |
| | | auto self = shared_from_this(); |
| | | async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) { |
| | | if (!ec) { |
| | | ReadReply(); |
| | | } else { |
| | | Close(); |
| | | } |
| | | }); |
| | | } |
| | | void TcpRequest1::ReadReply() |
| | | { |
| | | buffer_.resize(1000); |
| | | auto self = shared_from_this(); |
| | | socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) { |
| | | if (!ec) { |
| | | printf("reply data: %s\n", buffer_.data()); |
| | | } else { |
| | | Close(); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | TcpReply1::TcpReply1(tcp::socket sock) : |
| | | socket_(std::move(sock)) {} |
| | | |
| | | void TcpReply1::Start() |
| | | { |
| | | LOG_INFO() << "server session reading..."; |
| | | recv_buffer_.resize(1000); |
| | | auto self(shared_from_this()); |
| | | socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) { |
| | | LOG_INFO() << "server read : " << recv_buffer_.data(); |
| | | // fake reply |
| | | if (!ec) { |
| | | send_buffer_ = std::string(recv_buffer_.data(), size) + " reply"; |
| | | async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) { |
| | | socket_.close(); |
| | | }); |
| | | socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | |
| | | void TcpReply1::OnRead(size_t size) |
| | | { |
| | | recv_len_ += size; |
| | | BHMsgHead head; |
| | | std::string body_content; |
| | | bool recv_done = false; |
| | | try { |
| | | recv_done = CheckData(recv_buffer_, recv_len_, head, body_content); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << e.what(); |
| | | Close(); |
| | | return; |
| | | } |
| | | |
| | | auto ParseBody = [&](auto &req) { |
| | | const char *p = recv_buffer_.data(); |
| | | uint32_t size = Get32(p); |
| | | p += 4; |
| | | p += size; |
| | | size = Get32(p); |
| | | p += 4; |
| | | return req.ParseFromArray(p, size); |
| | | }; |
| | | |
| | | if (recv_done) { |
| | | LOG_DEBUG() << "request data: " << size; |
| | | auto self(shared_from_this()); |
| | | MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; |
| | | if (remote.id_ && remote.offset_) { |
| | | auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | auto &scenter = *pscenter_; |
| | | if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) { |
| | | send_buffer_ = "fake reply"; |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | } |
| | | } else { |
| | | socket_.close(); |
| | | LOG_DEBUG() << "no address"; |
| | | send_buffer_ = "no address"; |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | } else { // read again |
| | | LOG_DEBUG() << "not complete, read again " << recv_buffer_.size(); |
| | | socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); |
| | | } |
| | | }; |
| | |
| | | #ifndef TCP_CONNECTION_H373GIL5 |
| | | #define TCP_CONNECTION_H373GIL5 |
| | | |
| | | #include "bh_util.h" |
| | | #include "node_center.h" |
| | | #include "tcp_common.h" |
| | | #include <functional> |
| | | #include <memory> |
| | | |
| | | class ShmSocket; |
| | | class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1> |
| | | { |
| | | public: |
| | | static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) |
| | | static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) |
| | | { |
| | | std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect(); |
| | | std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start(); |
| | | } |
| | | |
| | | TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request); |
| | | TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) : |
| | | socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {} |
| | | void OnError(bserror_t ec); |
| | | |
| | | private: |
| | | void Connect(); |
| | | void Start(); |
| | | void Close(); |
| | | void SendRequest(); |
| | | void ReadReply(); |
| | | void OnRead(size_t size); |
| | | |
| | | tcp::socket socket_; |
| | | ShmSocket &shm_socket_; // send reply |
| | | tcp::endpoint remote_; |
| | | std::string request_; |
| | | std::vector<char> buffer_; |
| | | std::vector<char> recv_buffer_; |
| | | size_t recv_len_ = 0; |
| | | }; |
| | | |
| | | class NodeCenter; |
| | | class TcpReply1 : public std::enable_shared_from_this<TcpReply1> |
| | | { |
| | | public: |
| | | static void Create(tcp::socket sock) |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | static void Create(tcp::socket sock, CenterPtr pscenter) |
| | | { |
| | | std::make_shared<TcpReply1>(std::move(sock))->Start(); |
| | | std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start(); |
| | | } |
| | | |
| | | TcpReply1(tcp::socket sock); |
| | | void Start(); |
| | | TcpReply1(tcp::socket sock, CenterPtr pscenter) : |
| | | socket_(std::move(sock)), pscenter_(pscenter) {} |
| | | void OnError(bserror_t ec); |
| | | |
| | | private: |
| | | void Start(); |
| | | void Close(); |
| | | void OnRead(size_t size); |
| | | |
| | | tcp::socket socket_; |
| | | CenterPtr pscenter_; |
| | | std::vector<char> recv_buffer_; |
| | | uint32_t recv_len_ = 0; |
| | | std::string send_buffer_; |
| | | }; |
| | | |
| | |
| | | auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | | auto &dest = head.dest(); |
| | | if (dest.ip().empty() || dest.port() == 0) { return; } |
| | | bool r = Send(dest.ip(), dest.port(), msg.content()); |
| | | // TODO check send fail. |
| | | Request(dest.ip(), dest.port(), msg.content()); |
| | | }; |
| | | local_->Start(1, localProc); |
| | | |
| | |
| | | io_context_.run_one_for(std::chrono::milliseconds(100)); |
| | | } |
| | | }; |
| | | std::thread(proxyProc).swap(worker_); |
| | | return true; |
| | | } |
| | | |
| | | void TcpProxy::Stop() |
| | | { |
| | | local_.reset(); |
| | | |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false)) { |
| | | if (worker_.joinable()) { |
| | | worker_.join(); |
| | | } |
| | | local_.reset(); |
| | | } |
| | | } |
| | | |
| | | bool TcpProxy::Send(const std::string &ip, int port, std::string &&content) |
| | | bool TcpProxy::Request(const std::string &ip, int port, std::string &&content) |
| | | { |
| | | if (content.empty()) { return false; } |
| | | |
| | | tcp::endpoint dest(ip::address::from_string(ip), port); |
| | | TcpRequest1::Create(io_context_, dest, std::move(content)); |
| | | |
| | | // char tag[sizeof(kBHTcpServerTag)] = {0}; |
| | | |
| | | // int n = read(sock, tag, sizeof(tag)); |
| | | // if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) { |
| | | // send(sock, content.data(), content.size(), 0); |
| | | // connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); }; |
| | | // // success |
| | | // } |
| | | TcpRequest1::Create(io_context_, dest, std::move(content), *local_); |
| | | } |
| | |
| | | void Stop(); |
| | | |
| | | private: |
| | | bool Send(const std::string &ip, int port, std::string &&content); |
| | | bool Request(const std::string &ip, int port, std::string &&content); |
| | | std::unique_ptr<ShmSocket> local_; |
| | | |
| | | boost::asio::io_context io_context_; |
| | |
| | | |
| | | using namespace std::chrono_literals; |
| | | |
| | | TcpServer::TcpServer(int port) : |
| | | run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port)) |
| | | TcpServer::TcpServer(int port, CenterPtr pscenter) : |
| | | listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) |
| | | { |
| | | Accept(); |
| | | } |
| | | |
| | | TcpServer::~TcpServer() { Stop(); } |
| | | |
| | | bool TcpServer::Start() |
| | | void TcpServer::OnStop() |
| | | { |
| | | Stop(); |
| | | bool cur = false; |
| | | if (run_.compare_exchange_strong(cur, true)) { |
| | | auto proc = [this]() { |
| | | while (run_) { |
| | | io_.run_one_for(100ms); |
| | | } |
| | | }; |
| | | std::thread(proc).swap(worker_); |
| | | } |
| | | } |
| | | void TcpServer::Stop() |
| | | { |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false)) { |
| | | io_.post([this]() { |
| | | listener_.close(); |
| | | }); |
| | | std::this_thread::sleep_for(1s); |
| | | if (worker_.joinable()) { |
| | | worker_.join(); |
| | | } |
| | | } |
| | | listener_.close(); |
| | | } |
| | | |
| | | void TcpServer::Accept() |
| | |
| | | listener_.async_accept([this](bserror_t ec, tcp::socket sock) { |
| | | if (!ec) { |
| | | LOG_INFO() << "server accept client"; |
| | | TcpReply1::Create(std::move(sock)); |
| | | TcpReply1::Create(std::move(sock), pscenter_); |
| | | } |
| | | Accept(); |
| | | }); |
| | |
| | | #ifndef TCP_SERVER_795VXR94 |
| | | #define TCP_SERVER_795VXR94 |
| | | |
| | | #include "bh_util.h" |
| | | #include "io_service.h" |
| | | #include "tcp_common.h" |
| | | #include <thread> |
| | | |
| | | class TcpServer |
| | | class NodeCenter; |
| | | class TcpServer : public IoService |
| | | { |
| | | public: |
| | | explicit TcpServer(int port); |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | TcpServer(int port, CenterPtr pscenter); |
| | | ~TcpServer(); |
| | | bool Start(); |
| | | void Stop(); |
| | | |
| | | private: |
| | | virtual void OnStop(); |
| | | void Accept(); |
| | | std::thread worker_; |
| | | std::atomic<bool> run_; |
| | | boost::asio::io_context io_; |
| | | tcp::acceptor listener_; |
| | | CenterPtr pscenter_; |
| | | }; |
| | | |
| | | #endif // end of include guard: TCP_SERVER_795VXR94 |
| | |
| | | }; |
| | | OffsetType offset_; |
| | | SharedMemory *pshm_; |
| | | void *Alloc(const size_t size) |
| | | |
| | | void *Alloc(const size_t size, const void *src = nullptr) |
| | | { |
| | | void *p = shm().Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | | auto pmeta = new (p) Meta(size); |
| | | p = pmeta + 1; |
| | | if (src) { |
| | | memcpy(p, src, size); |
| | | } |
| | | } |
| | | return p; |
| | | } |
| | |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | void *Pack(const std::string &content) |
| | | void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content) |
| | | { |
| | | void *addr = get(); |
| | | if (addr) { |
| | | memcpy(addr, content.data(), content.size()); |
| | | meta()->size_ = content.size(); |
| | | auto p = static_cast<char *>(addr); |
| | | auto Pack1 = [&p](uint32_t len, auto &&writer) { |
| | | Put32(p, len); |
| | | p += sizeof(len); |
| | | writer(p, len); |
| | | p += len; |
| | | }; |
| | | Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); |
| | | Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); }); |
| | | meta()->size_ = 4 + head_len + 4 + body_content.size(); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | void *Pack(const void *src, const size_t size) |
| | | { |
| | | void *addr = get(); |
| | | if (addr && src) { |
| | | memcpy(addr, src, size); |
| | | meta()->size_ = size; |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | void *Pack(const std::string &content) { return Pack(content.data(), content.size()); } |
| | | |
| | | bool Make(void *addr) |
| | | { |
| | |
| | | uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; |
| | | return Make(size) && Pack(head, head_len, body, body_len); |
| | | } |
| | | inline bool Make(const BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | uint32_t head_len = head.ByteSizeLong(); |
| | | uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size(); |
| | | return Make(size) && Pack(head, head_len, body_content); |
| | | } |
| | | |
| | | template <class Body> |
| | | inline bool Fill(const BHMsgHead &head, const Body &body) |
| | | { |
| | |
| | | return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len); |
| | | } |
| | | |
| | | inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } |
| | | inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); } |
| | | inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); } |
| | | inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); } |
| | | |
| | | inline bool Make(const std::string &content) { return Make(content.data(), content.size()); } |
| | | inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); } |
| | | |
| | | inline bool Make(const size_t size) { return Make(Alloc(size)); } |
| | | |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) |
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content) |
| | | { |
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { |
| | | auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable { |
| | | if (!msg.Fill(content)) { return false; } |
| | | |
| | | try { |
| | | if (!cb) { |
| | | Send(remote, msg); |
| | | } else { |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | Send(remote, msg, onExpireRemoveCB); |
| | | } |
| | | SendImpl(remote, msg); |
| | | return true; |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | return RequestAlloc(size, OnResult); |
| | | } |
| | | |
| | | bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb) |
| | | { |
| | | try { |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | SendImpl(remote, msg, onExpireRemoveCB); |
| | | return true; |
| | | } catch (std::exception &e) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) |
| | | { |
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { |
| | | return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb)); |
| | | }; |
| | | |
| | | return RequestAlloc(size, OnResult); |
| | | } |
| | | |
| | | bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) |
| | | { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | #if 0 |
| | | // self alloc |
| | | MsgI msg(shm()); |
| | | if (msg.Make(size)) { |
| | | DEFER1(msg.Release()); |
| | | return OnResult(msg); |
| | | return onResult(msg); |
| | | } else { |
| | | return false; |
| | | } |
| | | #else |
| | | // center alloc |
| | | return RequestAlloc(size, OnResult); |
| | | #endif |
| | | } |
| | | |
| | | bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) |
| | | { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | // LOG_FUNCTION; |
| | | if (node_proc_index_ == -1 || socket_index_ == -1) { |
| | | LOG_ERROR() << "socket not inited."; |
| | | return false; |
| | | } |
| | | int id = (++alloc_id_) & MaskBits(28); |
| | |
| | | |
| | | bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); |
| | | |
| | | bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb); |
| | | template <class Body> |
| | | bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | { |
| | | return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); |
| | | } |
| | | template <class... T> |
| | | bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) |
| | | { |
| | | return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); |
| | | } |
| | | bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); } |
| | | bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb); |
| | | |
| | | template <class Body> |
| | | bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); } |
| | | bool Send(const MQInfo &remote, std::string &&content); |
| | | bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); } |
| | | |
| | | template <class... T> |
| | | bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) |
| | | { |
| | |
| | | private: |
| | | bool StopNoLock(); |
| | | bool RunningNoLock() { return !workers_.empty(); } |
| | | |
| | | bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); |
| | | |
| | | template <class... Rest> |
| | | bool SendImpl(Rest &&...rest) |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | |
| | | #include "defs.h" |
| | | #include "node_center.h" |
| | | #include "tcp_connection.h" |
| | | #include "tcp_server.h" |
| | | #include "util.h" |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(TcpTest) |
| | | { |
| | | const std::string bind_addr = "127.0.0.1"; |
| | | const std::string connect_addr = "127.0.0.1"; |
| | | const uint16_t port = 10000; |
| | | SharedMemory &shm = TestShm(); |
| | | |
| | | TcpServer server(port); |
| | | server.Start(); |
| | | const std::string connect_addr = "127.0.0.1"; |
| | | const uint16_t port = kBHCenterPort; |
| | | |
| | | boost::asio::io_context io; |
| | | |
| | | tcp::endpoint dest(ip::address::from_string(connect_addr), port); |
| | | for (int i = 0; i < 10; ++i) { |
| | | TcpRequest1::Create(io, dest, "client->server " + std::to_string(i)); |
| | | MsgRequestTopic req; |
| | | req.set_topic("#center_query_procs"); |
| | | req.set_data(""); |
| | | auto head = InitMsgHead(GetType(req), "#test_proc", 1000000); |
| | | auto route = head.add_route(); |
| | | route->set_mq_id(12345); |
| | | route->set_abs_addr(67890); |
| | | |
| | | head.mutable_dest()->set_ip(connect_addr); |
| | | head.mutable_dest()->set_port(port); |
| | | head.mutable_dest()->set_mq_id(1000011); |
| | | head.mutable_dest()->set_abs_addr(10296); |
| | | |
| | | auto request(MsgI::Serialize(head, req)); |
| | | for (int i = 0; i < 1; ++i) { |
| | | LOG_DEBUG() << "request size: " << request.size(); |
| | | TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm())); |
| | | } |
| | | |
| | | io.run(); |