From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.

---
 src/socket.h                 |   63 +++++--
 src/proto.h                  |    2 
 src/msg.h                    |   26 +--
 src/socket.cpp               |   48 ++---
 proto/source/bhome_msg.proto |   22 ++
 utest/utest.cpp              |   45 ++---
 src/topic_node.cpp           |  121 +++++++++++++-
 src/center.cpp               |   14 +
 src/msg.cpp                  |    8 +
 utest/speed_test.cpp         |   18 +-
 /dev/null                    |   31 ---
 utest/util.h                 |   23 ++
 src/shm_queue.h              |   21 --
 src/topic_node.h             |   22 ++
 14 files changed, 277 insertions(+), 187 deletions(-)

diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index b06b692..5056a26 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -3,6 +3,7 @@
 
 // import "google/protobuf/descriptor.proto";
 import "bhome_msg_api.proto";
+import "error_msg.proto";
 
 package bhome.msg;
 
@@ -18,12 +19,21 @@
 	bytes topic = 6; // for request route
 }
 
-message BHMsg { // deprecated
-	bytes msg_id = 1;
-	int64 timestamp = 2;
-	int32 type = 3;
-	repeated BHAddress route = 4; // for reply and proxy.
-	bytes body = 5;
+message MsgRequest {
+	MsgType type = 1;
+	// oneof body;
+}
+
+message MsgReply {
+	ErrorMsg err_msg = 1;
+	// oneof reply
+}
+
+message BHMsgBody {
+	oneof reqrep {
+		MsgRequest request = 1;
+		MsgReply reply = 2;
+	}
 }
 
 enum MsgType {
diff --git a/src/center.cpp b/src/center.cpp
index fe549b7..71c85c3 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -336,7 +336,7 @@
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
+			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
 			if (!r) {
 				printf("send reply failed.\n");
 			}
@@ -364,18 +364,20 @@
 			MsgPublish pub;
 			NodeCenter::Clients clients;
 			MsgCommonReply reply;
-			MsgI pubmsg;
 			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
 				return;
 			} else if (!center->FindClients(head, pub, clients, reply)) {
-				// send error reply.
 				MakeReplyer(socket, head, center->id())(reply);
-			} else if (pubmsg.MakeRC(socket.shm(), msg)) {
-				DEFER1(pubmsg.Release(socket.shm()));
+			} else {
+				MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+
 				for (auto &cli : clients) {
 					auto node = cli.weak_node_.lock();
 					if (node) {
-						socket.Send(cli.mq_.data(), pubmsg, 10);
+						if (!socket.Send(cli.mq_.data(), msg, 100)) {
+							printf("center route publish failed. need resend.\n");
+						}
 					}
 				}
 			}
diff --git a/src/msg.cpp b/src/msg.cpp
index c353d84..06b817e 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -78,6 +78,14 @@
 	return true;
 }
 
