From 11f6c600e55ca5677f93624efe44d2605cdd908d Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 20:18:38 +0800 Subject: [PATCH] reserve #,@ prefix for internal proc id and topic. --- src/shm_socket.cpp | 26 ++++++++++++++++---------- 1 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index 4b687d5..11824d7 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -20,7 +20,9 @@ #include "bh_util.h" #include "defs.h" #include "msg.h" +#include "sleeper.h" #include <chrono> + using namespace std::chrono; using namespace std::chrono_literals; @@ -28,18 +30,18 @@ using namespace bhome_shm; ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : - run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); } + run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : - run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); } + run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : - run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); } + run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::~ShmSocket() { Stop(); } bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) { auto ioProc = [this, onData, onRaw, onIdle]() { - auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; + auto DoSend = [this]() { return send_buffer_.TrySend(); }; auto DoRecv = [=] { // do not recv if no cb is set. if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } @@ -71,7 +73,7 @@ if (IsCmd(val)) { onCmdCB(*this, val); } else { - MsgI imsg(val); + MsgI imsg(val, shm()); DEFER1(imsg.Release()); BHMsgHead head; if (imsg.ParseHead(head)) { @@ -90,11 +92,15 @@ }; try { + thread_local FibUSleeper sleeper(1000 * 10); + bool more_to_send = DoSend(); bool more_to_recv = DoRecv(); if (onIdle) { onIdle(*this); } if (!more_to_send && !more_to_recv) { - robust::QuickSleep(); + sleeper.Sleep(); + } else { + sleeper.Reset(); } } catch (...) { } @@ -107,7 +113,7 @@ while (run_) { ioProc(); } // try send pending msgs. auto end_time = steady_clock::now() + 3s; - while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { + while (send_buffer_.TrySend() && steady_clock::now() < end_time) { // LOG_DEBUG() << "try send pending msgs."; } }; @@ -164,7 +170,7 @@ }; #if 0 // self alloc - MsgI msg; + MsgI msg(shm()); if (msg.Make(size)) { DEFER1(msg.Release()); return OnResult(msg); @@ -188,7 +194,7 @@ (id << 4) | EncodeCmd(eCmdAllocRequest0); auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { - MsgI msg((val >> 32) & MaskBits(31)); + MsgI msg(((val >> 32) & MaskBits(31)), sock.shm()); DEFER1(msg.Release()); onResult(msg); return true; @@ -200,5 +206,5 @@ alloc_cbs_->Pick(id, cb_no_use); }; - return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); + return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB); } \ No newline at end of file -- Gitblit v1.8.0