From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 19 五月 2021 19:14:13 +0800
Subject: [PATCH] add api BHQueryProcs.

---
 utest/api_test.cpp           |   39 ++++++++++--
 api/bhsgo/bhome_node_test.go |   11 +++
 src/bh_api.cc                |   13 ++++
 box/center.cpp               |   43 ++++++++++++++
 src/proto.h                  |    2 
 src/msg.h                    |   16 ++++-
 api/bhsgo/bhome_node.go      |   14 ++++
 proto/source/bhome_msg.proto |    2 
 src/topic_node.h             |    1 
 src/bh_api.h                 |    8 ++
 src/exported_symbols         |    1 
 src/topic_node.cpp           |   19 ++++++
 12 files changed, 156 insertions(+), 13 deletions(-)

diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index 2cf3a81..9d66814 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -75,6 +75,20 @@
 
 }
 
+func QueryProcs(dest_addr *bh.BHAddress, topic *bh.MsgQueryProc, reply *bh.MsgQueryProcReply, timeout_ms int) bool {
+	dest, _ := dest_addr.Marshal()
+	data, _ := topic.Marshal()
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHQueryProcs(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+	if r {
+		reply.Unmarshal(C.GoBytes(creply, creply_len))
+	}
+	return r
+
+}
+
 func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
 	data, _ := pub.Marshal()
 	return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go
index 756b752..cd4bfb9 100644
--- a/api/bhsgo/bhome_node_test.go
+++ b/api/bhsgo/bhome_node_test.go
@@ -26,6 +26,7 @@
 	proc_id := "test_proc"
 	proc := bh.ProcInfo{}
 	proc.ProcId = []byte(proc_id)
+	fmt.Println("proc id: ", proc.ProcId)
 	reply := bh.MsgCommonReply{}
 	defer Cleanup()
 
@@ -69,6 +70,7 @@
 	} else {
 		fmt.Println("reg topics failed")
 	}
+
 	req := bh.MsgRequestTopic{}
 	time.Sleep(time.Second * 1)
 	req.Topic = []byte("topic0")
@@ -81,7 +83,14 @@
 	pid := ""
 	rr := bh.MsgRequestTopicReply{}
 	dest := bh.BHAddress{}
-	for i := 0; i < 100; i++ {
+
+	queryProc := bh.MsgQueryProc{}
+	queryReply := bh.MsgQueryProcReply{}
+
+	QueryProcs(&dest, &queryProc, &queryReply, 3000)
+	fmt.Println("query result:", string(queryReply.ProcList[0].Proc.ProcId))
+
+	for i := 0; i < 10; i++ {
 		if Request(&dest, &req, &pid, &rr, 3000) {
 			fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
 		} else {
diff --git a/box/center.cpp b/box/center.cpp
index 9ecd04b..f140289 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -479,12 +479,52 @@
 			return MakeReply(eSuccess);
 		});
 	}
