From cab831748a2a9cc18b7f18f3b5e14a4374b7ab68 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 17 五月 2021 18:34:26 +0800 Subject: [PATCH] socket send using abs addr, avoid shm find by id. --- src/socket.cpp | 40 ++++++++++++++++++++++++++++++++-------- 1 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/socket.cpp b/src/socket.cpp index 4f09517..55b43f9 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -80,15 +80,13 @@ } }; ShmMsgQueue::RawData val = 0; - auto TryRecvMore = [&]() { - for (int i = 0; i < 100; ++i) { - if (mq().TryRecv(val)) { - return true; - } + for (int i = 0; i < 100; ++i) { + if (mq().TryRecv(val)) { + onRecv(val); + return true; } - return false; - }; - return TryRecvMore() ? (onRecv(val), true) : false; + } + return false; }; try { @@ -160,6 +158,31 @@ return false; } +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + if (!msg.Fill(content)) { return; } + + try { + if (!cb) { + Send(remote, msg); + } else { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + Send(remote, msg, onExpireRemoveCB); + } + } catch (...) { + SetLastError(eError, "Send internal error."); + } + }; + + return RequestAlloc(size, OnResult); +} + bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag // LOG_FUNCTION; @@ -184,5 +207,6 @@ RawRecvCB cb_no_use; alloc_cbs_->Pick(id, cb_no_use); }; + return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); } \ No newline at end of file -- Gitblit v1.8.0