From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 17:32:35 +0800
Subject: [PATCH] server resend failed; rename msgs; refactor.

---
 src/topic_request.h          |   32 --
 .vscode/tasks.json           |   28 --
 src/socket.h                 |   16 +
 src/topic_reply.cpp          |  142 ++++++++++++
 src/reqrep_center.cpp        |   22 -
 src/topic_reply.h            |   52 ++++
 .vscode/settings.json        |    6 
 src/socket.cpp               |   10 
 proto/source/bhome_msg.proto |   70 ++++--
 utest/utest.cpp              |   19 +
 proto/source/error_msg.proto |   16 +
 src/shm_queue.cpp            |   18 -
 src/msg.cpp                  |   33 +-
 utest/speed_test.cpp         |    2 
 .vscode/launch.json          |    5 
 src/shm_queue.h              |   30 +-
 src/pubsub.cpp               |    4 
 src/pubsub_center.cpp        |    4 
 src/topic_request.cpp        |  106 +--------
 19 files changed, 375 insertions(+), 240 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index c959458..9eeb23e 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -9,7 +9,10 @@
             "type": "cppdbg",
             "request": "launch",
             "program": "${workspaceFolder}/utest/utest",
-            "args": [],
+            "args": [
+                "-t",
+                "ReqRepTest"
+            ],
             "stopAtEntry": false,
             "cwd": "${workspaceFolder}",
             "environment": [],
diff --git a/.vscode/settings.json b/.vscode/settings.json
index c9d8618..d5005e9 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -56,5 +56,9 @@
         "typeindex": "cpp",
         "typeinfo": "cpp",
         "variant": "cpp"
-    }
+    },
+    "files.exclude": {
+        "**/*.un~": true
+    },
+    "cmake.configureOnOpen": false
 }
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 352fba0..84142bc 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -3,32 +3,13 @@
         {
             "type": "cppbuild",
             "label": "C/C++: g++ build active file",
-            "command": "/usr/bin/g++",
+            "command": "ninja",
             "args": [
-                "-g",
-                "${file}",
-                "-o",
-                "${fileDirname}/${fileBasenameNoExtension}"
+                "-C",
+                "../build"
             ],
             "options": {
-                "cwd": "${workspaceFolder}"
-            },
-            "problemMatcher": [
-                "$gcc"
-            ],
-            "group": {
-                "kind": "build",
-                "isDefault": true
-            },
-            "detail": "Task generated by Debugger."
-        },
-        {
-            "type": "cppbuild",
-            "label": "C/C++: g++ build active file",
-            "command": "make",
-            "args": ["build"],
-            "options": {
-                "cwd": "${workspaceFolder}"
+                "cwd": "${workspaceFolder}/utest"
             },
             "problemMatcher": [
                 "$gcc"
@@ -36,7 +17,6 @@
             "group": "build",
             "detail": "compiler: /usr/bin/g++"
         }
-        
     ],
     "version": "2.0.0"
 }
\ No newline at end of file
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index a8e5073..9827f17 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -2,7 +2,11 @@
 
 option optimize_for = LITE_RUNTIME;
 
+import "google/protobuf/descriptor.proto";
+import "error_msg.proto";
+
 package bhome.msg;
