reserve #,@ prefix for internal proc id and topic.
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr, shm); |
| | | |
| | | for (auto &kv : Centers()) { |
| | |
| | | |
| | | namespace |
| | | { |
| | | const std::string &kTopicQueryProc = "@center_query_procs"; |
| | | const std::string &kTopicQueryProc = "#center_query_procs"; |
| | | |
| | | std::string ToJson(const MsgQueryProcReply &qpr) |
| | | { |
| | |
| | | MsgCommonReply reply; |
| | | |
| | | ProcInfo info; |
| | | info.set_proc_id("#center.node"); |
| | | info.set_proc_id("@center.node"); |
| | | info.set_name("center node"); |
| | | if (!pnode_->UniRegister(true, info, reply, timeout)) { |
| | | Json jinfo; |
| | | jinfo.put("description", "some center services. Other nodes may use topics to use them."); |
| | | info.set_public_info(jinfo.dump()); |
| | | if (!pnode_->DoRegister(true, info, reply, timeout)) { |
| | | throw std::runtime_error("center node register failed."); |
| | | } |
| | | |
| | | MsgTopicList topics; |
| | | topics.add_topic_list(kTopicQueryProc); |
| | | if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) { |
| | | if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) { |
| | | throw std::runtime_error("center node register topics failed."); |
| | | } |
| | | |
| | |
| | | |
| | | namespace |
| | | { |
| | | bool ValidUserSymbol(const std::string &s) |
| | | { |
| | | return !s.empty() && s[0] != '#' && s[0] != '@'; |
| | | } |
| | | |
| | | inline void AddRoute(BHMsgHead &head, const ShmSocket &sock) |
| | | { |
| | | auto route = head.add_route(); |
| | |
| | | for (auto &p : sockets_) { p->Stop(); } |
| | | } |
| | | |
| | | bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; }; |
| | | if (!internal && !ValidUserProcId(proc.proc_id())) { |
| | | if (!internal && !ValidUserSymbol(proc.proc_id())) { |
| | | SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'"); |
| | | return false; |
| | | } |
| | |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!internal) { |
| | | for (auto &&topic : topics.topic_list()) { |
| | | if (!ValidUserSymbol(topic)) { |
| | | SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'"); |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | |
| | | ~TopicNode(); |
| | | |
| | | // topic node |
| | | bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return UniRegister(false, proc, reply_body, timeout_ms); } |
| | | bool UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); } |
| | | bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(const int timeout_ms); |
| | |
| | | typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB; |
| | | bool ServerStart(ServerSyncCB const &cb, const int nworker = 2); |
| | | bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2); |
| | | bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); |
| | | bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); } |
| | | bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); |
| | | bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); |
| | | |
| | |
| | | int reply_len = 0; |
| | | bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | DEFER1(BHFree(reply, reply_len)); |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | | { // Server Register Topics |
| | | MsgTopicList topics; |
| | | topics.add_topic_list("@should_fail"); |
| | | std::string s = topics.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | DEFER1(BHFree(reply, reply_len)); |
| | | if (!r) { |
| | | int ec = 0; |
| | | std::string msg; |
| | | GetApiError(ec, msg); |
| | | printf("register rpc failed, %d, %s\n", ec, msg.c_str()); |
| | | } |
| | | } |
| | | auto PrintProcs = [](MsgQueryProcReply const &result) { |
| | | printf("query proc result: %d\n", result.proc_list().size()); |
| | |
| | | { |
| | | // query procs with normal topic request |
| | | MsgRequestTopic req; |
| | | req.set_topic("@center_query_procs"); |
| | | req.set_topic("#center_query_procs"); |
| | | // req.set_data("{\"proc_id\":\"#center.node\"}"); |
| | | std::string s(req.SerializeAsString()); |
| | | // Sleep(10ms, false); |