From 11f6c600e55ca5677f93624efe44d2605cdd908d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 20:18:38 +0800
Subject: [PATCH] reserve #,@ prefix for internal proc id and topic.

---
 utest/api_test.cpp        |   19 ++++++++-
 box/center_topic_node.cpp |   11 +++--
 box/center.cpp            |    6 +-
 src/topic_node.h          |    7 ++-
 src/topic_node.cpp        |   21 ++++++++--
 5 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 53c1f42..c3a03e3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -106,7 +106,7 @@
 		default: return false;
 		}
 	};
-	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
+	BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +142,7 @@
 		}
 	};
 
-	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
+	BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
 
 	return true;
 }
@@ -166,7 +166,7 @@
 BHCenter::BHCenter(Socket::Shm &shm)
 {
 	auto nsec = NodeTimeoutSec();
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
 	AddCenter(center_ptr, shm);
 
 	for (auto &kv : Centers()) {
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 859aa8b..5c8df7a 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -29,7 +29,7 @@
 
 namespace
 {
-const std::string &kTopicQueryProc = "@center_query_procs";
+const std::string &kTopicQueryProc = "#center_query_procs";
 
 std::string ToJson(const MsgQueryProcReply &qpr)
 {
@@ -77,15 +77,18 @@
 	MsgCommonReply reply;
 
 	ProcInfo info;
-	info.set_proc_id("#center.node");
+	info.set_proc_id("@center.node");
 	info.set_name("center node");
-	if (!pnode_->UniRegister(true, info, reply, timeout)) {
+	Json jinfo;
+	jinfo.put("description", "some center services. Other nodes may use topics to use them.");
+	info.set_public_info(jinfo.dump());
+	if (!pnode_->DoRegister(true, info, reply, timeout)) {
 		throw std::runtime_error("center node register failed.");
 	}
 
 	MsgTopicList topics;
 	topics.add_topic_list(kTopicQueryProc);
-	if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
+	if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
 		throw std::runtime_error("center node register topics failed.");
 	}
 
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 124d329..3c38121 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -29,6 +29,11 @@
 
 namespace
 {
+bool ValidUserSymbol(const std::string &s)
+{
+	return !s.empty() && s[0] != '#' && s[0] != '@';
+}
+
 inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
 {
 	auto route = head.add_route();
@@ -143,10 +148,9 @@
 	for (auto &p : sockets_) { p->Stop(); }
 }
 
-bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
-	auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; };
-	if (!internal && !ValidUserProcId(proc.proc_id())) {
+	if (!internal && !ValidUserSymbol(proc.proc_id())) {
 		SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
 		return false;
 	}
@@ -315,8 +319,17 @@
 	        reply.ParseBody(reply_body));
 }
 
-bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	if (!internal) {
+		for (auto &&topic : topics.topic_list()) {
+			if (!ValidUserSymbol(topic)) {
+				SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
+				return false;
+			}
+		}
+	}
+
 	if (!IsOnline()) {
 		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
diff --git a/src/topic_node.h b/src/topic_node.h
index c421048..3d6767b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -43,8 +43,8 @@
 	~TopicNode();
 
 	// topic node
-	bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return UniRegister(false, proc, reply_body, timeout_ms); }
-	bool UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
+	bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); }
+	bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(const int timeout_ms);
@@ -56,7 +56,8 @@
 	typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
 	bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
 	bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
-	bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
+	bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); }
+	bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
 	bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
 	bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
 
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 7081435..3d842bf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -176,8 +176,21 @@
 		int reply_len = 0;
 		bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
 		DEFER1(BHFree(reply, reply_len));
-		// printf("register topic : %s\n", r ? "ok" : "failed");
-		// Sleep(1s);
+	}
+	{ // Server Register Topics
+		MsgTopicList topics;
+		topics.add_topic_list("@should_fail");
+		std::string s = topics.SerializeAsString();
+		void *reply = 0;
+		int reply_len = 0;
+		bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+		DEFER1(BHFree(reply, reply_len));
+		if (!r) {
+			int ec = 0;
+			std::string msg;
+			GetApiError(ec, msg);
+			printf("register rpc failed, %d, %s\n", ec, msg.c_str());
+		}
 	}
 	auto PrintProcs = [](MsgQueryProcReply const &result) {
 		printf("query proc result: %d\n", result.proc_list().size());
@@ -214,7 +227,7 @@
 	{
 		// query procs with normal topic request
 		MsgRequestTopic req;
-		req.set_topic("@center_query_procs");
+		req.set_topic("#center_query_procs");
 		// req.set_data("{\"proc_id\":\"#center.node\"}");
 		std::string s(req.SerializeAsString());
 		// Sleep(10ms, false);

--
Gitblit v1.8.0