From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 18:29:41 +0800
Subject: [PATCH] add fail-resend support.

---
 src/socket.cpp |   56 ++++++++++++++++++++++----------------------------------
 1 files changed, 22 insertions(+), 34 deletions(-)

diff --git a/src/socket.cpp b/src/socket.cpp
index f2b29f4..2c55665 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -43,51 +43,39 @@
 
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
 {
-	auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
-		auto Find = [&](RecvCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex());
-			const std::string &msgid = head.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
+	auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
 		RecvCB cb;
-		if (Find(cb)) {
+		if (per_msg_cbs_->Find(head.msg_id(), cb)) {
 			cb(socket, imsg, head);
 		} else if (onData) {
 			onData(socket, imsg, head);
-		} // else ignored, or dropped
+		} else { // else ignored, or dropped
+		}
+	};
+
+	auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
+		try {
+			MsgI imsg;
+			if (mq().Recv(imsg, 10)) {
+				DEFER1(imsg.Release(shm()));
+				BHMsgHead head;
+				if (imsg.ParseHead(head)) {
+					onRecvWithPerMsgCB(*this, imsg, head);
+				}
+			}
+			if (onIdle) {
+				onIdle(*this);
+			}
+		} catch (...) {
+		}
 	};
 
 	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
-	auto RecvProc = [this, onRecv, onIdle]() {
-		while (run_) {
-			try {
-				MsgI imsg;
-				if (mq().Recv(imsg, 10)) {
-					DEFER1(imsg.Release(shm()));
-					BHMsgHead head;
-					if (imsg.ParseHead(head)) {
-						onRecv(*this, imsg, head);
-					}
-				} else if (onIdle) {
-					onIdle(*this);
-				}
-			} catch (...) {
-			}
-		}
-	};
 
 	run_.store(true);
 	for (int i = 0; i < nworker; ++i) {
-		workers_.emplace_back(RecvProc);
+		workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
 	}
 	return true;
 }

--
Gitblit v1.8.0