From 65a230ec6ccb61c3ce6816730da2106f07f40b4a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 18:42:41 +0800 Subject: [PATCH] add api, Unregister, QueryTopicAddress. --- proto/source/bhome_msg_api.proto | 21 ++++ utest/api_test.cpp | 2 api/bhsgo/bhome_node_test.go | 18 +++ box/center.cpp | 36 ++++-- src/proto.h | 1 api/bhsgo/bhome_node.go | 25 ++++ proto/source/bhome_msg.proto | 2 src/topic_node.h | 4 src/bh_api.h | 12 ++ src/bh_api.cpp | 8 + src/topic_node.cpp | 124 ++++++++++++++---------- 11 files changed, 184 insertions(+), 69 deletions(-) diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go index 35d8681..f771f08 100644 --- a/api/bhsgo/bhome_node.go +++ b/api/bhsgo/bhome_node.go @@ -8,8 +8,9 @@ import "C" import ( - bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "unsafe" + + bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" ) func getPtr(n *[]byte) unsafe.Pointer { @@ -46,8 +47,8 @@ return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) } -func Heartbeat(topics *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { - data, _ := topics.Marshal() +func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := proc.Marshal() return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) } @@ -55,6 +56,24 @@ return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0 } +func Unregister(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := proc.Marshal() + return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHUnregister), data, reply, timeout_ms) +} + +func QueryTopicAddress(topic *bh.MsgQueryTopic, reply *bh.MsgQueryTopicReply, timeout_ms int) bool { + data, _ := topic.Marshal() + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHQueryTopicAddress(getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 + if r { + reply.Unmarshal(C.GoBytes(creply, creply_len)) + } + return r + +} + func Publish(pub *bh.MsgPublish, timeout_ms int) bool { data, _ := pub.Marshal() return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0 diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go index b00b1fe..cc1966e 100644 --- a/api/bhsgo/bhome_node_test.go +++ b/api/bhsgo/bhome_node_test.go @@ -1,11 +1,12 @@ package bhsgo import ( - bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "fmt" "testing" "time" "unsafe" + + bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" ) func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) { @@ -37,6 +38,21 @@ t.Log("register error") return } + r = Unregister(&proc, &reply, 1000) + if r { + fmt.Println("Unregister ok") + } else { + fmt.Println("Unregister failed") + } + + r = Register(&proc, &reply, 1000) + if r { + fmt.Println("register ok") + } else { + fmt.Println("register failed") + t.Log("register error") + return + } r = HeartbeatEasy(1000) if r { diff --git a/box/center.cpp b/box/center.cpp index 0f547e9..badfbfe 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -166,6 +166,18 @@ return HandleMsg<MsgCommonReply, Func>(head, op); } + MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) + { + return HandleMsg( + head, [&](Node node) -> MsgCommonReply { + NodeInfo &ni = *node; + auto now = NowSec(); // just set to offline. + ni.state_.timestamp_ = now - offline_time_; + ni.state_.UpdateState(now, offline_time_, kill_time_); + return MakeReply(eSuccess); + }); + } + MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) { return HandleMsg( @@ -206,20 +218,17 @@ auto query = [&](Node self) -> MsgQueryTopicReply { auto pos = service_map_.find(req.topic()); if (pos != service_map_.end() && !pos->second.empty()) { - // now just find first one. - const TopicDest &dest = *(pos->second.begin()); - Node dest_node(dest.weak_node_.lock()); - if (!dest_node) { - service_map_.erase(pos); - return MakeReply<Reply>(eOffline, "topic server offline."); - } else if (!Valid(*dest_node)) { - return MakeReply<Reply>(eNoRespond, "topic server not responding."); - } else { - MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess); - reply.mutable_address()->set_mq_id(dest.mq_); - return reply; + auto &clients = pos->second; + Reply reply = MakeReply<Reply>(eSuccess); + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_); + } } - + return reply; } else { return MakeReply<Reply>(eNotFound, "topic server not found."); } @@ -433,6 +442,7 @@ switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); + CASE_ON_MSG_TYPE(Unregister); CASE_ON_MSG_TYPE(RegisterRPC); CASE_ON_MSG_TYPE(QueryTopic); diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index aabe372..6a4942d 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -44,6 +44,8 @@ // kMsgTypeSubscribeReply = 23; kMsgTypeUnsubscribe = 24; // kMsgTypeUnsubscribeReply = 25; + kMsgTypeUnregister = 26; + // kMsgTypeUnregisterReply = 27; } diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto index fd1ae8f..838c228 100644 --- a/proto/source/bhome_msg_api.proto +++ b/proto/source/bhome_msg_api.proto @@ -51,6 +51,11 @@ repeated BHAddress addrs = 2; } +message MsgUnregister +{ + ProcInfo proc = 1; +} + message MsgHeartbeat { ProcInfo proc = 1; @@ -62,5 +67,19 @@ message MsgQueryTopicReply { ErrorMsg errmsg = 1; - BHAddress address = 2; + +message BHNodeAddress { + bytes proc_id = 1; + BHAddress addr = 2; +} + repeated BHNodeAddress node_address = 2; +} + +message MsgQueryProc { + bytes proc_id = 1; +} + +message MsgQueryProcReply { + ErrorMsg errmsg = 1; + repeated ProcInfo proc = 2; } diff --git a/src/bh_api.cpp b/src/bh_api.cpp index f0ba26d..5c424e0 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -104,6 +104,10 @@ { return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); } +int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} int BHHeartbeatEasy(const int timeout_ms) { @@ -120,6 +124,10 @@ return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); } +int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms); +} int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); diff --git a/src/bh_api.h b/src/bh_api.h index 33a70cb..4d6846e 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -24,12 +24,24 @@ int *reply_len, const int timeout_ms); +int BHUnregister(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); + int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms); +int BHQueryTopicAddress(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); + int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, diff --git a/src/proto.h b/src/proto.h index 2557f8e..c30b4fd 100644 --- a/src/proto.h +++ b/src/proto.h @@ -38,6 +38,7 @@ BHOME_SIMPLE_MAP_MSG(CommonReply); BHOME_SIMPLE_MAP_MSG(Register); +BHOME_SIMPLE_MAP_MSG(Unregister); BHOME_SIMPLE_MAP_MSG(RegisterRPC); BHOME_SIMPLE_MAP_MSG(Heartbeat); BHOME_SIMPLE_MAP_MSG(QueryTopic); diff --git a/src/topic_node.cpp b/src/topic_node.cpp index a5d48b7..24bc4bb 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -120,6 +120,39 @@ return IsOnline(); } } +bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) +{ + info_.Clear(); + state_cas(eStateOnline, eStateOffline); + + auto &sock = SockNode(); + MsgUnregister body; + body.mutable_proc()->Swap(&proc); + + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + AddRoute(head, sock.id()); + + auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { + bool r = head.type() == kMsgTypeCommonReply && + msg.ParseBody(rbody) && + IsSuccess(rbody.errmsg().errcode()); + return r; + }; + + if (timeout_ms == 0) { + auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + MsgCommonReply body; + CheckResult(imsg, head, body); + }; + return sock.Send(&BHTopicCenterAddress(), head, body, onResult); + } else { + MsgI reply; + DEFER1(reply.Release();); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + return r && CheckResult(reply, reply_head, reply_body); + } +} bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { @@ -152,6 +185,25 @@ proc.set_proc_id(proc_id()); MsgCommonReply reply_body; return Heartbeat(proc, reply_body, timeout_ms); +} + +bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) +{ + if (!IsOnline()) { + SetLastError(eNotRegistered, "Not Registered."); + return false; + } + auto &sock = SockNode(); + + BHMsgHead head(InitMsgHead(GetType(query), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release()); + BHMsgHead reply_head; + return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeQueryTopicReply && + reply.ParseBody(reply_body)); } bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) @@ -317,32 +369,7 @@ try { BHAddress addr; -#if 1 return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); -#else - if (topic_query_cache_.Pick(req.topic(), addr)) { - return SendTo(addr, req, cb); - } - - auto &sock = SockClient(); - MsgQueryTopic query; - query.set_topic(req.topic()); - BHMsgHead head(InitMsgHead(GetType(query), proc_id())); - AddRoute(head, sock.id()); - - auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - MsgQueryTopicReply rep; - if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { - auto &addr = rep.address(); - if (!addr.mq_id().empty()) { - topic_query_cache_.Store(req.topic(), addr); - SendTo(addr, req, cb); - } - } - }; - return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); -#endif - } catch (...) { SetLastError(eError, "internal error."); return false; @@ -384,6 +411,22 @@ return false; } +int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) +{ + int n = 0; + MsgQueryTopic query; + query.set_topic(topic); + MsgQueryTopicReply rep; + if (QueryTopicAddress(query, rep, timeout_ms)) { + auto &ls = rep.node_address(); + n = ls.size(); + for (auto &na : ls) { + addr.push_back(na); + } + } + return n; +} + bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) { if (!IsOnline()) { @@ -391,35 +434,16 @@ return false; } - auto &sock = SockClient(); - if (topic_query_cache_.Find(topic, addr)) { return true; } - - MsgQueryTopic query; - query.set_topic(topic); - BHMsgHead head(InitMsgHead(GetType(query), proc_id())); - AddRoute(head, sock.id()); - - MsgI reply; - DEFER1(reply.Release()); - BHMsgHead reply_head; - - if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { - if (reply_head.type() == kMsgTypeQueryTopicReply) { - MsgQueryTopicReply rep; - if (reply.ParseBody(rep)) { - addr = rep.address(); - if (addr.mq_id().empty()) { - return false; - } else { - topic_query_cache_.Store(topic, addr); - return true; - } - } + std::vector<NodeAddress> lst; + if (QueryRPCTopics(topic, lst, timeout_ms)) { + addr = lst.front().addr(); + if (!addr.mq_id().empty()) { + topic_query_cache_.Store(topic, addr); + return true; } - } else { } return false; } diff --git a/src/topic_node.h b/src/topic_node.h index 3371c35..20b27d2 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -41,8 +41,10 @@ // topic node bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); + bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); + bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; @@ -73,6 +75,8 @@ private: bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); + typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; + int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); const std::string &proc_id() { return info_.proc_id(); } typedef BHAddress Address; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 79236ba..5d65bd5 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -219,7 +219,7 @@ TLMutex mutex; // CasMutex mutex; auto Lock = [&]() { - for (int i = 0; i < 1000 * 1000 * 10; ++i) { + for (int i = 0; i < 10; ++i) { mutex.lock(); mutex.unlock(); } -- Gitblit v1.8.0