| | |
| | | // Topic names used for node event notifications. kTopicNode is built under |
| | | // kTopicCenterRoot, and each event topic below is built under kTopicNode via Join(). |
| | | const std::string kTopicNode = Join(kTopicCenterRoot, "node"); |
| | | const std::string kTopicNodeOnline = Join(kTopicNode, "online"); |
| | | const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); |
| | | const std::string kTopicNodeService = Join(kTopicNode, "service"); |
| | | const std::string kTopicNodeSub = Join(kTopicNode, "subscribe"); |
| | | const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe"); |
| | | } // namespace |
| | | |
| | | ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) |
| | |
| | | return; |
| | | } |
| | | // LOG_FUNCTION; |
| | | const size_t total = msgs_.size(); |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t limit = std::max(10000ul, total / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | |
| | | ++n; |
| | | }; |
| | | int n = now - msg.timestamp(); |
| | | if (n < 10) { |
| | | if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > NodeTimeoutSec()) { |
| | | Free(); |
| | | } else { |
| | | ++it; |
| | | } else if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > 60) { |
| | | Free(); |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | state_.timestamp_ = NowSec() - offline_time; |
| | | state_.flag_ = kStateOffline; |
| | | |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(shm_, kTopicNodeOffline, json.dump()); |
| | | center_.Notify(kTopicNodeOffline, *this); |
| | | } |
| | | |
| | | // Publishes a JSON body carrying `node`'s proc_id on `topic`, using the node's shared memory. |
| | | void NodeCenter::Notify(const Topic &topic, NodeInfo &node) |
| | | { |
| | |     const auto &proc_id = node.proc_.proc_id(); |
| | |     if (proc_id.empty()) { |
| | |         // node init, ignore. |
| | |         return; |
| | |     } |
| | |     Json payload; |
| | |     payload.put("proc_id", proc_id); |
| | |     Publish(node.shm_, topic, payload.dump()); |
| | | } |
| | | void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | | auto old = state_.flag_; |
| | | auto diff = now - state_.timestamp_; |
| | | auto publish = [this](const std::string &topic) { |
| | | if (proc_.proc_id().empty()) { return; } // node init, ignore. |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(shm_, topic, json.dump()); |
| | | }; |
| | | |
| | | LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; |
| | | if (diff < offline_time) { |
| | | state_.flag_ = kStateNormal; |
| | | if (old != state_.flag_) { |
| | | publish(kTopicNodeOnline); |
| | | center_.Notify(kTopicNodeOnline, *this); |
| | | } |
| | | } else if (diff < kill_time) { |
| | | state_.flag_ = kStateOffline; |
| | | if (old != state_.flag_) { |
| | | publish(kTopicNodeOffline); |
| | | center_.Notify(kTopicNodeOffline, *this); |
| | | } |
| | | } else { |
| | | state_.flag_ = kStateKillme; |
| | |
| | | |
| | | // create sockets. |
| | | try { |
| | | ShmSocket tmp(shm, true, ssn, 16); |
| | | ShmSocket tmp(shm, ssn, eCreate); |
| | | node->addrs_.emplace(ssn, tmp.AbsAddr()); |
| | | return true; |
| | | } catch (...) { |
| | |
| | | { |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | |
| | | // Finds the node registered for `mq_id`. The lookup key (`ssn`) is mq_id rounded |
| | | // down to a multiple of 10. Returns a default-constructed Node when no entry exists. |
| | | NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) |
| | | { |
| | |     const auto ssn = mq_id - (mq_id % 10); |
| | |     const auto found = nodes_.find(ssn); |
| | |     if (found == nodes_.end()) { |
| | |         return Node(); |
| | |     } |
| | |     return found->second; |
| | | } |
| | | |
| | | // Forwards a request (head + body_content) to a local queue. |
| | | // - dest.id_ != 0: the owning node is resolved with GetNode() and must be valid. |
| | | // - dest.id_ == 0: the first valid client registered in service_map_ for head.topic() |
| | | //   is chosen and `dest` is filled in from it. |
| | | // A route entry for the sending socket is appended to `head` before the message is |
| | | // built. Returns false when no destination is found or the message cannot be made; |
| | | // otherwise returns the result of the socket Send(). |
| | | bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | |     Node target; |
| | | |
| | |     if (dest.id_ != 0) { |
| | |         target = GetNode(dest.id_); |
| | |         if (!target || !Valid(*target)) { |
| | |             LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | |             return false; |
| | |         } |
| | |     } else { |
| | |         bool found = false; |
| | |         const auto service = service_map_.find(head.topic()); |
| | |         if (service != service_map_.end() && !service->second.empty()) { |
| | |             for (auto &provider : service->second) { |
| | |                 target = provider.weak_node_.lock(); |
| | |                 if (target && Valid(*target)) { |
| | |                     dest.id_ = provider.mq_id_; |
| | |                     dest.offset_ = provider.mq_abs_addr_; |
| | |                     found = true; |
| | |                     break; |
| | |                 } |
| | |             } |
| | |         } |
| | |         if (!found) { |
| | |             LOG_ERROR() << id() << " pass remote request, topic dest not found."; |
| | |             return false; |
| | |         } |
| | |     } |
| | | |
| | |     // record the local sender as a hop in the message route. |
| | |     ShmSocket &sender = DefaultSender(target->shm_); |
| | |     auto hop = head.add_route(); |
| | |     hop->set_mq_id(sender.id()); |
| | |     hop->set_abs_addr(sender.AbsAddr()); |
| | | |
| | |     ShmMsg msg(target->shm_); |
| | |     if (!msg.Make(head, body_content)) { |
| | |         return false; |
| | |     } |
| | |     DEFER1(msg.Release();); |
| | |     RecordMsg(msg); |
| | |     return sender.Send(dest, msg, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | // Sends a publish (head + body_content) to every client returned by |
| | | // DoFindClients(topic, true). One message is built per distinct shared memory name |
| | | // and reused for further clients with the same name; all built messages are released |
| | | // when the function exits. Always returns true. |
| | | bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) |
| | | { |
| | |     // LOG_FUNCTION; |
| | |     auto subscribers = DoFindClients(head.topic(), true); |
| | |     if (subscribers.empty()) { return true; } |
| | | |
| | |     std::vector<MsgI> made; |
| | |     auto ReleaseMade = [&]() { |
| | |         for (auto &m : made) { m.Release(); } |
| | |     }; |
| | |     DEFER1(ReleaseMade();); |
| | | |
| | |     for (auto &sub : subscribers) { |
| | |         auto node = sub.weak_node_.lock(); |
| | |         if (!node) { continue; } |
| | |         // should also make sure that mq is not killed before msg expires. |
| | |         // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | |         auto &shm = node->shm_; |
| | | |
| | |         // look for a message already built in this shared memory. |
| | |         MsgI *reuse = nullptr; |
| | |         for (auto &m : made) { |
| | |             if (m.shm().name() == shm.name()) { |
| | |                 reuse = &m; |
| | |                 break; |
| | |             } |
| | |         } |
| | |         if (reuse != nullptr) { |
| | |             DefaultSender(shm).Send({sub.mq_id_, sub.mq_abs_addr_}, *reuse); |
| | |             continue; |
| | |         } |
| | | |
| | |         MsgI fresh(shm); |
| | |         if (fresh.Make(head, body_content)) { |
| | |             RecordMsg(fresh); |
| | |             made.push_back(fresh); |
| | |             // LOG_DEBUG() << "remote publish to local." << sub.mq_id_ << ", " << sub.mq_abs_addr_; |
| | |             DefaultSender(shm).Send({sub.mq_id_, sub.mq_abs_addr_}, fresh); |
| | |         } |
| | |     } |
| | | |
| | |     return true; |
| | | } |
| | | |
| | | // Forwards a reply (head + body_content) to the local queue `dest`. |
| | | // The owning node is resolved with GetNode(dest.id_), and the address recorded for |
| | | // dest.id_ in the node's addrs_ must equal dest.offset_. |
| | | // Returns false when the node is unknown, the queue id is not registered, the address |
| | | // does not match, or the message cannot be made; otherwise returns the result of Send(). |
| | | bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) |
| | | { |
| | |     Node node(GetNode(dest.id_)); |
| | |     if (!node) { |
| | |         LOG_ERROR() << id() << " pass remote reply , ssn not found."; |
| | |         return false; |
| | |     } |
| | |     // Use find() rather than operator[]: indexing would insert a default entry for an |
| | |     // unregistered queue id, and that inserted value could then compare equal to |
| | |     // dest.offset_ and let an unknown destination through. |
| | |     const auto pos = node->addrs_.find(dest.id_); |
| | |     if (pos == node->addrs_.end() || pos->second != dest.offset_) { |
| | |         LOG_ERROR() << id() << " pass remote reply, dest address not match"; |
| | |         return false; |
| | |     } |
| | | |
| | |     ShmMsg msg(node->shm_); |
| | |     if (!msg.Make(head, body_content)) { return false; } |
| | |     DEFER1(msg.Release();); |
| | |     RecordMsg(msg); |
| | |     return DefaultSender(node->shm_).Send(dest, msg); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | |
| | | auto &node = pos->second; |
| | | try { |
| | | for (int i = 0; i < msg.extra_mq_num(); ++i) { |
| | | ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16); |
| | | ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate); |
| | | node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); |
| | | auto addr = reply.add_extra_mqs(); |
| | | addr->set_mq_id(tmp.id()); |
| | |
| | | for (auto &topic : topics) { |
| | | LOG_DEBUG() << "\t" << topic; |
| | | } |
| | | Notify(kTopicNodeService, *node); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | *info->mutable_proc() = node->proc_; |
| | | info->mutable_proc()->clear_private_info(); |
| | | info->set_online(node->state_.flag_ == kStateNormal); |
| | | for (auto &addr_topics : node->services_) { |
| | | for (auto &topic : addr_topics.second) { |
| | | info->mutable_topics()->add_topic_list(topic); |
| | | auto AddTopics = [](auto &dst, auto &src) { |
| | | for (auto &addr_topics : src) { |
| | | for (auto &topic : addr_topics.second) { |
| | | dst.add_topic_list(topic); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | AddTopics(*info->mutable_service(), node->services_); |
| | | AddTopics(*info->mutable_local_sub(), node->local_sub_); |
| | | AddTopics(*info->mutable_net_sub(), node->net_sub_); |
| | | }; |
| | | |
| | | if (!proc_id.empty()) { |
| | |
| | | typedef MsgQueryTopicReply Reply; |
| | | |
| | | auto query = [&](Node self) -> Reply { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | Reply reply = MakeReply<Reply>(eSuccess); |
| | | auto local = [&]() { |
| | | auto pos = service_map_.find(req.topic()); |
| | | if (pos != service_map_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &dest : clients) { |
| | | Node dest_node(dest.weak_node_.lock()); |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | } |
| | | } |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | return reply; |
| | | } else { |
| | | }; |
| | | auto net = [&]() { |
| | | auto hosts(FindRemoteRPCServers(req.topic())); |
| | | if (hosts.empty()) { |
| | | return false; |
| | | } else { |
| | | for (auto &ip : hosts) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->mutable_addr()->set_ip(ip); |
| | | } |
| | | return true; |
| | | } |
| | | }; |
| | | local(); |
| | | net(); |
| | | if (reply.node_address_size() == 0) { |
| | | return MakeReply<Reply>(eNotFound, "topic server not found."); |
| | | } else { |
| | | return reply; |
| | | } |
| | | }; |
| | | |
| | | return HandleMsg<Reply>(head, query); |
| | | } |
| | | |
| | | // Records the topics listed in `msg` for the sender address of `head`, both in this |
| | | // node's own table and in the center-wide topic map. |
| | | // msg.network() selects the net tables (followed by a kTopicNodeSub notification); |
| | | // otherwise the local tables are used. |
| | | void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) |
| | | { |
| | |     const auto src = SrcAddr(head); |
| | |     const auto &topics = msg.topics().topic_list(); |
| | |     auto Record = [&](auto &own_subs, auto &center_subs) { |
| | |         own_subs[src].insert(topics.begin(), topics.end()); |
| | |         const TopicDest &dest = {src, SrcAbsAddr(head), node}; |
| | |         for (auto &topic : topics) { |
| | |             center_subs[topic].insert(dest); |
| | |         } |
| | |     }; |
| | |     if (!msg.network()) { |
| | |         Record(local_sub_, center_.local_sub_map_); |
| | |         return; |
| | |     } |
| | |     Record(net_sub_, center_.net_sub_map_); |
| | |     center_.Notify(kTopicNodeSub, *this); |
| | | } |
| | | |
| | | MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->subscriptions_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, SrcAbsAddr(head), node}; |
| | | for (auto &topic : topics) { |
| | | subscribe_map_[topic].insert(dest); |
| | | } |
| | | node->Subscribe(head, msg, node); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | auto src = SrcAddr(head); |
| | | auto pos = node->subscriptions_.find(src); |
| | | |
| | | auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { |
| | | auto pos = subscribe_map_.find(topic); |
| | | if (pos != subscribe_map_.end() && |
| | | void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) |
| | | { |
| | | auto src = SrcAddr(head); |
| | | |
| | | auto Unsub = [&](auto &sub, auto &sub_map) { |
| | | auto pos = sub.find(src); |
| | | |
| | | auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { |
| | | auto pos = sub_map.find(topic); |
| | | if (pos != sub_map.end() && |
| | | pos->second.erase(dest) != 0 && |
| | | pos->second.empty()) { |
| | | subscribe_map_.erase(pos); |
| | | sub_map.erase(pos); |
| | | } |
| | | }; |
| | | |
| | | if (pos != node->subscriptions_.end()) { |
| | | if (pos != sub.end()) { |
| | | const TopicDest &dest = {src, SrcAbsAddr(head), node}; |
| | | auto &topics = msg.topics().topic_list(); |
| | | // clear node sub records; |
| | |
| | | RemoveSubTopicDestRecord(topic, dest); |
| | | } |
| | | if (pos->second.empty()) { |
| | | node->subscriptions_.erase(pos); |
| | | sub.erase(pos); |
| | | } |
| | | } |
| | | }; |
| | | if (msg.network()) { |
| | | Unsub(net_sub_, center_.net_sub_map_); |
| | | center_.Notify(kTopicNodeUnsub, *this); |
| | | } else { |
| | | Unsub(local_sub_, center_.local_sub_map_); |
| | | } |
| | | } |
| | | |
| | | // Runs the sender node's Unsubscribe() through HandleMsg and replies eSuccess. |
| | | MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) |
| | | { |
| | |     auto remove_subscription = [&](Node node) { |
| | |         node->Unsubscribe(head, msg, node); |
| | |         return MakeReply(eSuccess); |
| | |     }; |
| | |     return HandleMsg(head, remove_subscription); |
| | | } |
| | | |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) |
| | | { |
| | | // LOG_FUNCTION; |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &exact) { |
| | | auto pos = subscribe_map_.find(exact); |
| | | if (pos != subscribe_map_.end()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | if (Valid(cli.weak_node_)) { |
| | | dests.insert(cli); |
| | | auto FindIn = [&](auto &sub_map) { |
| | | auto pos = sub_map.find(exact); |
| | | if (pos != sub_map.end()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | if (node->state_.flag_ == kStateNormal) |
| | | dests.insert(cli); |
| | | } |
| | | |
| | | // if (Valid(cli.weak_node_)) { |
| | | // dests.insert(cli); |
| | | // } |
| | | } |
| | | } |
| | | }; |
| | | if (!from_remote) { |
| | | FindIn(local_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size(); |
| | | } |
| | | // net subscripitions also work in local mode. |
| | | FindIn(net_sub_map_); |
| | | // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size(); |
| | | }; |
| | | Find1(topic); |
| | | |
| | |
| | | return dests; |
| | | } |
| | | |
| | | bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) |
| | | MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) |
| | | { |
| | | bool ret = false; |
| | | HandleMsg(head, [&](Node node) { |
| | | DoFindClients(msg.topic()).swap(out); |
| | | ret = true; |
| | | return HandleMsg(head, [&](Node node) { |
| | | DoPublish(DefaultSender(node->shm_), topic, msg); |
| | | return MakeReply(eSuccess); |
| | | }).Swap(&reply); |
| | | return ret; |
| | | }); |
| | | } |
| | | |
| | | // Sends `msg` through `sock` to every client found by DoFindClients(topic, false) |
| | | // whose node can still be locked. Any exception is caught and logged. |
| | | void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) |
| | | { |
| | |     try { |
| | |         auto receivers = DoFindClients(topic, false); |
| | |         for (auto &receiver : receivers) { |
| | |             auto node = receiver.weak_node_.lock(); |
| | |             if (!node) { continue; } |
| | |             // should also make sure that mq is not killed before msg expires. |
| | |             // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | |             sock.Send({receiver.mq_id_, receiver.mq_abs_addr_}, msg); |
| | |         } |
| | |     } catch (...) { |
| | |         LOG_ERROR() << "DoPublish error."; |
| | |     } |
| | | } |
| | | |
| | | void NodeCenter::OnTimer() |
| | |
| | | } |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | EraseMapRec(local_sub_map_, node->local_sub_); |
| | | EraseMapRec(net_sub_map_, node->net_sub_); |
| | | |
| | | // remove online record. |
| | | auto pos = online_node_addr_map_.find(node->proc_.proc_id()); |
| | |
| | | void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) |
| | | { |
| | | try { |
| | | // LOG_DEBUG() << "center publish: " << topic << ": " << content; |
| | | Clients clients(DoFindClients(topic)); |
| | | if (clients.empty()) { return; } |
| | | |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(content); |
| | |
| | | if (msg.Make(head, pub)) { |
| | | DEFER1(msg.Release()); |
| | | RecordMsg(msg); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node && node->state_.flag_ == kStateNormal) { |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | } |
| | | DoPublish(DefaultSender(shm), topic, msg); |
| | | } |
| | | |
| | | } catch (...) { |
| | | LOG_ERROR() << "center publish error."; |
| | | } |
| | | } |
| | | |
| | | // Rebuilds sub_hosts_ and rpc_hosts_ from the host array in `info`. |
| | | // The entry flagged "isLocal" supplies host_id_ and ip_. For every other entry, its |
| | | // "ip" is added under each topic in "pubTopics" (rpc_hosts_) and "netSubTopics" (sub_hosts_). |
| | | void NodeCenter::NetRecords::ParseData(const ssjson::Json &info) |
| | | { |
| | |     // LOG_FUNCTION; |
| | |     sub_hosts_.clear(); |
| | |     rpc_hosts_.clear(); |
| | |     for (auto &host : info.array()) { |
| | |         if (host.get("isLocal", false)) { |
| | |             host_id_ = host.get("serverId", ""); |
| | |             ip_ = host.get("ip", ""); |
| | |             continue; |
| | |         } |
| | | |
| | |         auto ip = host.get("ip", ""); |
| | |         auto Collect = [&](const ssjson::Json::array_type &topics, auto &hosts_by_topic) { |
| | |             for (auto &entry : topics) { |
| | |                 auto name = entry.get_value<std::string>(); |
| | |                 hosts_by_topic[name].insert(ip); |
| | |                 // LOG_DEBUG() << "net topic: " << name << ", " << ip; |
| | |             } |
| | |         }; |
| | |         Collect(host.child("pubTopics").array(), rpc_hosts_); |
| | |         Collect(host.child("netSubTopics").array(), sub_hosts_); |
| | |     } |
| | | } |