From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/shm_socket.cpp | 88 ++++++++++++++++++++++++++----------------- 1 files changed, 53 insertions(+), 35 deletions(-) diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index 4b687d5..fa10dd3 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -20,26 +20,21 @@ #include "bh_util.h" #include "defs.h" #include "msg.h" +#include "sleeper.h" #include <chrono> + using namespace std::chrono; using namespace std::chrono_literals; using namespace bhome_msg; using namespace bhome_shm; -ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : - run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); } -ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : - run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); } -ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : - run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); } - ShmSocket::~ShmSocket() { Stop(); } bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) { auto ioProc = [this, onData, onRaw, onIdle]() { - auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; + auto DoSend = [this]() { return send_buffer_.TrySend(); }; auto DoRecv = [=] { // do not recv if no cb is set. if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } @@ -71,7 +66,7 @@ if (IsCmd(val)) { onCmdCB(*this, val); } else { - MsgI imsg(val); + MsgI imsg(val, shm()); DEFER1(imsg.Release()); BHMsgHead head; if (imsg.ParseHead(head)) { @@ -90,11 +85,15 @@ }; try { + thread_local FibUSleeper sleeper(1000 * 10); + bool more_to_send = DoSend(); bool more_to_recv = DoRecv(); if (onIdle) { onIdle(*this); } if (!more_to_send && !more_to_recv) { - robust::QuickSleep(); + sleeper.Sleep(); + } else { + sleeper.Reset(); } } catch (...) { } @@ -107,7 +106,7 @@ while (run_) { ioProc(); } // try send pending msgs. auto end_time = steady_clock::now() + 3s; - while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { + while (send_buffer_.TrySend() && steady_clock::now() < end_time) { // LOG_DEBUG() << "try send pending msgs."; } }; @@ -139,46 +138,65 @@ return false; } -bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +bool ShmSocket::Send(const MQInfo &remote, std::string &&content) { size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable { if (!msg.Fill(content)) { return false; } try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } + SendImpl(remote, msg); return true; } catch (...) { SetLastError(eError, "Send internal error."); return false; } }; -#if 0 - // self alloc - MsgI msg; - if (msg.Make(size)) { - DEFER1(msg.Release()); - return OnResult(msg); - } -#else - // center alloc + return RequestAlloc(size, OnResult); -#endif +} + +bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb) +{ + try { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + return SendImpl(remote, msg, onExpireRemoveCB); + } catch (std::exception &e) { + SetLastError(eError, "Send internal error."); + return false; + } +} + +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb)); + }; + + return RequestAlloc(size, OnResult); } bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag +#if 0 + // self alloc + MsgI msg(shm()); + if (msg.Make(size)) { + DEFER1(msg.Release()); + return onResult(msg); + } else { + return false; + } +#endif + // LOG_FUNCTION; if (node_proc_index_ == -1 || socket_index_ == -1) { + LOG_ERROR() << "socket not inited."; return false; } int id = (++alloc_id_) & MaskBits(28); @@ -188,7 +206,7 @@ (id << 4) | EncodeCmd(eCmdAllocRequest0); auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { - MsgI msg((val >> 32) & MaskBits(31)); + MsgI msg(((val >> 32) & MaskBits(31)), sock.shm()); DEFER1(msg.Release()); onResult(msg); return true; @@ -200,5 +218,5 @@ alloc_cbs_->Pick(id, cb_no_use); }; - return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); + return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB); } \ No newline at end of file -- Gitblit v1.8.0