From 11f6c600e55ca5677f93624efe44d2605cdd908d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 20:18:38 +0800
Subject: [PATCH] reserve #,@ prefix for internal proc id and topic.
---
utest/api_test.cpp | 19 ++++++++-
box/center_topic_node.cpp | 11 +++--
box/center.cpp | 6 +-
src/topic_node.h | 7 ++-
src/topic_node.cpp | 21 ++++++++--
5 files changed, 47 insertions(+), 17 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 53c1f42..c3a03e3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -106,7 +106,7 @@
default: return false;
}
};
- BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
+ BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
auto OnBusIdle = [=](ShmSocket &socket) {};
auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +142,7 @@
}
};
- BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
+ BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
return true;
}
@@ -166,7 +166,7 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
auto nsec = NodeTimeoutSec();
- auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
AddCenter(center_ptr, shm);
for (auto &kv : Centers()) {
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 859aa8b..5c8df7a 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -29,7 +29,7 @@
namespace
{
-const std::string &kTopicQueryProc = "@center_query_procs";
+const std::string &kTopicQueryProc = "#center_query_procs";
std::string ToJson(const MsgQueryProcReply &qpr)
{
@@ -77,15 +77,18 @@
MsgCommonReply reply;
ProcInfo info;
- info.set_proc_id("#center.node");
+ info.set_proc_id("@center.node");
info.set_name("center node");
- if (!pnode_->UniRegister(true, info, reply, timeout)) {
+ Json jinfo;
+ jinfo.put("description", "some center services. Other nodes may use topics to use them.");
+ info.set_public_info(jinfo.dump());
+ if (!pnode_->DoRegister(true, info, reply, timeout)) {
throw std::runtime_error("center node register failed.");
}
MsgTopicList topics;
topics.add_topic_list(kTopicQueryProc);
- if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
+ if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
throw std::runtime_error("center node register topics failed.");
}
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 124d329..3c38121 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -29,6 +29,11 @@
namespace
{
+bool ValidUserSymbol(const std::string &s)
+{
+ return !s.empty() && s[0] != '#' && s[0] != '@';
+}
+
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
auto route = head.add_route();
@@ -143,10 +148,9 @@
for (auto &p : sockets_) { p->Stop(); }
}
-bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
- auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; };
- if (!internal && !ValidUserProcId(proc.proc_id())) {
+ if (!internal && !ValidUserSymbol(proc.proc_id())) {
SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
return false;
}
@@ -315,8 +319,17 @@
reply.ParseBody(reply_body));
}
-bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!internal) {
+ for (auto &&topic : topics.topic_list()) {
+ if (!ValidUserSymbol(topic)) {
+ SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
+ return false;
+ }
+ }
+ }
+
if (!IsOnline()) {
SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
diff --git a/src/topic_node.h b/src/topic_node.h
index c421048..3d6767b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -43,8 +43,8 @@
~TopicNode();
// topic node
- bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return UniRegister(false, proc, reply_body, timeout_ms); }
- bool UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
+ bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); }
+ bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Heartbeat(const int timeout_ms);
@@ -56,7 +56,8 @@
typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
- bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
+ bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); }
+ bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 7081435..3d842bf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -176,8 +176,21 @@
int reply_len = 0;
bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
DEFER1(BHFree(reply, reply_len));
- // printf("register topic : %s\n", r ? "ok" : "failed");
- // Sleep(1s);
+ }
+ { // Server Register Topics
+ MsgTopicList topics;
+ topics.add_topic_list("@should_fail");
+ std::string s = topics.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ DEFER1(BHFree(reply, reply_len));
+ if (!r) {
+ int ec = 0;
+ std::string msg;
+ GetApiError(ec, msg);
+ printf("register rpc failed, %d, %s\n", ec, msg.c_str());
+ }
}
auto PrintProcs = [](MsgQueryProcReply const &result) {
printf("query proc result: %d\n", result.proc_list().size());
@@ -214,7 +227,7 @@
{
// query procs with normal topic request
MsgRequestTopic req;
- req.set_topic("@center_query_procs");
+ req.set_topic("#center_query_procs");
// req.set_data("{\"proc_id\":\"#center.node\"}");
std::string s(req.SerializeAsString());
// Sleep(10ms, false);
--
Gitblit v1.8.0