add api, Unregister, QueryTopicAddress.
| | |
| | | import "C" |
| | | |
| | | import ( |
| | | bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" |
| | | "unsafe" |
| | | |
| | | bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" |
| | | ) |
| | | |
| | | func getPtr(n *[]byte) unsafe.Pointer { |
| | |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Heartbeat(topics *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := proc.Marshal() |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) |
| | | } |
| | | |
| | |
| | | return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0 |
| | | } |
| | | |
| | | func Unregister(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := proc.Marshal() |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHUnregister), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func QueryTopicAddress(topic *bh.MsgQueryTopic, reply *bh.MsgQueryTopicReply, timeout_ms int) bool { |
| | | data, _ := topic.Marshal() |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHQueryTopicAddress(getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 |
| | | if r { |
| | | reply.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | } |
| | | return r |
| | | |
| | | } |
| | | |
| | | func Publish(pub *bh.MsgPublish, timeout_ms int) bool { |
| | | data, _ := pub.Marshal() |
| | | return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0 |
| | |
| | | package bhsgo |
| | | |
| | | import ( |
| | | bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" |
| | | "fmt" |
| | | "testing" |
| | | "time" |
| | | "unsafe" |
| | | |
| | | bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg" |
| | | ) |
| | | |
| | | func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) { |
| | |
| | | t.Log("register error") |
| | | return |
| | | } |
| | | r = Unregister(&proc, &reply, 1000) |
| | | if r { |
| | | fmt.Println("Unregister ok") |
| | | } else { |
| | | fmt.Println("Unregister failed") |
| | | } |
| | | |
| | | r = Register(&proc, &reply, 1000) |
| | | if r { |
| | | fmt.Println("register ok") |
| | | } else { |
| | | fmt.Println("register failed") |
| | | t.Log("register error") |
| | | return |
| | | } |
| | | |
| | | r = HeartbeatEasy(1000) |
| | | if r { |
| | |
| | | return HandleMsg<MsgCommonReply, Func>(head, op); |
| | | } |
| | | |
| | | MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) |
| | | { |
| | | return HandleMsg( |
| | | head, [&](Node node) -> MsgCommonReply { |
| | | NodeInfo &ni = *node; |
| | | auto now = NowSec(); // just set to offline. |
| | | ni.state_.timestamp_ = now - offline_time_; |
| | | ni.state_.UpdateState(now, offline_time_, kill_time_); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | |
| | | MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) |
| | | { |
| | | return HandleMsg( |
| | |
| | | auto query = [&](Node self) -> MsgQueryTopicReply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | // now just find first one. |
| | | const TopicDest &dest = *(pos->second.begin()); |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (!dest_node) { |
| | | service_map_.erase(pos); |
| | | return MakeReply<Reply>(eOffline, "topic server offline."); |
| | | } else if (!Valid(*dest_node)) { |
| | | return MakeReply<Reply>(eNoRespond, "topic server not responding."); |
| | | } else { |
| | | MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess); |
| | | reply.mutable_address()->set_mq_id(dest.mq_); |
| | | return reply; |
| | | auto &clients = pos->second; |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_); |
| | | } |
| | | } |
| | | |
| | | return reply; |
| | | } else { |
| | | return MakeReply<Reply>(eNotFound, "topic server not found."); |
| | | } |
| | |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | | CASE_ON_MSG_TYPE(Unregister); |
| | | |
| | | CASE_ON_MSG_TYPE(RegisterRPC); |
| | | CASE_ON_MSG_TYPE(QueryTopic); |
| | |
| | | // kMsgTypeSubscribeReply = 23; |
| | | kMsgTypeUnsubscribe = 24; |
| | | // kMsgTypeUnsubscribeReply = 25; |
| | | kMsgTypeUnregister = 26; |
| | | // kMsgTypeUnregisterReply = 27; |
| | | |
| | | } |
| | | |
| | |
| | | repeated BHAddress addrs = 2; |
| | | } |
| | | |
| | | message MsgUnregister |
| | | { |
| | | ProcInfo proc = 1; |
| | | } |
| | | |
| | | message MsgHeartbeat |
| | | { |
| | | ProcInfo proc = 1; |
| | |
| | | |
| | | message MsgQueryTopicReply { |
| | | ErrorMsg errmsg = 1; |
| | | BHAddress address = 2; |
| | | |
| | | message BHNodeAddress { |
| | | bytes proc_id = 1; |
| | | BHAddress addr = 2; |
| | | } |
| | | repeated BHNodeAddress node_address = 2; |
| | | } |
| | | |
| | | message MsgQueryProc { |
| | | bytes proc_id = 1; |
| | | } |
| | | |
| | | message MsgQueryProcReply { |
| | | ErrorMsg errmsg = 1; |
| | | repeated ProcInfo proc = 2; |
| | | } |
| | |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHHeartbeatEasy(const int timeout_ms) |
| | | { |
| | |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHUnregister(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHRegisterTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHQueryTopicAddress(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHSubscribeTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | |
| | | |
| | | BHOME_SIMPLE_MAP_MSG(CommonReply); |
| | | BHOME_SIMPLE_MAP_MSG(Register); |
| | | BHOME_SIMPLE_MAP_MSG(Unregister); |
| | | BHOME_SIMPLE_MAP_MSG(RegisterRPC); |
| | | BHOME_SIMPLE_MAP_MSG(Heartbeat); |
| | | BHOME_SIMPLE_MAP_MSG(QueryTopic); |
| | |
| | | return IsOnline(); |
| | | } |
| | | } |
| | | bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | info_.Clear(); |
| | | state_cas(eStateOnline, eStateOffline); |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgUnregister body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | | bool r = head.type() == kMsgTypeCommonReply && |
| | | msg.ParseBody(rbody) && |
| | | IsSuccess(rbody.errmsg().errcode()); |
| | | return r; |
| | | }; |
| | | |
| | | if (timeout_ms == 0) { |
| | | auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | return r && CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | |
| | | proc.set_proc_id(proc_id()); |
| | | MsgCommonReply reply_body; |
| | | return Heartbeat(proc, reply_body, timeout_ms); |
| | | } |
| | | |
| | | bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryTopicReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | #if 1 |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | #else |
| | | if (topic_query_cache_.Pick(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | MsgQueryTopicReply rep; |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Store(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); |
| | | #endif |
| | | |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | |
| | | return false; |
| | | } |
| | | |
| | | int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) |
| | | { |
| | | int n = 0; |
| | | MsgQueryTopic query; |
| | | query.set_topic(topic); |
| | | MsgQueryTopicReply rep; |
| | | if (QueryTopicAddress(query, rep, timeout_ms)) { |
| | | auto &ls = rep.node_address(); |
| | | n = ls.size(); |
| | | for (auto &na : ls) { |
| | | addr.push_back(na); |
| | | } |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockClient(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | | return true; |
| | | } |
| | | |
| | | MsgQueryTopic query; |
| | | query.set_topic(topic); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { |
| | | if (reply_head.type() == kMsgTypeQueryTopicReply) { |
| | | MsgQueryTopicReply rep; |
| | | if (reply.ParseBody(rep)) { |
| | | addr = rep.address(); |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } |
| | | std::vector<NodeAddress> lst; |
| | | if (QueryRPCTopics(topic, lst, timeout_ms)) { |
| | | addr = lst.front().addr(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | | } else { |
| | | } |
| | | return false; |
| | | } |
| | |
| | | |
| | | // topic node |
| | | bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(const int timeout_ms); |
| | | bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; |
| | |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); |
| | | typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; |
| | | int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); |
| | | const std::string &proc_id() { return info_.proc_id(); } |
| | | |
| | | typedef BHAddress Address; |
| | |
| | | TLMutex mutex; |
| | | // CasMutex mutex; |
| | | auto Lock = [&]() { |
| | | for (int i = 0; i < 1000 * 1000 * 10; ++i) { |
| | | for (int i = 0; i < 10; ++i) { |
| | | mutex.lock(); |
| | | mutex.unlock(); |
| | | } |