From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/shm_socket.cpp | 64 +++++++++++++++++++------------- 1 files changed, 38 insertions(+), 26 deletions(-) diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index 11824d7..fa10dd3 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -29,13 +29,6 @@ using namespace bhome_msg; using namespace bhome_shm; -ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : - run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } -ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : - run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } -ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : - run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); } - ShmSocket::~ShmSocket() { Stop(); } bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) @@ -145,46 +138,65 @@ return false; } -bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +bool ShmSocket::Send(const MQInfo &remote, std::string &&content) { size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable { if (!msg.Fill(content)) { return false; } try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } + SendImpl(remote, msg); return true; } catch (...) { SetLastError(eError, "Send internal error."); return false; } }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb) +{ + try { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + return SendImpl(remote, msg, onExpireRemoveCB); + } catch (std::exception &e) { + SetLastError(eError, "Send internal error."); + return false; + } +} + +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb)); + }; + + return RequestAlloc(size, OnResult); +} + +bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) +{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag #if 0 // self alloc MsgI msg(shm()); if (msg.Make(size)) { DEFER1(msg.Release()); - return OnResult(msg); + return onResult(msg); + } else { + return false; } -#else - // center alloc - return RequestAlloc(size, OnResult); #endif -} -bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) -{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag // LOG_FUNCTION; if (node_proc_index_ == -1 || socket_index_ == -1) { + LOG_ERROR() << "socket not inited."; return false; } int id = (++alloc_id_) & MaskBits(28); -- Gitblit v1.8.0