From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 19 五月 2021 19:14:13 +0800
Subject: [PATCH] add api BHQueryProcs.
---
utest/api_test.cpp | 39 ++++++++++--
api/bhsgo/bhome_node_test.go | 11 +++
src/bh_api.cc | 13 ++++
box/center.cpp | 43 ++++++++++++++
src/proto.h | 2
src/msg.h | 16 ++++-
api/bhsgo/bhome_node.go | 14 ++++
proto/source/bhome_msg.proto | 2
src/topic_node.h | 1
src/bh_api.h | 8 ++
src/exported_symbols | 1
src/topic_node.cpp | 19 ++++++
12 files changed, 156 insertions(+), 13 deletions(-)
diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index 2cf3a81..9d66814 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -75,6 +75,20 @@
}
+func QueryProcs(dest_addr *bh.BHAddress, topic *bh.MsgQueryProc, reply *bh.MsgQueryProcReply, timeout_ms int) bool {
+ dest, _ := dest_addr.Marshal()
+ data, _ := topic.Marshal()
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHQueryProcs(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+ if r {
+ reply.Unmarshal(C.GoBytes(creply, creply_len))
+ }
+ return r
+
+}
+
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
data, _ := pub.Marshal()
return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go
index 756b752..cd4bfb9 100644
--- a/api/bhsgo/bhome_node_test.go
+++ b/api/bhsgo/bhome_node_test.go
@@ -26,6 +26,7 @@
proc_id := "test_proc"
proc := bh.ProcInfo{}
proc.ProcId = []byte(proc_id)
+ fmt.Println("proc id: ", proc.ProcId)
reply := bh.MsgCommonReply{}
defer Cleanup()
@@ -69,6 +70,7 @@
} else {
fmt.Println("reg topics failed")
}
+
req := bh.MsgRequestTopic{}
time.Sleep(time.Second * 1)
req.Topic = []byte("topic0")
@@ -81,7 +83,14 @@
pid := ""
rr := bh.MsgRequestTopicReply{}
dest := bh.BHAddress{}
- for i := 0; i < 100; i++ {
+
+ queryProc := bh.MsgQueryProc{}
+ queryReply := bh.MsgQueryProcReply{}
+
+ QueryProcs(&dest, &queryProc, &queryReply, 3000)
+ fmt.Println("query result:", string(queryReply.ProcList[0].Proc.ProcId))
+
+ for i := 0; i < 10; i++ {
if Request(&dest, &req, &pid, &rr, 3000) {
fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
} else {
diff --git a/box/center.cpp b/box/center.cpp
index 9ecd04b..f140289 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -479,12 +479,52 @@
return MakeReply(eSuccess);
});
}
+ MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+ {
+ typedef MsgQueryProcReply Reply;
+ auto query = [&](Node self) -> Reply {
+ auto Add1 = [](Reply &reply, Node node) {
+ auto info = reply.add_proc_list();
+ *info->mutable_proc() = node->proc_;
+ info->set_online(node->state_.flag_ == kStateNormal);
+ for (auto &addr_topics : node->services_) {
+ for (auto &topic : addr_topics.second) {
+ info->mutable_topics()->add_topic_list(topic);
+ }
+ }
+ };
+
+ if (!req.proc_id().empty()) {
+ auto pos = online_node_addr_map_.find(req.proc_id());
+ if (pos == online_node_addr_map_.end()) {
+ return MakeReply<Reply>(eNotFound, "proc not found.");
+ } else {
+ auto node_pos = nodes_.find(pos->second);
+ if (node_pos == nodes_.end()) {
+ return MakeReply<Reply>(eNotFound, "proc node not found.");
+ } else {
+ auto reply = MakeReply<Reply>(eSuccess);
+ Add1(reply, node_pos->second);
+ return reply;
+ }
+ }
+ } else {
+ Reply reply(MakeReply<Reply>(eSuccess));
+ for (auto &kv : nodes_) {
+ Add1(reply, kv.second);
+ }
+ return reply;
+ }
+ };
+
+ return HandleMsg<Reply>(head, query);
+ }
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
{
typedef MsgQueryTopicReply Reply;
- auto query = [&](Node self) -> MsgQueryTopicReply {
+ auto query = [&](Node self) -> Reply {
auto pos = service_map_.find(req.topic());
if (pos != service_map_.end() && !pos->second.empty()) {
auto &clients = pos->second;
@@ -747,6 +787,7 @@
CASE_ON_MSG_TYPE(RegisterRPC);
CASE_ON_MSG_TYPE(QueryTopic);
+ CASE_ON_MSG_TYPE(QueryProc);
default: return false;
}
};
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index d06a7c2..42cf375 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -49,6 +49,8 @@
// kMsgTypeUnsubscribeReply = 25;
kMsgTypeUnregister = 26;
// kMsgTypeUnregisterReply = 27;
+ kMsgTypeQueryProc = 28;
+ kMsgTypeQueryProcReply = 29;
}
diff --git a/src/bh_api.cc b/src/bh_api.cc
index b37eaae..ca69b6e 100644
--- a/src/bh_api.cc
+++ b/src/bh_api.cc
@@ -194,6 +194,19 @@
remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
+int BHQueryProcs(const void *remote,
+ const int remote_len,
+ const void *query,
+ const int query_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms)
+{
+ return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>(
+ &TopicNode::QueryProcs,
+ remote, remote_len, query, query_len, reply, reply_len, timeout_ms);
+}
+
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
diff --git a/src/bh_api.h b/src/bh_api.h
index e196aa6..3b77da5 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -44,6 +44,14 @@
int *reply_len,
const int timeout_ms);
+int BHQueryProcs(const void *remote,
+ const int remote_len,
+ const void *query,
+ const int query_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
+
int BHSubscribeTopics(const void *topics,
const int topics_len,
void **reply,
diff --git a/src/exported_symbols b/src/exported_symbols
index 3351da9..addfadc 100644
--- a/src/exported_symbols
+++ b/src/exported_symbols
@@ -5,6 +5,7 @@
BHUnregister;
BHRegisterTopics;
BHQueryTopicAddress;
+ BHQueryProcs;
BHSubscribeTopics;
BHStartWorker;
BHHeartbeatEasy;
diff --git a/src/msg.h b/src/msg.h
index 42a753e..1ac153a 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -74,12 +74,13 @@
RefCount count_;
const uint32_t tag_ = kMsgTag;
- const uint32_t size_ = 0;
+ const uint32_t capacity_ = 0;
const int64_t id_ = 0;
std::atomic<int64_t> timestamp_;
bool managed_ = false;
+ uint32_t size_ = 0;
Meta(uint32_t size) :
- size_(size), id_(NewId()), timestamp_(NowSec()) {}
+ capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
};
OffsetType offset_;
static void *Alloc(const size_t size)
@@ -111,6 +112,7 @@
};
Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
+ meta()->size_ = 4 + head_len + 4 + body_len;
}
return addr;
}
@@ -120,6 +122,7 @@
void *addr = get();
if (addr) {
memcpy(addr, content.data(), content.size());
+ meta()->size_ = content.size();
}
return addr;
}
@@ -174,11 +177,11 @@
uint32_t head_len = head.ByteSizeLong();
uint32_t body_len = body.ByteSizeLong();
uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
- return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
+ return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
}
inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
- inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
+ inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
inline bool Make(const size_t size) { return Make(Alloc(size)); }
@@ -209,6 +212,11 @@
p += 4;
return head.ParseFromArray(p, msg_size);
}
+ std::string content() const
+ {
+ auto p = get<char>();
+ return p ? std::string(p, meta()->size_) : std::string();
+ }
std::string body() const
{
auto p = get<char>();
diff --git a/src/proto.h b/src/proto.h
index c05407b..29ff290 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -50,6 +50,8 @@
BHOME_SIMPLE_MAP_MSG(Unsubscribe);
BHOME_SIMPLE_MAP_MSG(ProcInit);
BHOME_SIMPLE_MAP_MSG(ProcInitReply);
+BHOME_SIMPLE_MAP_MSG(QueryProc);
+BHOME_SIMPLE_MAP_MSG(QueryProcReply);
#undef BHOME_SIMPLE_MAP_MSG
#undef BHOME_MAP_MSG_AND_TYPE
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index a3f3428..8bbb929 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -292,6 +292,25 @@
reply.ParseBody(reply_body));
}
+bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms)
+{
+ if (!IsOnline()) {
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
+ return false;
+ }
+ auto &sock = SockNode();
+
+ BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
+ AddRoute(head, sock);
+
+ MsgI reply;
+ DEFER1(reply.Release());
+ BHMsgHead reply_head;
+ return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+ reply_head.type() == kMsgTypeQueryProcReply &&
+ reply.ParseBody(reply_body));
+}
+
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
diff --git a/src/topic_node.h b/src/topic_node.h
index 81bf718..1bb3611 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -46,6 +46,7 @@
bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Heartbeat(const int timeout_ms);
bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
+ bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms);
// topic rpc server
typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 533c399..33adf91 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -165,7 +165,34 @@
void *reply = 0;
int reply_len = 0;
bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
- BHFree(reply, reply_len);
+ DEFER1(BHFree(reply, reply_len));
+ // printf("register topic : %s\n", r ? "ok" : "failed");
+ // Sleep(1s);
+ }
+ {
+ // query procs
+ std::string dest(BHAddress().SerializeAsString());
+ MsgQueryProc query;
+ std::string s = query.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHQueryProcs(dest.data(), dest.size(), s.data(), s.size(), &reply, &reply_len, 1000);
+ DEFER1(BHFree(reply, reply_len));
+ MsgQueryProcReply result;
+ if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
+ printf("query proc result: %d\n", result.proc_list().size());
+ for (int i = 0; i < result.proc_list().size(); ++i) {
+ auto &info = result.proc_list(i);
+ printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
+ (info.online() ? "online" : "offline"),
+ info.proc().proc_id().c_str(), info.proc().name().c_str());
+ for (auto &t : info.topics().topic_list()) {
+ printf("\t\t %s", t.c_str());
+ }
+ }
+ } else {
+ printf("query proc error\n");
+ }
// printf("register topic : %s\n", r ? "ok" : "failed");
// Sleep(1s);
}
@@ -310,15 +337,13 @@
int ncli = 10;
const int64_t nreq = 1000 * 100;
-#if 1
- for (int i = 0; i < ncli; ++i) {
- threads.Launch(asyncRequest, nreq);
- }
-#else
for (int i = 0; i < 100; ++i) {
SyncRequest(i);
}
-#endif
+
+ for (int i = 0; i < ncli; ++i) {
+ threads.Launch(asyncRequest, nreq);
+ }
int same = 0;
uint64_t last = 0;
--
Gitblit v1.8.0