+	MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+	{
+		typedef MsgQueryProcReply Reply;
+		auto query = [&](Node self) -> Reply {
+			auto Add1 = [](Reply &reply, Node node) {
+				auto info = reply.add_proc_list();
+				*info->mutable_proc() = node->proc_;
+				info->set_online(node->state_.flag_ == kStateNormal);
+				for (auto &addr_topics : node->services_) {
+					for (auto &topic : addr_topics.second) {
+						info->mutable_topics()->add_topic_list(topic);
+					}
+				}
+			};
+
+			if (!req.proc_id().empty()) {
+				auto pos = online_node_addr_map_.find(req.proc_id());
+				if (pos == online_node_addr_map_.end()) {
+					return MakeReply<Reply>(eNotFound, "proc not found.");
+				} else {
+					auto node_pos = nodes_.find(pos->second);
+					if (node_pos == nodes_.end()) {
+						return MakeReply<Reply>(eNotFound, "proc node not found.");
+					} else {
+						auto reply = MakeReply<Reply>(eSuccess);
+						Add1(reply, node_pos->second);
+						return reply;
+					}
+				}
+			} else {
+				Reply reply(MakeReply<Reply>(eSuccess));
+				for (auto &kv : nodes_) {
+					Add1(reply, kv.second);
+				}
+				return reply;
+			}
+		};
+
+		return HandleMsg<Reply>(head, query);
+	}
 
 	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
 	{
 		typedef MsgQueryTopicReply Reply;
 
-		auto query = [&](Node self) -> MsgQueryTopicReply {
+		auto query = [&](Node self) -> Reply {
 			auto pos = service_map_.find(req.topic());
 			if (pos != service_map_.end() && !pos->second.empty()) {
 				auto &clients = pos->second;
@@ -747,6 +787,7 @@
 
 			CASE_ON_MSG_TYPE(RegisterRPC);
 			CASE_ON_MSG_TYPE(QueryTopic);
+			CASE_ON_MSG_TYPE(QueryProc);
 		default: return false;
 		}
 	};
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index d06a7c2..42cf375 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -49,6 +49,8 @@
 	// kMsgTypeUnsubscribeReply = 25;
 	kMsgTypeUnregister = 26;
 	// kMsgTypeUnregisterReply = 27;
+	kMsgTypeQueryProc = 28;
+	kMsgTypeQueryProcReply = 29;
 
 }
 
diff --git a/src/bh_api.cc b/src/bh_api.cc
index b37eaae..ca69b6e 100644
--- a/src/bh_api.cc
+++ b/src/bh_api.cc
@@ -194,6 +194,19 @@
 	    remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
 }
 
+int BHQueryProcs(const void *remote,
+                 const int remote_len,
+                 const void *query,
+                 const int query_len,
+                 void **reply,
+                 int *reply_len,
+                 const int timeout_ms)
+{
+	return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>(
+	    &TopicNode::QueryProcs,
+	    remote, remote_len, query, query_len, reply, reply_len, timeout_ms);
+}
+
 int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
 	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
diff --git a/src/bh_api.h b/src/bh_api.h
index e196aa6..3b77da5 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -44,6 +44,14 @@
                         int *reply_len,
                         const int timeout_ms);
 
+int BHQueryProcs(const void *remote,
+                 const int remote_len,
+                 const void *query,
+                 const int query_len,
+                 void **reply,
+                 int *reply_len,
+                 const int timeout_ms);
+
 int BHSubscribeTopics(const void *topics,
                       const int topics_len,
                       void **reply,
diff --git a/src/exported_symbols b/src/exported_symbols
index 3351da9..addfadc 100644
--- a/src/exported_symbols
+++ b/src/exported_symbols
@@ -5,6 +5,7 @@
 	BHUnregister;
 	BHRegisterTopics;
 	BHQueryTopicAddress;
+	BHQueryProcs;
 	BHSubscribeTopics;
 	BHStartWorker;
 	BHHeartbeatEasy;
diff --git a/src/msg.h b/src/msg.h
index 42a753e..1ac153a 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -74,12 +74,13 @@
 
 		RefCount count_;
 		const uint32_t tag_ = kMsgTag;
-		const uint32_t size_ = 0;
+		const uint32_t capacity_ = 0;
 		const int64_t id_ = 0;
 		std::atomic<int64_t> timestamp_;
 		bool managed_ = false;
+		uint32_t size_ = 0;
 		Meta(uint32_t size) :
-		    size_(size), id_(NewId()), timestamp_(NowSec()) {}
+		    capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
 	};
 	OffsetType offset_;
 	static void *Alloc(const size_t size)
@@ -111,6 +112,7 @@
 			};
 			Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
 			Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
+			meta()->size_ = 4 + head_len + 4 + body_len;
 		}
 		return addr;
 	}
@@ -120,6 +122,7 @@
 		void *addr = get();
 		if (addr) {
 			memcpy(addr, content.data(), content.size());
+			meta()->size_ = content.size();
 		}
 		return addr;
 	}
