From 11f6c600e55ca5677f93624efe44d2605cdd908d Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 20:18:38 +0800 Subject: [PATCH] reserve #,@ prefix for internal proc id and topic. --- utest/api_test.cpp | 19 ++++++++- box/center_topic_node.cpp | 11 +++-- box/center.cpp | 6 +- src/topic_node.h | 7 ++- src/topic_node.cpp | 21 ++++++++-- 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 53c1f42..c3a03e3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -106,7 +106,7 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; @@ -142,7 +142,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -166,7 +166,7 @@ BHCenter::BHCenter(Socket::Shm &shm) { auto nsec = NodeTimeoutSec(); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. AddCenter(center_ptr, shm); for (auto &kv : Centers()) { diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp index 859aa8b..5c8df7a 100644 --- a/box/center_topic_node.cpp +++ b/box/center_topic_node.cpp @@ -29,7 +29,7 @@ namespace { -const std::string &kTopicQueryProc = "@center_query_procs"; +const std::string &kTopicQueryProc = "#center_query_procs"; std::string ToJson(const MsgQueryProcReply &qpr) { @@ -77,15 +77,18 @@ MsgCommonReply reply; ProcInfo info; - info.set_proc_id("#center.node"); + info.set_proc_id("@center.node"); info.set_name("center node"); - if (!pnode_->UniRegister(true, info, reply, timeout)) { + Json jinfo; + jinfo.put("description", "some center services. Other nodes may use topics to use them."); + info.set_public_info(jinfo.dump()); + if (!pnode_->DoRegister(true, info, reply, timeout)) { throw std::runtime_error("center node register failed."); } MsgTopicList topics; topics.add_topic_list(kTopicQueryProc); - if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) { + if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) { throw std::runtime_error("center node register topics failed."); } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 124d329..3c38121 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -29,6 +29,11 @@ namespace { +bool ValidUserSymbol(const std::string &s) +{ + return !s.empty() && s[0] != '#' && s[0] != '@'; +} + inline void AddRoute(BHMsgHead &head, const ShmSocket &sock) { auto route = head.add_route(); @@ -143,10 +148,9 @@ for (auto &p : sockets_) { p->Stop(); } } -bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) +bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { - auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; }; - if (!internal && !ValidUserProcId(proc.proc_id())) { + if (!internal && !ValidUserSymbol(proc.proc_id())) { SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'"); return false; } @@ -315,8 +319,17 @@ reply.ParseBody(reply_body)); } -bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) +bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { + if (!internal) { + for (auto &&topic : topics.topic_list()) { + if (!ValidUserSymbol(topic)) { + SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'"); + return false; + } + } + } + if (!IsOnline()) { SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; diff --git a/src/topic_node.h b/src/topic_node.h index c421048..3d6767b 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -43,8 +43,8 @@ ~TopicNode(); // topic node - bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return UniRegister(false, proc, reply_body, timeout_ms); } - bool UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); + bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); } + bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); @@ -56,7 +56,8 @@ typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB; bool ServerStart(ServerSyncCB const &cb, const int nworker = 2); bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2); - bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); + bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); } + bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 7081435..3d842bf 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -176,8 +176,21 @@ int reply_len = 0; bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); DEFER1(BHFree(reply, reply_len)); - // printf("register topic : %s\n", r ? "ok" : "failed"); - // Sleep(1s); + } + { // Server Register Topics + MsgTopicList topics; + topics.add_topic_list("@should_fail"); + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); + DEFER1(BHFree(reply, reply_len)); + if (!r) { + int ec = 0; + std::string msg; + GetApiError(ec, msg); + printf("register rpc failed, %d, %s\n", ec, msg.c_str()); + } } auto PrintProcs = [](MsgQueryProcReply const &result) { printf("query proc result: %d\n", result.proc_list().size()); @@ -214,7 +227,7 @@ { // query procs with normal topic request MsgRequestTopic req; - req.set_topic("@center_query_procs"); + req.set_topic("#center_query_procs"); // req.set_data("{\"proc_id\":\"#center.node\"}"); std::string s(req.SerializeAsString()); // Sleep(10ms, false); -- Gitblit v1.8.0