+bool MsgI::EnableRefCount(SharedMemory &shm)
+{
+	if (!IsCounted()) {
+		count_ = shm.New<RefCount>();
+	}
+	return IsCounted();
+}
+
 int MsgI::Release(SharedMemory &shm)
 {
 	if (IsCounted()) {
diff --git a/src/msg.h b/src/msg.h
index 661d989..10ad0d2 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -105,27 +105,21 @@
 	bool IsCounted() const { return static_cast<bool>(count_); }
 
 	template <class Body>
-	bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
-	{
-		return Make(shm, Pack(shm, head, body));
-	}
-	template <class Body>
-	bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+	inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
 	{
 		return MakeRC(shm, Pack(shm, head, body));
 	}
-	bool MakeRC(SharedMemory &shm, MsgI &a)
+
+	bool EnableRefCount(SharedMemory &shm);
+
+	template <class Body>
+	inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
 	{
-		if (a.IsCounted()) {
-			*this = a;
-			AddRef();
-			return true;
-		} else {
-			void *p = a.ptr_.get();
-			a.ptr_ = 0;
-			return MakeRC(shm, p);
-		}
+		void *p = Pack(shm, head, body);
+		auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
+		return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
 	}
+
 	bool ParseHead(BHMsgHead &head) const;
 	template <class Body>
 	bool ParseBody(Body &body) const
diff --git a/src/proto.h b/src/proto.h
index 2057711..da3bde6 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -74,5 +74,5 @@
 BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
 BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
 // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
-
+inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
 #endif // end of include guard: PROTO_UA9UWKL1
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
deleted file mode 100644
index 471c63c..0000000
--- a/src/pubsub.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  pubsub.cpp
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�03鏈�24鏃� 18鏃�44鍒�13绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "pubsub.h"
-#include "bh_util.h"
-#include "defs.h"
-
-using namespace std::chrono_literals;
-using namespace bhome_msg;
-
-bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
-{
-	try {
-		MsgPublish pub;
-		pub.set_topic(topic);
-		pub.set_data(data, size);
-		BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
-		MsgI imsg;
-		if (imsg.MakeRC(shm(), head, pub)) {
-			DEFER1(imsg.Release(shm()));
-			return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
-		}
-	} catch (...) {
-	}
-	return false;
-}
-namespace
-{
-inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
-
-} // namespace
-bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
-{
-	try {
-		MsgSubscribe sub;
-		for (auto &topic : topics) {
-			sub.add_topics(topic);
-		}
-		BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
-		AddRoute(head, mq().Id());
-
-		return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-
-bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
-{
-	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
-		if (head.type() == kMsgTypePublish) {
-			MsgPublish pub;
-			if (imsg.ParseBody(pub)) {
-				tdcb(head.proc_id(), pub.topic(), pub.data());
-			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return tdcb && Start(AsyncRecvProc, nworker);
-}
-
-bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
-{
-	MsgI msg;
-	BHMsgHead head;
-	if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
-		MsgPublish pub;
-		if (msg.ParseBody(pub)) {
-			head.mutable_proc_id()->swap(proc_id);
-			pub.mutable_topic()->swap(topic);
-			pub.mutable_data()->swap(data);
-			return true;
-		}
-	}
-	return false;
-}
\ No newline at end of file
diff --git a/src/pubsub.h b/src/pubsub.h
deleted file mode 100644
index bd60fcd..0000000
--- a/src/pubsub.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  pubsub.h
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�03鏈�24鏃� 18鏃�44鍒�36绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef PUBSUB_4KGRA997
-#define PUBSUB_4KGRA997
-
-#include "defs.h"
-#include "socket.h"
-#include <string>
-
-class SocketPublish
-{
-	typedef ShmSocket Socket;
-	Socket::Shm &shm_;
-	Socket::Shm &shm() { return shm_; }
-
-public:
-	SocketPublish(Socket::Shm &shm) :
-	    shm_(shm) {}
-	SocketPublish() :
-	    SocketPublish(BHomeShm()) {}
-	bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms);
-};
-
-// socket subscribe
-class SocketSubscribe : private ShmSocket
-{
-	typedef ShmSocket Socket;
-
-public:
-	SocketSubscribe(Socket::Shm &shm) :
-	    Socket(shm, 64) {}
-	SocketSubscribe() :
-	    SocketSubscribe(BHomeShm()) {}
-	~SocketSubscribe() { Stop(); }
-
-	typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
-	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
-	bool Stop() { return Socket::Stop(); }
-	bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms);
-	bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
-};
-
-#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 32ccfae..88c13ec 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -136,25 +136,8 @@
 	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
 	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
 
-	template <class... Extra>
-	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
-	{
-		return Send(shm(), remote_id, msg, timeout_ms, extra...);
-	}
-	template <class Body, class... Extra>
-	bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
-	{
-		MsgI msg;
-		if (msg.Make(shm(), head, body)) {
-			if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
-				return true;
-			} else {
-				msg.Release(shm());
-			}
-		}
-		return false;
-	}
-
+	template <class... Rest>
+	bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
 	size_t Pending() const { return data()->size(); }
 };
 
diff --git a/src/socket.cpp b/src/socket.cpp
index f2b29f4..116175d 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -43,51 +43,37 @@
 
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
 {
-	auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
-		auto Find = [&](RecvCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex());
-			const std::string &msgid = head.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
+	auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
 		RecvCB cb;
-		if (Find(cb)) {
+		if (async_cbs_->Find(head.msg_id(), cb)) {
 			cb(socket, imsg, head);
 		} else if (onData) {
 			onData(socket, imsg, head);
 		} // else ignored, or dropped
 	};
 
-	std::lock_guard<std::mutex> lock(mutex_);
-	StopNoLock();
-	auto RecvProc = [this, onRecv, onIdle]() {
-		while (run_) {
-			try {
-				MsgI imsg;
-				if (mq().Recv(imsg, 10)) {
-					DEFER1(imsg.Release(shm()));
-					BHMsgHead head;
-					if (imsg.ParseHead(head)) {
-						onRecv(*this, imsg, head);
-					}
-				} else if (onIdle) {
-					onIdle(*this);
+	auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
+		try {
+			MsgI imsg;
+			if (mq().Recv(imsg, 10)) {
+				DEFER1(imsg.Release(shm()));
+				BHMsgHead head;
+				if (imsg.ParseHead(head)) {
+					onRecvWithPerMsgCB(*this, imsg, head);
 				}
-			} catch (...) {
+			} else if (onIdle) {
+				onIdle(*this);
 			}
+		} catch (...) {
 		}
 	};
 
+	std::lock_guard<std::mutex> lock(mutex_);
+	StopNoLock();
+
 	run_.store(true);
 	for (int i = 0; i < nworker; ++i) {
-		workers_.emplace_back(RecvProc);
+		workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
 	}
 	return true;
 }
diff --git a/src/socket.h b/src/socket.h
index 7c4f83f..f73bee5 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -19,6 +19,7 @@
 #ifndef SOCKET_GWTJHBPO
 #define SOCKET_GWTJHBPO
 
+#include "bh_util.h"
 #include "defs.h"
 #include "shm_queue.h"
 #include <atomic>
@@ -34,6 +35,15 @@
 
 class ShmSocket : private boost::noncopyable
 {
+	template <class DoSend>
+	inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
+	{
+		bool r = false;
+		DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
+		r = doSend(msg);
+		return r;
+	}
+
 protected:
 	typedef bhome_shm::ShmMsgQueue Queue;
 
@@ -55,30 +65,28 @@
 	bool Stop();
 	size_t Pending() const { return mq().Pending(); }
 
-	bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
+	bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
 	{
-		return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
+		assert(valid_remote);
+		return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
 	}
 	//TODO reimplment, using async.
 	bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
+	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
 	{
-		assert(valid_remote);
-		try {
-			if (cb) {
-				auto RegisterCB = [&]() {
-					std::lock_guard<std::mutex> lock(mutex());
-					async_cbs_.emplace(head.msg_id(), cb);
-				};
-				return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
-			} else {
-				return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
-			}
-		} catch (...) {
-			return false;
-		}
+		auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
+		MsgI msg;
+		return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
+	}
+
+	template <class Body>
+	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
+	{
+		auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
+		MsgI msg;
+		return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
 	}
 
 	template <class Body>
@@ -133,7 +141,26 @@
 	std::atomic<bool> run_;
 
 	Queue mq_;
-	std::unordered_map<std::string, RecvCB> async_cbs_;
+	class AsyncCBs
+	{
+		std::unordered_map<std::string, RecvCB> store_;
+
+	public:
+		bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
+		bool Find(const std::string &id, RecvCB &cb)
+		{
+			auto pos = store_.find(id);
+			if (pos != store_.end()) {
+				cb.swap(pos->second);
+				store_.erase(pos);
+				return true;
+			} else {
+				return false;
+			}
+		}
+	};
+
+	Synced<AsyncCBs> async_cbs_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index c6c9771..d76c03a 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -76,29 +76,34 @@
     shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
 {
 	SockNode().Start();
+	SockClient().Start();
+	SockServer().Start();
 }
+
 TopicNode::~TopicNode()
 {
 	StopAll();
-	SockNode().Stop();
 }
+
 void TopicNode::StopAll()
 {
-	ServerStop();
-	ClientStopWorker();
+	SockServer().Stop();
+	SockClient().Stop();
+	SockNode().Stop();
 }
 
 bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	auto &sock = SockNode();
+
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
-	AddRoute(head, SockNode().id());
+	AddRoute(head, sock.id());
 
 	MsgI reply;
 	DEFER1(reply.Release(shm_););
 	BHMsgHead reply_head;
-	bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
-	r = r && reply_head.type() == kMsgTypeCommonReply;
-	r = r && reply.ParseBody(reply_body);
+	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+	r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
 	if (r) {
 		info_ = body;
 	}
@@ -108,14 +113,15 @@
 bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	//TODO check registered
+	auto &sock = SockServer();
 
 	auto head(InitMsgHead(GetType(body), proc_id()));
-	AddRoute(head, SockReply().id());
+	AddRoute(head, sock.id());
 
 	MsgI reply;
 	DEFER1(reply.Release(shm_););
 	BHMsgHead reply_head;
-	bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+	bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 	r = r && reply_head.type() == kMsgTypeCommonReply;
 	r = r && reply.ParseBody(reply_body);
 	return r;
@@ -154,15 +160,17 @@
 		onIdle(sock);
 	};
 
-	return rcb && SockReply().Start(onRecv, onIdle, nworker);
+	auto &sock = SockServer();
+	return rcb && sock.Start(onRecv, onIdle, nworker);
 }
-bool TopicNode::ServerStop() { return SockReply().Stop(); }
 
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
 {
+	auto &sock = SockServer();
+
 	MsgI imsg;
 	BHMsgHead head;
-	if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
+	if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
 		MsgRequestTopic request;
 		if (imsg.ParseBody(request)) {
 			request.mutable_topic()->swap(topic);
@@ -179,6 +187,8 @@
 
 bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
 {
+	auto &sock = SockServer();
+
 	SrcInfo *p = static_cast<SrcInfo *>(src_info);
 	DEFER1(delete p);
 	if (!p || p->route.empty()) {
@@ -192,7 +202,7 @@
 		head.add_route()->Swap(&p->route[i]);
 	}
 
-	return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
+	return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
 }
 
 bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -211,12 +221,12 @@
 
 	return SockRequest().Start(onData, nworker);
 }
-bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
 
 bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
 {
 	auto Call = [&](const void *remote) {
 		auto &sock = SockRequest();
+
 		MsgRequestTopic req;
 		req.set_topic(topic);
 		req.set_data(data, size);
@@ -254,6 +264,7 @@
 {
 	try {
 		auto &sock = SockRequest();
+
 		BHAddress addr;
 		if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
 
@@ -290,6 +301,7 @@
 bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
 	auto &sock = SockRequest();
+
 	if (topic_query_cache_.Find(topic, addr)) {
 		return true;
 	}
@@ -319,4 +331,85 @@
 	} else {
 	}
 	return false;
+}
+
+// publish
+
+bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+	try {
+		auto &sock = SockPub();
+
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
+		AddRoute(head, sock.id());
+
+		MsgI reply;
+		DEFER1(reply.Release(shm()););
+		BHMsgHead reply_head;
+		MsgCommonReply reply_body;
+		return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+		       reply_head.type() == kMsgTypeCommonReply &&
+		       reply.ParseBody(reply_body) &&
+		       IsSuccess(reply_body.errmsg().errcode());
+	} catch (...) {
+	}
+	return false;
+}
+
+// subscribe
+
+bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+{
+	try {
+		auto &sock = SockSub();
+		MsgSubscribe sub;
+		for (auto &topic : topics) {
+			sub.add_topics(topic);
+		}
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
+		AddRoute(head, sock.id());
+
+		return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
+	} catch (...) {
+		return false;
+	}
+}
+
+bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker)
+{
+	auto &sock = SockSub();
+
+	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypePublish) {
+			MsgPublish pub;
+			if (imsg.ParseBody(pub)) {
+				tdcb(head.proc_id(), pub.topic(), pub.data());
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return tdcb && sock.Start(AsyncRecvProc, nworker);
+}
+
+bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
+{
+	auto &sock = SockSub();
+	MsgI msg;
+	DEFER1(msg.Release(shm()););
+	BHMsgHead head;
+	if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+		MsgPublish pub;
+		if (msg.ParseBody(pub)) {
+			head.mutable_proc_id()->swap(proc_id);
+			pub.mutable_topic()->swap(topic);
+			pub.mutable_data()->swap(data);
+			return true;
+		}
+	}
+	return false;
 }
\ No newline at end of file
diff --git a/src/topic_node.h b/src/topic_node.h
index 8852af1..34fe2ee 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -19,7 +19,6 @@
 #define TOPIC_NODE_YVKWA6TF
 
 #include "msg.h"
-#include "pubsub.h"
 #include "socket.h"
 #include <memory>
 
@@ -32,23 +31,26 @@
 	SharedMemory &shm_;
 	MsgRegister info_;
 
+	SharedMemory &shm() { return shm_; }
+
 public:
 	TopicNode(SharedMemory &shm);
 	~TopicNode();
+
+	void StopAll();
+	// topic node
 	bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
 	bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
 
 	// topic rpc server
 	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
 	bool ServerStart(OnRequest const &cb, const int nworker = 2);
-	bool ServerStop();
 	bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
 	bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
 
 	// topic client
 	typedef std::function<void(const std::string &data)> RequestResultCB;
 	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
-	bool ClientStopWorker();
 	bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
 	bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
 	{
@@ -60,7 +62,14 @@
 		return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
 	}
 
-	void StopAll();
+	// publish
+	bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+
+	// subscribe
+	typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
+	bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
+	bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+	bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
 
 private:
 	bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
@@ -106,14 +115,17 @@
 	// some sockets may be the same one, using functions make it easy to change.
 
 	auto &SockNode() { return sock_node_; }
+	auto &SockPub() { return SockNode(); }
 	auto &SockSub() { return sock_sub_; }
 	auto &SockRequest() { return sock_request_; }
+	auto &SockClient() { return SockRequest(); }
 	auto &SockReply() { return sock_reply_; }
+	auto &SockServer() { return SockReply(); }
 
 	ShmSocket sock_node_;
 	ShmSocket sock_request_;
 	ShmSocket sock_reply_;
-	SocketSubscribe sock_sub_;
+	ShmSocket sock_sub_;
 
 	TopicQueryCache topic_query_cache_;
 };
diff --git a/src/topic_rpc.cpp b/src/topic_rpc.cpp
deleted file mode 100644
index 065a861..0000000
--- a/src/topic_rpc.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_rpc.cpp
- *
- *    Description:  topic request/reply manager
- *
- *        Version:  1.0
- *        Created:  2021骞�03鏈�31鏃� 16鏃�29鍒�31绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  YOUR NAME (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "topic_rpc.h"
-
-
-
diff --git a/src/topic_rpc.h b/src/topic_rpc.h
deleted file mode 100644
index 40ff985..0000000
--- a/src/topic_rpc.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_rpc.h
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�03鏈�31鏃� 16鏃�30鍒�10绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  YOUR NAME (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef TOPIC_RPC_JU1AYN5L
-#define TOPIC_RPC_JU1AYN5L
-
-#include "socket.h"
-
-// request/reply topic manager
-class RPCManager
-{
-	ShmSocket socket_;
-
-public:
-};
-
-#endif // end of include guard: TOPIC_RPC_JU1AYN5L
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index d777f91..b1f11ac 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -40,6 +40,7 @@
 		body.set_data(str);
 		auto head(InitMsgHead(GetType(body), proc_id));
 		msg.MakeRC(shm, head, body);
+		assert(msg.IsCounted());
 		DEFER1(msg.Release(shm););
 
 		for (uint64_t i = 0; i < n; ++i) {
@@ -127,8 +128,8 @@
 	SharedMemory shm(shm_name, 1024 * 1024 * 50);
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	ShmMsgQueue srv(shm, qlen);
-	ShmMsgQueue cli(shm, qlen);
+	ShmSocket srv(shm, qlen);
+	ShmSocket cli(shm, qlen);
 
 	MsgI request_rc;
 	MsgRequestTopic req_body;
@@ -156,9 +157,9 @@
 				req_body.set_topic("topic");
 				req_body.set_data(msg_content);
 				auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
-				return cli.Send(srv.Id(), req_head, req_body, 100);
+				return cli.Send(&srv.id(), req_head, req_body, 100);
 			};
-			auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
+			auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); };
 
 			if (!ReqRC()) {
 				printf("********** client send error.\n");
@@ -166,7 +167,7 @@
 			}
 			MsgI msg;
 			BHMsgHead head;
-			if (!cli.Recv(msg, 1000)) {
+			if (!cli.SyncRecv(msg, head, 1000)) {
 				printf("********** client recv error.\n");
 			} else {
 				DEFER1(msg.Release(shm));
@@ -187,8 +188,9 @@
 		BHMsgHead req_head;
 
 		while (!stop) {
-			if (srv.Recv(req, 100)) {
+			if (srv.SyncRecv(req, req_head, 100)) {
 				DEFER1(req.Release(shm));
+
 				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
 					auto &mqid = req_head.route()[0].mq_id();
 					MQId src_id;
@@ -198,9 +200,9 @@
 						reply_body.set_topic("topic");
 						reply_body.set_data(msg_content);
 						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
-						return srv.Send(src_id, reply_head, reply_body, 100);
+						return srv.Send(&src_id, reply_head, reply_body, 100);
 					};
-					auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+					auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); };
 
 					if (ReplyRC()) {
 					}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index c925e22..f88eab9 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,8 +1,5 @@
 #include "center.h"
 #include "defs.h"
-#include "pubsub.h"
-#include "socket.h"
-#include "topic_node.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -92,8 +89,12 @@
 	const uint64_t nmsg = 100 * 2;
 	const int timeout = 1000;
 	auto Sub = [&](int id, const std::vector<std::string> &topics) {
-		SocketSubscribe client(shm);
-		bool r = client.Subscribe(sub_proc_id, topics, timeout);
+		DemoNode client("client_" + std::to_string(id), shm);
+
+		bool r = client.Subscribe(topics, timeout);
+		if (!r) {
+			printf("client subscribe failed.\n");
+		}
 		std::mutex mutex;
 		std::condition_variable cv;
 
@@ -112,18 +113,19 @@
 			}
 			// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
 		};
-		client.StartRecv(OnTopicData, 1);
+		client.SubscribeStartWorker(OnTopicData, 1);
 
 		std::unique_lock<std::mutex> lk(mutex);
 		cv.wait(lk);
 	};
 
 	auto Pub = [&](const std::string &topic) {
-		SocketPublish provider(shm);
+		DemoNode provider("server_" + topic, shm);
+
 		for (unsigned i = 0; i < nmsg; ++i) {
 			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-			bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
+			bool r = provider.Publish(topic, data.data(), data.size(), timeout);
 			if (!r) {
 				printf("pub ret: %s\n", r ? "ok" : "fail");
 			}
@@ -184,15 +186,7 @@
 	std::atomic<bool> run(true);
 
 	auto Client = [&](const std::string &topic, const int nreq) {
-		TopicNode client(shm);
-		MsgRegister reg;
-		reg.mutable_proc()->set_proc_id(client_proc_id + topic);
-		MsgCommonReply reply_body;
-
-		if (!client.Register(reg, reply_body, 1000)) {
-			printf("client register failed\n");
-			return;
-		}
+		DemoNode client(client_proc_id + topic, shm);
 
 		std::atomic<int> count(0);
 		std::string reply;
@@ -218,21 +212,13 @@
 		do {
 			std::this_thread::yield();
 		} while (count.load() < nreq);
-		client.ClientStopWorker();
+		client.StopAll();
 		printf("request %s %d done ", topic.c_str(), count.load());
 	};
+
 	std::atomic_uint64_t server_msg_count(0);
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
-		TopicNode server(shm);
-		MsgRegister reg;
-		reg.mutable_proc()->set_proc_id(server_proc_id);
-		reg.mutable_proc()->set_name(name);
-		MsgCommonReply reply_body;
-
-		if (!server.Register(reg, reply_body, 100)) {
-			printf("server register failed\n");
-			return;
-		}
+		DemoNode server(name, shm);
 
 		auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
 			++server_msg_count;
@@ -245,6 +231,7 @@
 		for (auto &topic : topics) {
 			rpc.add_topics(topic);
 		}
+		MsgCommonReply reply_body;
 		if (!server.RegisterRPC(rpc, reply_body, 100)) {
 			printf("server register topic failed\n");
 			return;
@@ -262,7 +249,7 @@
 		clients.Launch(Client, t, 1000 * 1);
 	}
 	clients.WaitAll();
-	printf("clients done, server replyed: %d\n", server_msg_count.load());
+	printf("clients done, server replyed: %ld\n", server_msg_count.load());
 	run = false;
 	servers.WaitAll();
 }
diff --git a/utest/util.h b/utest/util.h
index ca58cd7..28b636e 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -20,9 +20,7 @@
 #define UTIL_W8A0OA5U
 
 #include "bh_util.h"
-#include "msg.h"
-#include "shm.h"
-#include "shm_queue.h"
+#include "topic_node.h"
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/test/unit_test.hpp>
@@ -107,4 +105,23 @@
 	~ShmRemover() { SharedMemory::Remove(name_); }
 };
 
+class DemoNode : public TopicNode
+{
+	std::string id_;
+
+public:
+	DemoNode(const std::string &id, SharedMemory &shm) :
+	    TopicNode(shm), id_(id) { Init(); }
+	void Init()
+	{
+		MsgRegister reg;
+		reg.mutable_proc()->set_proc_id(id_);
+		MsgCommonReply reply_body;
+
+		if (!Register(reg, reply_body, 1000)) {
+			printf("node %s register failed\n", id_.c_str());
+		}
+	}
+};
+
 #endif // end of include guard: UTIL_W8A0OA5U

--
Gitblit v1.8.0