@@ -174,11 +177,11 @@
 		uint32_t head_len = head.ByteSizeLong();
 		uint32_t body_len = body.ByteSizeLong();
 		uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
-		return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
+		return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
 	}
 
 	inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
-	inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
+	inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
 
 	inline bool Make(const size_t size) { return Make(Alloc(size)); }
 
@@ -209,6 +212,11 @@
 		p += 4;
 		return head.ParseFromArray(p, msg_size);
 	}
+	std::string content() const
+	{
+		auto p = get<char>();
+		return p ? std::string(p, meta()->size_) : std::string();
+	}
 	std::string body() const
 	{
 		auto p = get<char>();
diff --git a/src/proto.h b/src/proto.h
index c05407b..29ff290 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -50,6 +50,8 @@
 BHOME_SIMPLE_MAP_MSG(Unsubscribe);
 BHOME_SIMPLE_MAP_MSG(ProcInit);
 BHOME_SIMPLE_MAP_MSG(ProcInitReply);
+BHOME_SIMPLE_MAP_MSG(QueryProc);
+BHOME_SIMPLE_MAP_MSG(QueryProcReply);
 
 #undef BHOME_SIMPLE_MAP_MSG
 #undef BHOME_MAP_MSG_AND_TYPE
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index a3f3428..8bbb929 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -292,6 +292,25 @@
 	        reply.ParseBody(reply_body));
 }
 
+bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms)
+{
+	if (!IsOnline()) {
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
+		return false;
+	}
+	auto &sock = SockNode();
+
+	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
+	AddRoute(head, sock);
+
+	MsgI reply;
+	DEFER1(reply.Release());
+	BHMsgHead reply_head;
+	return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+	        reply_head.type() == kMsgTypeQueryProcReply &&
+	        reply.ParseBody(reply_body));
+}
+
 bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
diff --git a/src/topic_node.h b/src/topic_node.h
index 81bf718..1bb3611 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -46,6 +46,7 @@
 	bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(const int timeout_ms);
 	bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
+	bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms);
 
 	// topic rpc server
 	typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 533c399..33adf91 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -165,7 +165,34 @@
 		void *reply = 0;
 		int reply_len = 0;
 		bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
-		BHFree(reply, reply_len);
+		DEFER1(BHFree(reply, reply_len));
+		// printf("register topic : %s\n", r ? "ok" : "failed");
+		// Sleep(1s);
+	}
+	{
+		// query procs
+		std::string dest(BHAddress().SerializeAsString());
+		MsgQueryProc query;
+		std::string s = query.SerializeAsString();
+		void *reply = 0;
+		int reply_len = 0;
+		bool r = BHQueryProcs(dest.data(), dest.size(), s.data(), s.size(), &reply, &reply_len, 1000);
+		DEFER1(BHFree(reply, reply_len));
+		MsgQueryProcReply result;
+		if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
+			printf("query proc result: %d\n", result.proc_list().size());
+			for (int i = 0; i < result.proc_list().size(); ++i) {
+				auto &info = result.proc_list(i);
+				printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
+				       (info.online() ? "online" : "offline"),
+				       info.proc().proc_id().c_str(), info.proc().name().c_str());
+				for (auto &t : info.topics().topic_list()) {
+					printf("\t\t %s", t.c_str());
+				}
+			}
+		} else {
+			printf("query proc error\n");
+		}
 		// printf("register topic : %s\n", r ? "ok" : "failed");
 		// Sleep(1s);
 	}
@@ -310,15 +337,13 @@
 	int ncli = 10;
 	const int64_t nreq = 1000 * 100;
 
-#if 1
-	for (int i = 0; i < ncli; ++i) {
-		threads.Launch(asyncRequest, nreq);
-	}
-#else
 	for (int i = 0; i < 100; ++i) {
 		SyncRequest(i);
 	}
-#endif
+
+	for (int i = 0; i < ncli; ++i) {
+		threads.Launch(asyncRequest, nreq);
+	}
 
 	int same = 0;
 	uint64_t last = 0;

--
Gitblit v1.8.0