From 026bbfaf2b5d73a26b8e2fa49158883ef64c211b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 27 五月 2021 13:51:26 +0800 Subject: [PATCH] tcp server call center to send proxy requests. --- src/shm_socket.cpp | 58 +++++++++++++++++++++++++++++++++++++++------------------- 1 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index 11824d7..d54168d 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -145,46 +145,66 @@ return false; } -bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +bool ShmSocket::Send(const MQInfo &remote, std::string &&content) { size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable { if (!msg.Fill(content)) { return false; } try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } + SendImpl(remote, msg); return true; } catch (...) { SetLastError(eError, "Send internal error."); return false; } }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb) +{ + try { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + SendImpl(remote, msg, onExpireRemoveCB); + return true; + } catch (std::exception &e) { + SetLastError(eError, "Send internal error."); + return false; + } +} + +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb)); + }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) +{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag #if 0 // self alloc MsgI msg(shm()); if (msg.Make(size)) { DEFER1(msg.Release()); - return OnResult(msg); + return onResult(msg); + } else { + return false; } -#else - // center alloc - return RequestAlloc(size, OnResult); #endif -} -bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) -{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag // LOG_FUNCTION; if (node_proc_index_ == -1 || socket_index_ == -1) { + LOG_ERROR() << "socket not inited."; return false; } int id = (++alloc_id_) & MaskBits(28); -- Gitblit v1.8.0