From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.

---
 src/socket.cpp |   48 +++++++++++++++++-------------------------------
 1 files changed, 17 insertions(+), 31 deletions(-)

diff --git a/src/socket.cpp b/src/socket.cpp
index f2b29f4..116175d 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -43,51 +43,37 @@
 
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
 {
-	auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
-		auto Find = [&](RecvCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex());
-			const std::string &msgid = head.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
+	auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
 		RecvCB cb;
-		if (Find(cb)) {
+		if (async_cbs_->Find(head.msg_id(), cb)) {
 			cb(socket, imsg, head);
 		} else if (onData) {
 			onData(socket, imsg, head);
 		} // else ignored, or dropped
 	};
 
-	std::lock_guard<std::mutex> lock(mutex_);
-	StopNoLock();
-	auto RecvProc = [this, onRecv, onIdle]() {
-		while (run_) {
-			try {
-				MsgI imsg;
-				if (mq().Recv(imsg, 10)) {
-					DEFER1(imsg.Release(shm()));
-					BHMsgHead head;
-					if (imsg.ParseHead(head)) {
-						onRecv(*this, imsg, head);
-					}
-				} else if (onIdle) {
-					onIdle(*this);
+	auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
+		try {
+			MsgI imsg;
+			if (mq().Recv(imsg, 10)) {
+				DEFER1(imsg.Release(shm()));
+				BHMsgHead head;
+				if (imsg.ParseHead(head)) {
+					onRecvWithPerMsgCB(*this, imsg, head);
 				}
-			} catch (...) {
+			} else if (onIdle) {
+				onIdle(*this);
 			}
+		} catch (...) {
 		}
 	};
 
+	std::lock_guard<std::mutex> lock(mutex_);
+	StopNoLock();
+
 	run_.store(true);
 	for (int i = 0; i < nworker; ++i) {
-		workers_.emplace_back(RecvProc);
+		workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
 	}
 	return true;
 }

--
Gitblit v1.8.0