+
 
 // message format : header(BHMsgHead) + body(variable types)
 message BHAddress {
@@ -13,7 +17,7 @@
 
 message ProcInfo
 {
-	bytes id = 1;
+	bytes id = 1; // serial number, maybe managed
 	bytes name = 2;
 	bytes public_info = 3;
 	bytes private_info = 4;
@@ -28,6 +32,10 @@
 	bytes topic = 6; // for request route
 }
 
+message BHMsgBody {
+	bytes data = 1;
+}
+
 message BHMsg { // deprecated
 	bytes msg_id = 1;
 	int64 timestamp = 2;
@@ -38,55 +46,71 @@
 
 enum MsgType {
 	kMsgTypeInvalid = 0;
-	kMsgTypeRequest = 1;
-	kMsgTypeReply = 2;
-	kMsgTypePublish = 3;
-	kMsgTypeSubscribe = 4;
-	kMsgTypeUnsubscribe = 5;
 
-	kMsgTypeProcQueryTopic = 6;
-	kMsgTypeProcQueryTopicReply = 7;
-	kMsgTypeProcRegisterTopics = 8;
-	kMsgTypeProcHeartbeat = 9;
+	kMsgTypeCommonReply = 2;
+
+	kMsgTypeRegister= 10;
+	// kMsgTypeRegisterReply= 11;
+	kMsgTypeHeartbeat = 12;
+	// kMsgTypeHeartbeatReply = 13;
+	kMsgTypeQueryTopic = 14;
+	kMsgTypeQueryTopicReply = 15;
+	kMsgTypeRequestTopic = 16;
+	kMsgTypeRequestTopicReply = 17;
+
+	kMsgTypePublish = 100;
+	// kMsgTypePublishReply = 101;
+	kMsgTypeSubscribe = 102;
+	// kMsgTypeSubscribeReply = 103;
+	kMsgTypeUnsubscribe = 104;
+	// kMsgTypeUnsubscribeReply = 105;
+
 }
 
-message DataPub {
+message MsgPub {
 	bytes topic = 1;
 	bytes data = 2; 
 }
 
-message DataSub {
+message MsgSub {
 	repeated bytes topics = 1;
 }
 
-message DataRequest {
+message MsgCommonReply {
+	ErrorMsg errmsg = 1;
+}
+
+message MsgRequestTopic {
 	bytes topic = 1;
 	bytes data = 2; 
 }
 
-message DataReply {
-	bytes data = 1; 
+message MsgRequestTopicReply {
+	ErrorMsg errmsg = 1;
+	bytes data = 2; 
 }
 
-message DataProcRegister
+message MsgRegister
 {
 	ProcInfo proc = 1;
 	repeated bytes topics = 2;
 }
 
-message DataProcHeartbeat
+message MsgHeartbeat
 {
 	ProcInfo proc = 1;
 }
 
-message DataProcQueryTopic {
+message MsgQueryTopic {
 	bytes topic = 1;
 }
 
-message DataProcQueryTopicReply {
-	BHAddress address = 1;
+message MsgQueryTopicReply {
+	ErrorMsg errmsg = 1;
+	BHAddress address = 2;
 }
 
-service TopicRequestReplyService {
-	rpc Request (DataRequest) returns (DataReply);
-}
\ No newline at end of file
+service TopicRPC {
+	rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
+	rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
+}
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
new file mode 100644
index 0000000..f283108
--- /dev/null
+++ b/proto/source/error_msg.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+option optimize_for = LITE_RUNTIME;
+
+package bhome.msg;
+
+enum ErrorCode {
+    eSuccess = 0;
+    eError = 1;
+    eInvalidInput = 2;
+}
+
+message ErrorMsg {
+    ErrorCode errCode = 1;
+    bytes errString = 2;
+}
diff --git a/src/msg.cpp b/src/msg.cpp
index c1dfff9..8752066 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -20,7 +20,10 @@
 
 namespace bhome_msg
 {
-
+/*TODO change msg format, header has proc info;
+reply has errer msg
+    center accept request and route.;
+//*/
 const uint32_t kMsgTag = 0xf1e2d3c4;
 const uint32_t kMsgPrefixLen = 4;
 
@@ -43,9 +46,9 @@
 
 BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
 {
-	BHMsg msg(InitMsg(kMsgTypeRequest));
+	BHMsg msg(InitMsg(kMsgTypeRequestTopic));
 	AddRoute(msg, src_id);
-	DataRequest req;
+	MsgRequestTopic req;
 	req.set_topic(topic);
 	req.set_data(data, size);
 	msg.set_body(req.SerializeAsString());
@@ -54,9 +57,9 @@
 
 BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
 {
-	BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
+	BHMsg msg(InitMsg(kMsgTypeRegister));
 	AddRoute(msg, src_id);
-	DataProcRegister reg;
+	MsgRegister reg;
 	reg.mutable_proc()->Swap(&info);
 	for (auto &t : topics) {
 		reg.add_topics(t);
@@ -67,9 +70,9 @@
 
 BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
 {
-	BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
+	BHMsg msg(InitMsg(kMsgTypeHeartbeat));
 	AddRoute(msg, src_id);
-	DataProcRegister reg;
+	MsgHeartbeat reg;
 	reg.mutable_proc()->Swap(&info);
 	msg.set_body(reg.SerializeAsString());
 	return msg;
@@ -78,8 +81,8 @@
 BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
 {
 	assert(data && size);
-	BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
-	DataReply reply;
+	BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
+	MsgRequestTopicReply reply;
 	reply.set_data(data, size);
 	msg.set_body(reply.SerializeAsString());
 	return msg;
@@ -90,7 +93,7 @@
 	assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
 	BHMsg msg(InitMsg(sub_unsub));
 	AddRoute(msg, client);
-	DataSub subs;
+	MsgSub subs;
 	for (auto &t : topics) {
 		subs.add_topics(t);
 	}
@@ -105,7 +108,7 @@
 {
 	assert(data && size);
 	BHMsg msg(InitMsg(kMsgTypePublish));
-	DataPub pub;
+	MsgPub pub;
 	pub.set_topic(topic);
 	pub.set_data(data, size);
 	msg.set_body(pub.SerializeAsString());
@@ -114,17 +117,17 @@
 
 BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
 {
-	BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
+	BHMsg msg(InitMsg(kMsgTypeQueryTopic));
 	AddRoute(msg, client);
-	DataProcQueryTopic query;
+	MsgQueryTopic query;
 	query.set_topic(topic);
 	msg.set_body(query.SerializeAsString());
 	return msg;
 }
 BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
 {
-	BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
-	DataProcQueryTopicReply reply;
+	BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
+	MsgQueryTopicReply reply;
 	reply.mutable_address()->set_mq_id(mqid);
 	msg.set_body(reply.SerializeAsString());
 	return msg;
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 8d26e0b..90688ec 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -49,7 +49,7 @@
 {
 	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
 		if (msg.type() == kMsgTypePublish) {
-			DataPub d;
+			MsgPub d;
 			if (d.ParseFromString(msg.body())) {
 				tdcb(d.topic(), d.data());
 			}
@@ -65,7 +65,7 @@
 {
 	BHMsg msg;
 	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
-		DataPub d;
+		MsgPub d;
 		if (d.ParseFromString(msg.body())) {
 			d.mutable_topic()->swap(topic);
 			d.mutable_data()->swap(data);
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index b3af47d..698327e 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -94,7 +94,7 @@
 		auto &shm = socket.shm();
 
 		auto OnSubChange = [&](auto &&update) {
-			DataSub sub;
+			MsgSub sub;
 			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
 				assert(sizeof(MQId) == msg.route(0).mq_id().size());
 				MQId client;
@@ -106,7 +106,7 @@
 		auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
 
 		auto OnPublish = [&]() {
-			DataPub pub;
+			MsgPub pub;
 			if (!pub.ParseFromString(msg.body())) {
 				return;
 			}
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index e52b0fd..ce35d1c 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -100,12 +100,6 @@
 	std::unordered_map<ProcId, Node> nodes_;
 };
 
-Synced<NodeCenter> &Center()
-{
-	static Synced<NodeCenter> s;
-	return s;
-}
-
 } // namespace
 
 BHCenter::MsgHandler MakeReqRepCenter()
@@ -120,7 +114,7 @@
 		time_t now = 0;
 		time(&now);
 		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket.Pending());
+			printf("center queue size: %ld\n", socket.Pending());
 		}
 #endif
 		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
@@ -128,7 +122,7 @@
 		auto OnRegister = [&]() {
 			if (msg.route_size() != 1) { return; }
 
-			DataProcRegister reg;
+			MsgRegister reg;
 			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
 				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
 			}
@@ -138,7 +132,7 @@
 			if (msg.route_size() != 1) { return; }
 			auto &src_mq = msg.route(0).mq_id();
 
-			DataProcHeartbeat hb;
+			MsgHeartbeat hb;
 			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
 				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
 			}
@@ -147,7 +141,7 @@
 		auto OnQueryTopic = [&]() {
 			if (msg.route_size() != 1) { return; }
 
-			DataProcQueryTopic query;
+			MsgQueryTopic query;
 			NodeCenter::ProcAddr dest;
 			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
 				MQId remote;
@@ -161,9 +155,9 @@
 		};
 
 		switch (msg.type()) {
-		case kMsgTypeProcRegisterTopics: OnRegister(); return true;
-		case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
-		case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+		case kMsgTypeRegister: OnRegister(); return true;
+		case kMsgTypeHeartbeat: OnHeartbeat(); return true;
+		case kMsgTypeQueryTopic: OnQueryTopic(); return true;
 		default: return false;
 		}
 	};
@@ -176,4 +170,4 @@
 
 	const int kMaxWorker = 16;
 	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
+}
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index de38229..dcb5a9e 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -76,24 +76,16 @@
 	Queue *remote = Find(shm, MsgQIdToName(remote_id));
 	return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
 }
+bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+{
+	Queue *remote = Find(shm, MsgQIdToName(remote_id));
+	return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
+}
 
 // Test shows that in the 2 cases:
 // 1) build msg first, then find remote queue;
 // 2) find remote queue first, then build msg;
 // 1 is about 50% faster than 2, maybe cache related.
-
-bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend)
-{
-	MsgI msg;
-	if (msg.Make(shm(), data)) {
-		if (Send(remote_id, msg, timeout_ms, onsend)) {
-			return true;
-		} else {
-			msg.Release(shm());
-		}
-	}
-	return false;
-}
 
 bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
 {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index e9b3a1a..ab8a88c 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -134,22 +134,26 @@
 	bool Recv(BHMsg &msg, const int timeout_ms);
 	bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
 	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
-	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+
+	template <class... Extra>
+	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
 	{
-		return Send(shm, remote_id, msg, timeout_ms, []() {});
+		return Send(shm(), remote_id, msg, timeout_ms, extra...);
 	}
-	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend);
-	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms)
+
+	template <class... Extra>
+	bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
 	{
-		return Send(remote_id, msg, timeout_ms, []() {});
-	}
-	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
-	{
-		return Send(shm(), remote_id, msg, timeout_ms, onsend);
-	}
-	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
-	{
-		return Send(shm(), remote_id, msg, timeout_ms);
+		MsgI msg;
+		if (msg.Make(shm(), data)) {
+			if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
+				return true;
+			} else {
+				msg.Release(shm());
+			}
+		}
+		return false;
 	}
 	size_t Pending() const { return data()->size(); }
 };
diff --git a/src/socket.cpp b/src/socket.cpp
index 73681f1..b9def0c 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -49,15 +49,15 @@
 	Stop(); //TODO should stop in sub class, incase thread access sub class data.
 }
 
-bool ShmSocket::Start(const RecvCB &onData, int nworker)
+bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
 {
-	if (!mq_) {
-		return false;
+	if (!mq_ || !onData) {
+		return false; // TODO error code.
 	}
 
 	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
-	auto RecvProc = [this, onData]() {
+	auto RecvProc = [this, onData, onIdle]() {
 		while (run_) {
 			try {
 				MsgI imsg;
@@ -67,6 +67,8 @@
 					if (imsg.Unpack(msg)) {
 						onData(*this, imsg, msg);
 					}
+				} else if (onIdle) {
+					onIdle(*this);
 				}
 			} catch (...) {
 			}
diff --git a/src/socket.h b/src/socket.h
index 1a3d47b..57d0ae4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -37,6 +37,7 @@
 	typedef bhome_shm::SharedMemory Shm;
 	typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
 	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
+	typedef std::function<void(ShmSocket &sock)> IdleCB;
 
 	ShmSocket(Shm &shm, const void *id, const int len);
 	ShmSocket(Shm &shm, const int len = 12);
@@ -44,22 +45,27 @@
 
 	Shm &shm() { return shm_; }
 	// start recv.
-	bool Start(const RecvCB &onData, int nworker = 1);
+	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
+	bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
+	bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
+	{
+		return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
+	}
 	bool Start(const RecvBHMsgCB &onData, int nworker = 1)
 	{
-		return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
+		return Start(onData, IdleCB(), nworker);
 	}
 	bool Stop();
 	size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
+
+	bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
+	bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
 
 protected:
 	const Shm &shm() const { return shm_; }
 	Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
 	const Queue &mq() const { return *mq_; }
 	std::mutex &mutex() { return mutex_; }
-
-	bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
-	bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
 
 private:
 	bool StopNoLock();
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
new file mode 100644
index 0000000..356cf3e
--- /dev/null
+++ b/src/topic_reply.cpp
@@ -0,0 +1,142 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_reply.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�06鏃� 14鏃�40鍒�52绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "topic_reply.h"
+#include <chrono>
+#include <list>
+
+using namespace bhome_msg;
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace
+{
+struct SrcInfo {
+	std::vector<BHAddress> route;
+	std::string msg_id;
+};
+
+class FailedQ
+{
+	struct FailedMsg {
+		steady_clock::time_point xpr;
+		std::string remote_;
+		BHMsg msg_;
+		FailedMsg(const std::string &addr, BHMsg &&msg) :
+		    xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
+		bool Expired() { return steady_clock::now() > xpr; }
+	};
+	typedef std::list<FailedMsg> Queue;
+	Synced<Queue> queue_;
+
+public:
+	void Push(const std::string &remote, BHMsg &&msg)
+	{
+		queue_->emplace_back(remote, std::move(msg));
+	}
+	void TrySend(ShmSocket &socket, const int timeout_ms = 0)
+	{
+		queue_.Apply([&](Queue &q) {
+			if (!q.empty()) {
+				auto it = q.begin();
+				do {
+					if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
+						it = q.erase(it);
+					} else {
+						++it;
+					}
+				} while (it != q.end());
+			}
+		});
+	}
+};
+
+} // namespace
+
+bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
+{
+	//TODO check reply?
+	return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+}
+bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
+{
+	return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+}
+bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
+{
+	auto failed_q = std::make_shared<FailedQ>();
+
+	auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
+
+	auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
+		if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
+			MsgRequestTopic req;
+			if (req.ParseFromString(msg.body())) {
+				std::string out;
+				if (rcb(req.topic(), req.data(), out)) {
+					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
+					for (int i = 0; i < msg.route_size() - 1; ++i) {
+						msg.add_route()->Swap(msg.mutable_route(i));
+					}
+					if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
+						failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
+					}
+				}
+			}
+		} else {
+			// ignored, or dropped
+		}
+
+		onIdle(*this);
+	};
+
+	return rcb && Start(onRecv, onIdle, nworker);
+}
+
+bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
+		MsgRequestTopic request;
+		if (request.ParseFromString(msg.body())) {
+			request.mutable_topic()->swap(topic);
+			request.mutable_data()->swap(data);
+			SrcInfo *p = new SrcInfo;
+			p->route.assign(msg.route().begin(), msg.route().end());
+			p->msg_id = msg.msg_id();
+			src_info = p;
+			return true;
+		}
+	}
+	return false;
+}
+
+bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+	SrcInfo *p = static_cast<SrcInfo *>(src_info);
+	DEFER1(delete p);
+	if (!p || p->route.empty()) {
+		return false;
+	}
+
+	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
+	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+		msg.add_route()->Swap(&p->route[i]);
+	}
+
+	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
+}
\ No newline at end of file
diff --git a/src/topic_reply.h b/src/topic_reply.h
new file mode 100644
index 0000000..090ad88
--- /dev/null
+++ b/src/topic_reply.h
@@ -0,0 +1,52 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_reply.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�06鏃� 14鏃�41鍒�12绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TOPIC_REPLY_3RVYPPWI
+#define TOPIC_REPLY_3RVYPPWI
+
+#include "bh_util.h"
+#include "defs.h"
+#include "msg.h"
+#include "socket.h"
+#include <deque>
+#include <functional>
+
+using bhome::msg::ProcInfo;
+
+class SocketReply : private ShmSocket
+{
+	typedef ShmSocket Socket;
+
+public:
+	SocketReply(Socket::Shm &shm) :
+	    Socket(shm, 64) {}
+	SocketReply() :
+	    SocketReply(BHomeShm()) {}
+	~SocketReply() { Stop(); }
+
+	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+	bool StartWorker(const OnRequest &rcb, int nworker = 2);
+	bool Stop() { return Socket::Stop(); }
+	bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+	bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
+	bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
+	bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
+
+private:
+};
+
+#endif // end of include guard: TOPIC_REPLY_3RVYPPWI
diff --git a/src/reqrep.cpp b/src/topic_request.cpp
similarity index 65%
rename from src/reqrep.cpp
rename to src/topic_request.cpp
index 25c0826..ce7c1a8 100644
--- a/src/reqrep.cpp
+++ b/src/topic_request.cpp
@@ -1,9 +1,9 @@
 /*
  * =====================================================================================
  *
- *       Filename:  reqrep.cpp
+ *       Filename:  topic_request.cpp
  *
- *    Description:  topic request/reply sockets
+ *    Description:  topic request sockets
  *
  *        Version:  1.0
  *        Created:  2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
@@ -15,7 +15,7 @@
  *
  * =====================================================================================
  */
-#include "reqrep.h"
+#include "topic_request.h"
 #include "bh_util.h"
 #include "msg.h"
 #include <chrono>
@@ -40,10 +40,10 @@
 		};
 
 		RecvBHMsgCB cb;
-		if (Find(cb)) {
+		if (Find(cb) && cb) {
 			cb(msg);
-		} else if (msg.type() == kMsgTypeReply) {
-			DataReply reply;
+		} else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
+			MsgRequestTopicReply reply;
 			if (reply.ParseFromString(msg.body())) {
 				rrcb(reply.data());
 			}
@@ -74,8 +74,8 @@
 	auto Call = [&](const void *remote) {
 		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
 		auto onRecv = [cb](BHMsg &msg) {
-			if (msg.type() == kMsgTypeReply) {
-				DataReply reply;
+			if (msg.type() == kMsgTypeRequestTopicReply) {
+				MsgRequestTopicReply reply;
 				if (reply.ParseFromString(msg.body())) {
 					cb(reply.data());
 				}
@@ -103,16 +103,22 @@
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
 			const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
 			BHMsg reply;
-			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
-				DataReply dr;
+			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
+				MsgRequestTopicReply dr;
 				if (dr.ParseFromString(reply.body())) {
 					dr.mutable_data()->swap(out);
 					return true;
+				} else {
+					printf("error parse reply.\n");
 				}
+			} else {
+				printf("error recv data. line: %d\n", __LINE__);
 			}
 		} else {
+			printf("error recv data. line: %d\n", __LINE__);
 		}
 	} catch (...) {
+		printf("error recv data. line: %d\n", __LINE__);
 	}
 	return false;
 }
@@ -186,8 +192,8 @@
 	BHMsg result;
 	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
 	if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeProcQueryTopicReply) {
-			DataProcQueryTopicReply reply;
+		if (result.type() == kMsgTypeQueryTopicReply) {
+			MsgQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
 				addr = reply.address();
 				if (addr.mq_id().empty()) {
@@ -202,79 +208,3 @@
 	}
 	return false;
 }
-
-// reply socket
-namespace
-{
-struct SrcInfo {
-	std::vector<BHAddress> route;
-	std::string msg_id;
-};
-
-} // namespace
-
-bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
-{
-	//TODO check reply?
-	return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
-}
-bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
-{
-	return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
-}
-bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
-{
-	auto onRecv = [this, rcb](BHMsg &msg) {
-		if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
-			DataRequest req;
-			if (req.ParseFromString(msg.body())) {
-				std::string out;
-				if (rcb(req.topic(), req.data(), out)) {
-					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
-					for (int i = 0; i < msg.route_size() - 1; ++i) {
-						msg.add_route()->Swap(msg.mutable_route(i));
-					}
-					SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
-				}
-			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return rcb && Start(onRecv, nworker);
-}
-
-bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
-{
-	BHMsg msg;
-	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
-		DataRequest request;
-		if (request.ParseFromString(msg.body())) {
-			request.mutable_topic()->swap(topic);
-			request.mutable_data()->swap(data);
-			SrcInfo *p = new SrcInfo;
-			p->route.assign(msg.route().begin(), msg.route().end());
-			p->msg_id = msg.msg_id();
-			src_info = p;
-			return true;
-		}
-	}
-	return false;
-}
-
-bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
-{
-	SrcInfo *p = static_cast<SrcInfo *>(src_info);
-	DEFER1(delete p);
-	if (!p || p->route.empty()) {
-		return false;
-	}
-
-	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
-	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
-		msg.add_route()->Swap(&p->route[i]);
-	}
-
-	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
-}
\ No newline at end of file
diff --git a/src/reqrep.h b/src/topic_request.h
similarity index 76%
rename from src/reqrep.h
rename to src/topic_request.h
index 8a4743c..6765dc2 100644
--- a/src/reqrep.h
+++ b/src/topic_request.h
@@ -1,9 +1,9 @@
 /*
  * =====================================================================================
  *
- *       Filename:  reqrep.h
+ *       Filename:  topic_request.h
  *
- *    Description:  topic request/reply sockets
+ *    Description:  topic request socket
  *
  *        Version:  1.0
  *        Created:  2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
@@ -15,8 +15,8 @@
  *
  * =====================================================================================
  */
-#ifndef REQREP_ACEH09NK
-#define REQREP_ACEH09NK
+#ifndef TOPIC_REQUEST_ACEH09NK
+#define TOPIC_REQUEST_ACEH09NK
 
 #include "bh_util.h"
 #include "defs.h"
@@ -105,26 +105,4 @@
 	TopicCache topic_cache_;
 };
 
-class SocketReply : private ShmSocket
-{
-	typedef ShmSocket Socket;
-
-public:
-	SocketReply(Socket::Shm &shm) :
-	    Socket(shm, 64) {}
-	SocketReply() :
-	    SocketReply(BHomeShm()) {}
-	~SocketReply() { Stop(); }
-
-	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
-	bool StartWorker(const OnRequest &rcb, int nworker = 2);
-	bool Stop() { return Socket::Stop(); }
-	bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
-	bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
-	bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
-	bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
-
-private:
-};
-
-#endif // end of include guard: REQREP_ACEH09NK
+#endif // end of include guard: TOPIC_REQUEST_ACEH09NK
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index dc64cc0..34f80d8 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -160,7 +160,7 @@
 	auto Server = [&]() {
 		BHMsg req;
 		while (!stop) {
-			if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+			if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
 				auto &mqid = req.route()[0].mq_id();
 				MQId src_id;
 				memcpy(&src_id, mqid.data(), sizeof(src_id));
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 55a08a3..8f2a7f5 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,9 +1,10 @@
 #include "defs.h"
 #include "pubsub.h"
 #include "pubsub_center.h"
-#include "reqrep.h"
 #include "reqrep_center.h"
 #include "socket.h"
+#include "topic_reply.h"
+#include "topic_request.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -189,24 +190,26 @@
 				printf("count: %d\n", count.load());
 			}
 		};
-		client.StartWorker(onRecv, 1);
+		client.StartWorker(onRecv, 2);
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
 			if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
 				printf("client request failed\n");
 			}
+
 			// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
 			// 	printf("client request failed\n");
 			// } else {
 			// 	++count;
 			// }
 		}
-		printf("request %s %d done ", topic.c_str(), nreq);
-		while (count.load() < nreq) {
+		do {
 			std::this_thread::yield();
-		}
+		} while (count.load() < nreq);
 		client.Stop();
+		printf("request %s %d done ", topic.c_str(), count.load());
 	};
+	std::atomic_uint64_t server_msg_count(0);
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
 		SocketReply server(shm);
 		ProcInfo info;
@@ -215,7 +218,8 @@
 		if (!server.Register(info, topics, 100)) {
 			printf("register failed\n");
 		}
-		auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
+		auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
+			++server_msg_count;
 			reply = topic + ':' + data;
 			return true;
 		};
@@ -229,9 +233,10 @@
 	servers.Launch(Server, "server", topics);
 	std::this_thread::sleep_for(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 1000);
+		clients.Launch(Client, t, 1000 * 100);
 	}
 	clients.WaitAll();
+	printf("clients done, server replyed: %d\n", server_msg_count.load());
 	run = false;
 	servers.WaitAll();
 }

--
Gitblit v1.8.0