| | |
| | | |
| | | } |
| | | |
| | | func QueryProcs(dest_addr *bh.BHAddress, topic *bh.MsgQueryProc, reply *bh.MsgQueryProcReply, timeout_ms int) bool { |
| | | dest, _ := dest_addr.Marshal() |
| | | data, _ := topic.Marshal() |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHQueryProcs(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 |
| | | if r { |
| | | reply.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | } |
| | | return r |
| | | |
| | | } |
| | | |
| | | func Publish(pub *bh.MsgPublish, timeout_ms int) bool { |
| | | data, _ := pub.Marshal() |
| | | return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0 |
| | |
| | | proc_id := "test_proc" |
| | | proc := bh.ProcInfo{} |
| | | proc.ProcId = []byte(proc_id) |
| | | fmt.Println("proc id: ", proc.ProcId) |
| | | reply := bh.MsgCommonReply{} |
| | | defer Cleanup() |
| | | |
| | |
| | | } else { |
| | | fmt.Println("reg topics failed") |
| | | } |
| | | |
| | | req := bh.MsgRequestTopic{} |
| | | time.Sleep(time.Second * 1) |
| | | req.Topic = []byte("topic0") |
| | |
| | | pid := "" |
| | | rr := bh.MsgRequestTopicReply{} |
| | | dest := bh.BHAddress{} |
| | | for i := 0; i < 100; i++ { |
| | | |
| | | queryProc := bh.MsgQueryProc{} |
| | | queryReply := bh.MsgQueryProcReply{} |
| | | |
| | | QueryProcs(&dest, &queryProc, &queryReply, 3000) |
| | | fmt.Println("query result:", string(queryReply.ProcList[0].Proc.ProcId)) |
| | | |
| | | for i := 0; i < 10; i++ { |
| | | if Request(&dest, &req, &pid, &rr, 3000) { |
| | | fmt.Println("server:" + pid + ", reply:" + string(rr.Data)) |
| | | } else { |
| | |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req) |
| | | { |
| | | typedef MsgQueryProcReply Reply; |
| | | auto query = [&](Node self) -> Reply { |
| | | auto Add1 = [](Reply &reply, Node node) { |
| | | auto info = reply.add_proc_list(); |
| | | *info->mutable_proc() = node->proc_; |
| | | info->set_online(node->state_.flag_ == kStateNormal); |
| | | for (auto &addr_topics : node->services_) { |
| | | for (auto &topic : addr_topics.second) { |
| | | info->mutable_topics()->add_topic_list(topic); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | if (!req.proc_id().empty()) { |
| | | auto pos = online_node_addr_map_.find(req.proc_id()); |
| | | if (pos == online_node_addr_map_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc not found."); |
| | | } else { |
| | | auto node_pos = nodes_.find(pos->second); |
| | | if (node_pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc node not found."); |
| | | } else { |
| | | auto reply = MakeReply<Reply>(eSuccess); |
| | | Add1(reply, node_pos->second); |
| | | return reply; |
| | | } |
| | | } |
| | | } else { |
| | | Reply reply(MakeReply<Reply>(eSuccess)); |
| | | for (auto &kv : nodes_) { |
| | | Add1(reply, kv.second); |
| | | } |
| | | return reply; |
| | | } |
| | | }; |
| | | |
| | | return HandleMsg<Reply>(head, query); |
| | | } |
| | | |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) |
| | | { |
| | | typedef MsgQueryTopicReply Reply; |
| | | |
| | | auto query = [&](Node self) -> MsgQueryTopicReply { |
| | | auto query = [&](Node self) -> Reply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | |
| | | |
| | | CASE_ON_MSG_TYPE(RegisterRPC); |
| | | CASE_ON_MSG_TYPE(QueryTopic); |
| | | CASE_ON_MSG_TYPE(QueryProc); |
| | | default: return false; |
| | | } |
| | | }; |
| | |
| | | // kMsgTypeUnsubscribeReply = 25; |
| | | kMsgTypeUnregister = 26; |
| | | // kMsgTypeUnregisterReply = 27; |
| | | kMsgTypeQueryProc = 28; |
| | | kMsgTypeQueryProcReply = 29; |
| | | |
| | | } |
| | | |
| | |
| | | remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHQueryProcs(const void *remote, |
| | | const int remote_len, |
| | | const void *query, |
| | | const int query_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>( |
| | | &TopicNode::QueryProcs, |
| | | remote, remote_len, query, query_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHQueryProcs(const void *remote, |
| | | const int remote_len, |
| | | const void *query, |
| | | const int query_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHSubscribeTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | |
| | | BHUnregister; |
| | | BHRegisterTopics; |
| | | BHQueryTopicAddress; |
| | | BHQueryProcs; |
| | | BHSubscribeTopics; |
| | | BHStartWorker; |
| | | BHHeartbeatEasy; |
| | |
| | | |
| | | RefCount count_; |
| | | const uint32_t tag_ = kMsgTag; |
| | | const uint32_t size_ = 0; |
| | | const uint32_t capacity_ = 0; |
| | | const int64_t id_ = 0; |
| | | std::atomic<int64_t> timestamp_; |
| | | bool managed_ = false; |
| | | uint32_t size_ = 0; |
| | | Meta(uint32_t size) : |
| | | size_(size), id_(NewId()), timestamp_(NowSec()) {} |
| | | capacity_(size), id_(NewId()), timestamp_(NowSec()) {} |
| | | }; |
| | | OffsetType offset_; |
| | | static void *Alloc(const size_t size) |
| | |
| | | }; |
| | | Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); |
| | | Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | meta()->size_ = 4 + head_len + 4 + body_len; |
| | | } |
| | | return addr; |
| | | } |
| | |
| | | void *addr = get(); |
| | | if (addr) { |
| | | memcpy(addr, content.data(), content.size()); |
| | | meta()->size_ = content.size(); |
| | | } |
| | | return addr; |
| | | } |
| | |
| | | uint32_t head_len = head.ByteSizeLong(); |
| | | uint32_t body_len = body.ByteSizeLong(); |
| | | uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; |
| | | return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len); |
| | | return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len); |
| | | } |
| | | |
| | | inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } |
| | | inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); } |
| | | inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); } |
| | | |
| | | inline bool Make(const size_t size) { return Make(Alloc(size)); } |
| | | |
| | |
| | | p += 4; |
| | | return head.ParseFromArray(p, msg_size); |
| | | } |
| | | std::string content() const |
| | | { |
| | | auto p = get<char>(); |
| | | return p ? std::string(p, meta()->size_) : std::string(); |
| | | } |
| | | std::string body() const |
| | | { |
| | | auto p = get<char>(); |
| | |
| | | BHOME_SIMPLE_MAP_MSG(Unsubscribe); |
| | | BHOME_SIMPLE_MAP_MSG(ProcInit); |
| | | BHOME_SIMPLE_MAP_MSG(ProcInitReply); |
| | | BHOME_SIMPLE_MAP_MSG(QueryProc); |
| | | BHOME_SIMPLE_MAP_MSG(QueryProcReply); |
| | | |
| | | #undef BHOME_SIMPLE_MAP_MSG |
| | | #undef BHOME_MAP_MSG_AND_TYPE |
| | |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryProcReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | |
| | | bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(const int timeout_ms); |
| | | bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); |
| | | bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; |
| | |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | BHFree(reply, reply_len); |
| | | DEFER1(BHFree(reply, reply_len)); |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | | { |
| | | // query procs |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | | MsgQueryProc query; |
| | | std::string s = query.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHQueryProcs(dest.data(), dest.size(), s.data(), s.size(), &reply, &reply_len, 1000); |
| | | DEFER1(BHFree(reply, reply_len)); |
| | | MsgQueryProcReply result; |
| | | if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) { |
| | | printf("query proc result: %d\n", result.proc_list().size()); |
| | | for (int i = 0; i < result.proc_list().size(); ++i) { |
| | | auto &info = result.proc_list(i); |
| | | printf("proc [%d] %s, %s, %s\n\ttopics\n", i, |
| | | (info.online() ? "online" : "offline"), |
| | | info.proc().proc_id().c_str(), info.proc().name().c_str()); |
| | | for (auto &t : info.topics().topic_list()) { |
| | | printf("\t\t %s", t.c_str()); |
| | | } |
| | | } |
| | | } else { |
| | | printf("query proc error\n"); |
| | | } |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | |
| | | int ncli = 10; |
| | | const int64_t nreq = 1000 * 100; |
| | | |
| | | #if 1 |
| | | for (int i = 0; i < ncli; ++i) { |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | #else |
| | | for (int i = 0; i < 100; ++i) { |
| | | SyncRequest(i); |
| | | } |
| | | #endif |
| | | |
| | | for (int i = 0; i < ncli; ++i) { |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | |
| | | int same = 0; |
| | | uint64_t last = 0; |