From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/socket.cpp | 48 +++++++++++++++++------------------------------- 1 files changed, 17 insertions(+), 31 deletions(-) diff --git a/src/socket.cpp b/src/socket.cpp index f2b29f4..116175d 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -43,51 +43,37 @@ bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) { - auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { - auto Find = [&](RecvCB &cb) { - std::lock_guard<std::mutex> lock(mutex()); - const std::string &msgid = head.msg_id(); - auto pos = async_cbs_.find(msgid); - if (pos != async_cbs_.end()) { - cb.swap(pos->second); - async_cbs_.erase(pos); - return true; - } else { - return false; - } - }; - + auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { RecvCB cb; - if (Find(cb)) { + if (async_cbs_->Find(head.msg_id(), cb)) { cb(socket, imsg, head); } else if (onData) { onData(socket, imsg, head); } // else ignored, or dropped }; - std::lock_guard<std::mutex> lock(mutex_); - StopNoLock(); - auto RecvProc = [this, onRecv, onIdle]() { - while (run_) { - try { - MsgI imsg; - if (mq().Recv(imsg, 10)) { - DEFER1(imsg.Release(shm())); - BHMsgHead head; - if (imsg.ParseHead(head)) { - onRecv(*this, imsg, head); - } - } else if (onIdle) { - onIdle(*this); + auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { + try { + MsgI imsg; + if (mq().Recv(imsg, 10)) { + DEFER1(imsg.Release(shm())); + BHMsgHead head; + if (imsg.ParseHead(head)) { + onRecvWithPerMsgCB(*this, imsg, head); } - } catch (...) { + } else if (onIdle) { + onIdle(*this); } + } catch (...) { } }; + std::lock_guard<std::mutex> lock(mutex_); + StopNoLock(); + run_.store(true); for (int i = 0; i < nworker; ++i) { - workers_.emplace_back(RecvProc); + workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } }); } return true; } -- Gitblit v1.8.0