From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- utest/api_test.cpp | 39 ++++++++++-- api/bhsgo/bhome_node_test.go | 11 +++ src/bh_api.cc | 13 ++++ box/center.cpp | 43 ++++++++++++++ src/proto.h | 2 src/msg.h | 16 ++++- api/bhsgo/bhome_node.go | 14 ++++ proto/source/bhome_msg.proto | 2 src/topic_node.h | 1 src/bh_api.h | 8 ++ src/exported_symbols | 1 src/topic_node.cpp | 19 ++++++ 12 files changed, 156 insertions(+), 13 deletions(-) diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go index 2cf3a81..9d66814 100644 --- a/api/bhsgo/bhome_node.go +++ b/api/bhsgo/bhome_node.go @@ -75,6 +75,20 @@ } +func QueryProcs(dest_addr *bh.BHAddress, topic *bh.MsgQueryProc, reply *bh.MsgQueryProcReply, timeout_ms int) bool { + dest, _ := dest_addr.Marshal() + data, _ := topic.Marshal() + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHQueryProcs(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 + if r { + reply.Unmarshal(C.GoBytes(creply, creply_len)) + } + return r + +} + func Publish(pub *bh.MsgPublish, timeout_ms int) bool { data, _ := pub.Marshal() return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0 diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go index 756b752..cd4bfb9 100644 --- a/api/bhsgo/bhome_node_test.go +++ b/api/bhsgo/bhome_node_test.go @@ -26,6 +26,7 @@ proc_id := "test_proc" proc := bh.ProcInfo{} proc.ProcId = []byte(proc_id) + fmt.Println("proc id: ", proc.ProcId) reply := bh.MsgCommonReply{} defer Cleanup() @@ -69,6 +70,7 @@ } else { fmt.Println("reg topics failed") } + req := bh.MsgRequestTopic{} time.Sleep(time.Second * 1) req.Topic = []byte("topic0") @@ -81,7 +83,14 @@ pid := "" rr := bh.MsgRequestTopicReply{} dest := bh.BHAddress{} - for i := 0; i < 100; i++ { + + queryProc := bh.MsgQueryProc{} + queryReply := bh.MsgQueryProcReply{} + + QueryProcs(&dest, &queryProc, &queryReply, 3000) + fmt.Println("query result:", string(queryReply.ProcList[0].Proc.ProcId)) + + for i := 0; i < 10; i++ { if Request(&dest, &req, &pid, &rr, 3000) { fmt.Println("server:" + pid + ", reply:" + string(rr.Data)) } else { diff --git a/box/center.cpp b/box/center.cpp index 9ecd04b..f140289 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -479,12 +479,52 @@ return MakeReply(eSuccess); }); } + MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req) + { + typedef MsgQueryProcReply Reply; + auto query = [&](Node self) -> Reply { + auto Add1 = [](Reply &reply, Node node) { + auto info = reply.add_proc_list(); + *info->mutable_proc() = node->proc_; + info->set_online(node->state_.flag_ == kStateNormal); + for (auto &addr_topics : node->services_) { + for (auto &topic : addr_topics.second) { + info->mutable_topics()->add_topic_list(topic); + } + } + }; + + if (!req.proc_id().empty()) { + auto pos = online_node_addr_map_.find(req.proc_id()); + if (pos == online_node_addr_map_.end()) { + return MakeReply<Reply>(eNotFound, "proc not found."); + } else { + auto node_pos = nodes_.find(pos->second); + if (node_pos == nodes_.end()) { + return MakeReply<Reply>(eNotFound, "proc node not found."); + } else { + auto reply = MakeReply<Reply>(eSuccess); + Add1(reply, node_pos->second); + return reply; + } + } + } else { + Reply reply(MakeReply<Reply>(eSuccess)); + for (auto &kv : nodes_) { + Add1(reply, kv.second); + } + return reply; + } + }; + + return HandleMsg<Reply>(head, query); + } MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) { typedef MsgQueryTopicReply Reply; - auto query = [&](Node self) -> MsgQueryTopicReply { + auto query = [&](Node self) -> Reply { auto pos = service_map_.find(req.topic()); if (pos != service_map_.end() && !pos->second.empty()) { auto &clients = pos->second; @@ -747,6 +787,7 @@ CASE_ON_MSG_TYPE(RegisterRPC); CASE_ON_MSG_TYPE(QueryTopic); + CASE_ON_MSG_TYPE(QueryProc); default: return false; } }; diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index d06a7c2..42cf375 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -49,6 +49,8 @@ // kMsgTypeUnsubscribeReply = 25; kMsgTypeUnregister = 26; // kMsgTypeUnregisterReply = 27; + kMsgTypeQueryProc = 28; + kMsgTypeQueryProcReply = 29; } diff --git a/src/bh_api.cc b/src/bh_api.cc index b37eaae..ca69b6e 100644 --- a/src/bh_api.cc +++ b/src/bh_api.cc @@ -194,6 +194,19 @@ remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); } +int BHQueryProcs(const void *remote, + const int remote_len, + const void *query, + const int query_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>( + &TopicNode::QueryProcs, + remote, remote_len, query, query_len, reply, reply_len, timeout_ms); +} + int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); diff --git a/src/bh_api.h b/src/bh_api.h index e196aa6..3b77da5 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -44,6 +44,14 @@ int *reply_len, const int timeout_ms); +int BHQueryProcs(const void *remote, + const int remote_len, + const void *query, + const int query_len, + void **reply, + int *reply_len, + const int timeout_ms); + int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, diff --git a/src/exported_symbols b/src/exported_symbols index 3351da9..addfadc 100644 --- a/src/exported_symbols +++ b/src/exported_symbols @@ -5,6 +5,7 @@ BHUnregister; BHRegisterTopics; BHQueryTopicAddress; + BHQueryProcs; BHSubscribeTopics; BHStartWorker; BHHeartbeatEasy; diff --git a/src/msg.h b/src/msg.h index 42a753e..1ac153a 100644 --- a/src/msg.h +++ b/src/msg.h @@ -74,12 +74,13 @@ RefCount count_; const uint32_t tag_ = kMsgTag; - const uint32_t size_ = 0; + const uint32_t capacity_ = 0; const int64_t id_ = 0; std::atomic<int64_t> timestamp_; bool managed_ = false; + uint32_t size_ = 0; Meta(uint32_t size) : - size_(size), id_(NewId()), timestamp_(NowSec()) {} + capacity_(size), id_(NewId()), timestamp_(NowSec()) {} }; OffsetType offset_; static void *Alloc(const size_t size) @@ -111,6 +112,7 @@ }; Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); }); + meta()->size_ = 4 + head_len + 4 + body_len; } return addr; } @@ -120,6 +122,7 @@ void *addr = get(); if (addr) { memcpy(addr, content.data(), content.size()); + meta()->size_ = content.size(); } return addr; } @@ -174,11 +177,11 @@ uint32_t head_len = head.ByteSizeLong(); uint32_t body_len = body.ByteSizeLong(); uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; - return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len); + return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len); } inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } - inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); } + inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); } inline bool Make(const size_t size) { return Make(Alloc(size)); } @@ -209,6 +212,11 @@ p += 4; return head.ParseFromArray(p, msg_size); } + std::string content() const + { + auto p = get<char>(); + return p ? std::string(p, meta()->size_) : std::string(); + } std::string body() const { auto p = get<char>(); diff --git a/src/proto.h b/src/proto.h index c05407b..29ff290 100644 --- a/src/proto.h +++ b/src/proto.h @@ -50,6 +50,8 @@ BHOME_SIMPLE_MAP_MSG(Unsubscribe); BHOME_SIMPLE_MAP_MSG(ProcInit); BHOME_SIMPLE_MAP_MSG(ProcInitReply); +BHOME_SIMPLE_MAP_MSG(QueryProc); +BHOME_SIMPLE_MAP_MSG(QueryProcReply); #undef BHOME_SIMPLE_MAP_MSG #undef BHOME_MAP_MSG_AND_TYPE diff --git a/src/topic_node.cpp b/src/topic_node.cpp index a3f3428..8bbb929 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -292,6 +292,25 @@ reply.ParseBody(reply_body)); } +bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) +{ + if (!IsOnline()) { + SetLastError(eNotRegistered, kErrMsgNotRegistered); + return false; + } + auto &sock = SockNode(); + + BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); + AddRoute(head, sock); + + MsgI reply; + DEFER1(reply.Release()); + BHMsgHead reply_head; + return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeQueryProcReply && + reply.ParseBody(reply_body)); +} + bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { diff --git a/src/topic_node.h b/src/topic_node.h index 81bf718..1bb3611 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -46,6 +46,7 @@ bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); + bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 533c399..33adf91 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -165,7 +165,34 @@ void *reply = 0; int reply_len = 0; bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); - BHFree(reply, reply_len); + DEFER1(BHFree(reply, reply_len)); + // printf("register topic : %s\n", r ? "ok" : "failed"); + // Sleep(1s); + } + { + // query procs + std::string dest(BHAddress().SerializeAsString()); + MsgQueryProc query; + std::string s = query.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = BHQueryProcs(dest.data(), dest.size(), s.data(), s.size(), &reply, &reply_len, 1000); + DEFER1(BHFree(reply, reply_len)); + MsgQueryProcReply result; + if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) { + printf("query proc result: %d\n", result.proc_list().size()); + for (int i = 0; i < result.proc_list().size(); ++i) { + auto &info = result.proc_list(i); + printf("proc [%d] %s, %s, %s\n\ttopics\n", i, + (info.online() ? "online" : "offline"), + info.proc().proc_id().c_str(), info.proc().name().c_str()); + for (auto &t : info.topics().topic_list()) { + printf("\t\t %s", t.c_str()); + } + } + } else { + printf("query proc error\n"); + } // printf("register topic : %s\n", r ? "ok" : "failed"); // Sleep(1s); } @@ -310,15 +337,13 @@ int ncli = 10; const int64_t nreq = 1000 * 100; -#if 1 - for (int i = 0; i < ncli; ++i) { - threads.Launch(asyncRequest, nreq); - } -#else for (int i = 0; i < 100; ++i) { SyncRequest(i); } -#endif + + for (int i = 0; i < ncli; ++i) { + threads.Launch(asyncRequest, nreq); + } int same = 0; uint64_t last = 0; -- Gitblit v1.8.0