| | |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func SubscribeNet(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeNetTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := proc.Marshal() |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) |
| | |
| | | replyer(reply); |
| | | auto hosts = center->FindRemoteSubClients(pub.topic()); |
| | | for (auto &host : hosts) { |
| | | tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); |
| | | tcp_proxy.Publish(host, kBHCenterPort, msg.content()); |
| | | } |
| | | } |
| | | }; |
| | |
| | | namespace |
| | | { |
| | | const std::string &kTopicQueryProc = "#center_query_procs"; |
| | | const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center"; |
| | | |
| | | std::string ToJson(const MsgQueryProcReply &qpr) |
| | | { |
| | |
| | | throw std::runtime_error("center node register failed."); |
| | | } |
| | | |
| | | MsgTopicList topics; |
| | | topics.add_topic_list(kTopicQueryProc); |
| | | if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) { |
| | | MsgTopicList services; |
| | | services.add_topic_list(kTopicQueryProc); |
| | | if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) { |
| | | throw std::runtime_error("center node register topics failed."); |
| | | } |
| | | MsgTopicList subs; |
| | | |
| | | subs.add_topic_list(kTopicNotifyRemoteInfo); |
| | | if (!pnode_->Subscribe(subs, reply, timeout)) { |
| | | throw std::runtime_error("center node subscribe topics failed."); |
| | | } |
| | | |
| | | auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) { |
| | |
| | | pnode_->ServerSendReply(src_info, reply); |
| | | }; |
| | | |
| | | auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) { |
| | | if (data.topic() == kTopicNotifyRemoteInfo) { |
| | | // parse other data. |
| | | // LOG_DEBUG() << "center got net info."; |
| | | ssjson::Json js; |
| | | if (js.parse(data.data())) { |
| | | if (js.is_array()) { |
| | | auto ¢er = *pscenter_; |
| | | center->ParseNetInfo(js); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | |
| | | bool cur = false; |
| | | if (run_.compare_exchange_strong(cur, true)) { |
| | | auto heartbeat = [this]() { |
| | |
| | | } |
| | | }; |
| | | std::thread(heartbeat).swap(worker_); |
| | | return pnode_->ServerStart(onRequest); |
| | | return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv); |
| | | } else { |
| | | return false; |
| | | } |
| | |
| | | |
| | | bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | // LOG_FUNCTION; |
| | | auto &topic = head.topic(); |
| | | auto clients = DoFindClients(topic, true); |
| | | if (clients.empty()) { return true; } |
| | |
| | | } |
| | | } |
| | | MsgI msg(shm); |
| | | if (msg.Make(body_content)) { |
| | | if (msg.Make(head, body_content)) { |
| | | RecordMsg(msg); |
| | | msgs.push_back(msg); |
| | | // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_; |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | }; |
| | |
| | | typedef MsgQueryTopicReply Reply; |
| | | |
| | | auto query = [&](Node self) -> Reply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | auto local = [&]() { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | return reply; |
| | | } else { |
| | | }; |
| | | auto net = [&]() { |
| | | auto hosts(FindRemoteRPCServers(req.topic())); |
| | | if (hosts.empty()) { |
| | | return false; |
| | | } else { |
| | | for (auto &ip : hosts) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->mutable_addr()->set_ip(ip); |
| | | } |
| | | return true; |
| | | } |
| | | }; |
| | | local(); |
| | | net(); |
| | | if (reply.node_address_size() == 0) { |
| | | return MakeReply<Reply>(eNotFound, "topic server not found."); |
| | | } else { |
| | | return reply; |
| | | } |
| | | }; |
| | | |
| | |
| | | sub_map[topic].insert(dest); |
| | | } |
| | | }; |
| | | LOG_DEBUG() << "subscribe net : " << msg.network(); |
| | | if (msg.network()) { |
| | | Sub(net_sub_, center_.net_sub_map_); |
| | | center_.Notify(kTopicNodeSub, *this); |
| | |
| | | |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) |
| | | { |
| | | // LOG_FUNCTION; |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &exact) { |
| | | auto FindIn = [&](auto &sub_map) { |
| | |
| | | }; |
| | | if (!from_remote) { |
| | | FindIn(local_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); |
| | | } |
| | | // net subscripitions also work in local mode. |
| | | FindIn(net_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); |
| | | }; |
| | | Find1(topic); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) |
| | | void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) |
| | | { |
| | | //TODO search synced full list; |
| | | return std::vector<std::string>(); |
| | | // LOG_FUNCTION; |
| | | sub_hosts_.clear(); |
| | | rpc_hosts_.clear(); |
| | | for (auto &host : info.array()) { |
| | | if (host.get("isLocal", false)) { |
| | | host_id_ = host.get("serverId", ""); |
| | | ip_ = host.get("ip", ""); |
| | | } else { |
| | | auto ip = host.get("ip", ""); |
| | | auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) { |
| | | for (auto &topic : lot) { |
| | | auto t = topic.get_value<std::string>(); |
| | | rec[t].insert(ip); |
| | | // LOG_DEBUG() << "net topic: " << t << ", " << ip; |
| | | } |
| | | }; |
| | | // LOG_DEBUG() << "serives:"; |
| | | UpdateRec(host.child("pubTopics").array(), rpc_hosts_); |
| | | // LOG_DEBUG() << "net sub:"; |
| | | UpdateRec(host.child("netSubTopics").array(), sub_hosts_); |
| | | } |
| | | } |
| | | } |
| | |
| | | #ifndef NODE_CENTER_KY67RJ1Q |
| | | #define NODE_CENTER_KY67RJ1Q |
| | | |
| | | #include "json.h" |
| | | #include "shm_socket.h" |
| | | #include <unordered_map> |
| | | |
| | |
| | | void OnTimer(); |
| | | |
| | | // remote hosts records |
| | | std::vector<std::string> FindRemoteSubClients(const Topic &topic); |
| | | std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); } |
| | | std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); } |
| | | void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); } |
| | | |
| | | private: |
| | | void CheckNodes(); |
| | |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | | int64_t last_check_time_; |
| | | |
| | | // net hosts info |
| | | class NetRecords |
| | | { |
| | | public: |
| | | typedef std::set<std::string> Hosts; |
| | | void ParseData(const ssjson::Json &input); |
| | | Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); } |
| | | Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); } |
| | | |
| | | private: |
| | | typedef std::unordered_map<Topic, Hosts> TopicMap; |
| | | TopicMap sub_hosts_; |
| | | TopicMap rpc_hosts_; |
| | | Hosts FindHosts(const Topic &topic, const TopicMap &tmap) |
| | | { |
| | | auto pos = tmap.find(topic); |
| | | if (pos != tmap.end()) { |
| | | return pos->second; |
| | | } else { |
| | | return Hosts(); |
| | | } |
| | | } |
| | | std::string host_id_; |
| | | std::string ip_; |
| | | }; |
| | | NetRecords net_records_; |
| | | }; |
| | | |
| | | #endif // end of include guard: NODE_CENTER_KY67RJ1Q |
| | |
| | | if (4 > len) { return false; } |
| | | uint32_t head_len = Get32(p); |
| | | if (head_len > 1024 * 4) { |
| | | throw std::runtime_error("unexpected tcp reply data."); |
| | | throw std::runtime_error("unexpected tcp data head."); |
| | | } |
| | | auto before_body = 4 + head_len + 4; |
| | | if (before_body > len) { |
| | |
| | | { |
| | | return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | int BHSubscribeNetTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | typedef void (*FSubDataCallback)(const void *proc_id, |
| | | int proc_id_len, |
| | |
| | | BHQueryTopicAddress; |
| | | BHQueryProcs; |
| | | BHSubscribeTopics; |
| | | BHSubscribeNetTopics; |
| | | BHStartWorker; |
| | | BHHeartbeatEasy; |
| | | BHHeartbeat; |
| | |
| | | |
| | | // subscribe |
| | | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | |
| | | try { |
| | | auto &sock = SockSub(); |
| | | MsgSubscribe sub; |
| | | sub.set_network(net); |
| | | sub.mutable_topics()->Swap(&topics); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); |
| | |
| | | // subscribe |
| | | typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; |
| | | bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); |
| | | bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | return DoSubscribe(topics, false, reply_body, timeout_ms); |
| | | } |
| | | bool SubscribeNet(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | return DoSubscribe(topics, true, reply_body, timeout_ms); |
| | | } |
| | | bool DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); |
| | | |
| | | void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "bh_api.h" |
| | | #include "json.h" |
| | | #include "robust.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | |
| | | } |
| | | |
| | | { // Subscribe |
| | | MsgTopicList topics; |
| | | topics.add_topic_list("#center.node"); |
| | | for (int i = 0; i < 10; ++i) { |
| | | topics.add_topic_list(topic_ + std::to_string(i * 2)); |
| | | } |
| | | std::string s = topics.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | BHFree(reply, reply_len); |
| | | printf("subscribe topic : %s\n", r ? "ok" : "failed"); |
| | | auto Subscribe = [&](std::string topic, bool net) { |
| | | MsgTopicList topics; |
| | | topics.add_topic_list(topic); |
| | | for (int i = 0; i < 10; ++i) { |
| | | topics.add_topic_list(topic_ + std::to_string(i * 2)); |
| | | } |
| | | std::string s = topics.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = false; |
| | | if (net) { |
| | | r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | } else { |
| | | r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | } |
| | | BHFree(reply, reply_len); |
| | | printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed")); |
| | | }; |
| | | Subscribe("#center.node", false); |
| | | Subscribe("local0", false); |
| | | Subscribe("net0", true); |
| | | } |
| | | |
| | | auto ServerLoop = [&](std::atomic<bool> *run) { |
| | |
| | | } |
| | | }; |
| | | |
| | | std::atomic<bool> run(true); |
| | | ThreadManager threads; |
| | | #if 1 |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | #else |
| | | BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); |
| | | threads.Launch(ServerLoop, &run); |
| | | #endif |
| | | |
| | | auto Publish = [&](const std::string &topic, const std::string &data) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | }; |
| | | |
| | | { |
| | | // publish |
| | | Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a')); |
| | | for (int i = 0; i < 1; ++i) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic_ + std::to_string(i)); |
| | | pub.set_data("pub_data_" + std::string(104 * 1, 'a')); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | // Sleep(1s); |
| | | |
| | | ssjson::Json net = ssjson::Json::Array(); |
| | | ssjson::Json host; |
| | | host.put("serverId", "test_host"); |
| | | host.put("ip", "127.0.0.1"); |
| | | ssjson::Json topics = ssjson::Json::Array(); |
| | | topics.push_back("aaaaa"); |
| | | topics.push_back("bbbbb"); |
| | | host.put("pubTopics", topics); |
| | | topics.push_back("net0"); |
| | | topics.push_back("net1"); |
| | | host.put("netSubTopics", topics); |
| | | net.push_back(host); |
| | | |
| | | Publish("pub-allRegisterInfo-to-center", net.dump()); |
| | | Sleep(1s); |
| | | Publish("local0", "local-abcd0"); |
| | | Publish("net0", "net-abcd0"); |
| | | Publish("local0", "local-abcd1"); |
| | | Sleep(1s); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | }; |
| | | |
| | | std::atomic<bool> run(true); |
| | | |
| | | ThreadManager threads; |
| | | |
| | | #if 1 |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | #else |
| | | BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); |
| | | threads.Launch(ServerLoop, &run); |
| | | #endif |
| | | |
| | | boost::timer::auto_cpu_timer timer; |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const int64_t nreq = 1000 * 100; |
| | | const int64_t nreq = 1000; //* 100; |
| | | |
| | | for (int i = 0; i < 10; ++i) { |
| | | SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i)); |