refactor, start tcp pub/sub.
| | |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto &center = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | | MsgCommonReply reply; |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | if (head.route_size() == 1 && msg.ParseBody(pub)) { |
| | | // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? |
| | | auto reply(center->Publish(head, pub.topic(), msg)); |
| | | replyer(reply); |
| | | } else { |
| | | replyer(MakeReply(eSuccess)); |
| | | if (clients.empty()) { return; } |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | auto hosts = center->FindRemoteSubClients(pub.topic()); |
| | | for (auto &host : hosts) { |
| | | tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString()); |
| | | } |
| | | } |
| | | }; |
| | |
| | | proc.put("name", info.proc().name()); |
| | | proc.put("publicInfo", info.proc().public_info()); |
| | | proc.put("online", info.online()); |
| | | auto AddTopics = [&](auto &name, auto &topic_list) { |
| | | Json topics = Json::Array(); |
| | | for (auto &t : info.topics().topic_list()) { |
| | | for (auto &t : topic_list) { |
| | | topics.push_back(t); |
| | | } |
| | | proc.put("topics", topics); |
| | | proc.put(name, topics); |
| | | }; |
| | | AddTopics("service", info.service().topic_list()); |
| | | AddTopics("local_sub", info.local_sub().topic_list()); |
| | | AddTopics("net_sub", info.net_sub().topic_list()); |
| | | |
| | | list.push_back(proc); |
| | | } |
| | | return json.dump(0); |
| | |
| | | return sender.Send(dest, msg, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | auto &topic = head.topic(); |
| | | auto clients = DoFindClients(topic, true); |
| | | if (clients.empty()) { return true; } |
| | | |
| | | std::vector<MsgI> msgs; |
| | | auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } }; |
| | | DEFER1(ReleaseAll();); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto Send1 = [&](Node node) { |
| | | auto &shm = node->shm_; |
| | | for (auto &msg : msgs) { |
| | | if (msg.shm().name() == shm.name()) { |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | return; |
| | | } |
| | | } |
| | | MsgI msg(shm); |
| | | if (msg.Make(body_content)) { |
| | | RecordMsg(msg); |
| | | msgs.push_back(msg); |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | }; |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | Send1(node); |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | } |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | |
| | | *info->mutable_proc() = node->proc_; |
| | | info->mutable_proc()->clear_private_info(); |
| | | info->set_online(node->state_.flag_ == kStateNormal); |
| | | for (auto &addr_topics : node->services_) { |
| | | auto AddTopics = [](auto &dst, auto &src) { |
| | | for (auto &addr_topics : src) { |
| | | for (auto &topic : addr_topics.second) { |
| | | info->mutable_topics()->add_topic_list(topic); |
| | | dst.add_topic_list(topic); |
| | | } |
| | | } |
| | | }; |
| | | AddTopics(*info->mutable_service(), node->services_); |
| | | AddTopics(*info->mutable_local_sub(), node->local_sub_); |
| | | AddTopics(*info->mutable_net_sub(), node->net_sub_); |
| | | }; |
| | | |
| | | if (!proc_id.empty()) { |
| | |
| | | return HandleMsg<Reply>(head, query); |
| | | } |
| | | |
| | | void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node) |
| | | { |
| | | auto src = SrcAddr(head); |
| | | auto Sub = [&](auto &sub, auto &sub_map) { |
| | | auto &topics = msg.topics().topic_list(); |
| | | sub[src].insert(topics.begin(), topics.end()); |
| | | const TopicDest &dest = {src, SrcAbsAddr(head), node}; |
| | | for (auto &topic : topics) { |
| | | sub_map[topic].insert(dest); |
| | | } |
| | | }; |
| | | LOG_DEBUG() << "subscribe net : " << msg.network(); |
| | | if (msg.network()) { |
| | | Sub(net_sub_, center_.net_sub_map_); |
| | | } else { |
| | | Sub(local_sub_, center_.local_sub_map_); |
| | | } |
| | | } |
| | | |
| | | // Handles a subscribe request and replies with eSuccess. |
| | | // HandleMsg supplies the Node the callback runs against (presumably the |
| | | // sender's registered node -- confirm against HandleMsg). |
| | | MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | // NOTE(review): the direct subscriptions_/subscribe_map_ bookkeeping below |
| | | // looks like the pre-refactor implementation shown next to the new |
| | | // node->Subscribe() call; confirm only one of the two is meant to remain. |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->subscriptions_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, SrcAbsAddr(head), node}; |
| | | for (auto &topic : topics) { |
| | | subscribe_map_[topic].insert(dest); |
| | | } |
| | | // Delegates to NodeInfo::Subscribe, which files the topics under the net or |
| | | // local records depending on msg.network(). |
| | | node->Subscribe(head, msg, node); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | auto src = SrcAddr(head); |
| | | auto pos = node->subscriptions_.find(src); |
| | | |
| | | auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { |
| | | auto pos = subscribe_map_.find(topic); |
| | | if (pos != subscribe_map_.end() && |
| | | void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node) |
| | | { |
| | | auto src = SrcAddr(head); |
| | | |
| | | auto Unsub = [&](auto &sub, auto &sub_map) { |
| | | auto pos = sub.find(src); |
| | | |
| | | auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) { |
| | | auto pos = sub_map.find(topic); |
| | | if (pos != sub_map.end() && |
| | | pos->second.erase(dest) != 0 && |
| | | pos->second.empty()) { |
| | | subscribe_map_.erase(pos); |
| | | sub_map.erase(pos); |
| | | } |
| | | }; |
| | | |
| | | if (pos != node->subscriptions_.end()) { |
| | | if (pos != sub.end()) { |
| | | const TopicDest &dest = {src, SrcAbsAddr(head), node}; |
| | | auto &topics = msg.topics().topic_list(); |
| | | // clear node sub records; |
| | |
| | | RemoveSubTopicDestRecord(topic, dest); |
| | | } |
| | | if (pos->second.empty()) { |
| | | node->subscriptions_.erase(pos); |
| | | sub.erase(pos); |
| | | } |
| | | } |
| | | }; |
| | | if (msg.network()) { |
| | | Unsub(net_sub_, center_.net_sub_map_); |
| | | } else { |
| | | Unsub(local_sub_, center_.local_sub_map_); |
| | | } |
| | | } |
| | | |
| | | MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | node->Unsubscribe(head, msg, node); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote) |
| | | { |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &exact) { |
| | | auto pos = subscribe_map_.find(exact); |
| | | if (pos != subscribe_map_.end()) { |
| | | auto FindIn = [&](auto &sub_map) { |
| | | auto pos = sub_map.find(exact); |
| | | if (pos != sub_map.end()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | if (Valid(cli.weak_node_)) { |
| | |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | if (!from_remote) { |
| | | FindIn(local_sub_map_); |
| | | } |
| | | FindIn(net_sub_map_); |
| | | }; |
| | | Find1(topic); |
| | | |
| | |
| | | return dests; |
| | | } |
| | | |
| | | bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) |
| | | MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg) |
| | | { |
| | | bool ret = false; |
| | | HandleMsg(head, [&](Node node) { |
| | | DoFindClients(msg.topic()).swap(out); |
| | | ret = true; |
| | | return HandleMsg(head, [&](Node node) { |
| | | DoPublish(DefaultSender(node->shm_), topic, msg); |
| | | return MakeReply(eSuccess); |
| | | }).Swap(&reply); |
| | | return ret; |
| | | }); |
| | | } |
| | | |
| | | void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg) |
| | | { |
| | | try { |
| | | auto clients = DoFindClients(topic, false); |
| | | if (clients.empty()) { return; } |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | } |
| | | } catch (...) { |
| | | LOG_ERROR() << "DoPublish error."; |
| | | } |
| | | } |
| | | |
| | | void NodeCenter::OnTimer() |
| | |
| | | } |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | EraseMapRec(local_sub_map_, node->local_sub_); |
| | | EraseMapRec(net_sub_map_, node->net_sub_); |
| | | |
| | | // remove online record. |
| | | auto pos = online_node_addr_map_.find(node->proc_.proc_id()); |
| | |
| | | void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) |
| | | { |
| | | try { |
| | | // LOG_DEBUG() << "center publish: " << topic << ": " << content; |
| | | Clients clients(DoFindClients(topic)); |
| | | if (clients.empty()) { return; } |
| | | |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(content); |
| | |
| | | if (msg.Make(head, pub)) { |
| | | DEFER1(msg.Release()); |
| | | RecordMsg(msg); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node && node->state_.flag_ == kStateNormal) { |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | } |
| | | DoPublish(DefaultSender(shm), topic, msg); |
| | | } |
| | | |
| | | } catch (...) { |
| | | LOG_ERROR() << "center publish error."; |
| | | } |
| | | } |
| | | |
| | | std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic) |
| | | { |
| | | //TODO search synced full list; |
| | | return std::vector<std::string>(); |
| | | } |
| | |
| | | }; |
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct NodeInfo { |
| | | NodeCenter &center_; |
| | | SharedMemory &shm_; |
| | |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | AddressTopics local_sub_; // address: topics |
| | | AddressTopics net_sub_; // address: topics |
| | | NodeInfo(NodeCenter &center, SharedMemory &shm) : |
| | | center_(center), shm_(shm) {} |
| | | void PutOffline(const int64_t offline_time); |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); |
| | | void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); |
| | | void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct TopicDest { |
| | | MQId mq_id_; |
| | |
| | | void RecordMsg(const MsgI &msg); |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); |
| | | |
| | | bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); |
| | | bool RemotePublish(BHMsgHead &head, const std::string &body_content); |
| | | bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); |
| | | void OnAlloc(ShmSocket &socket, const int64_t val); |
| | | void OnFree(ShmSocket &socket, const int64_t val); |
| | |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); |
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); |
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); |
| | | Clients DoFindClients(const std::string &topic); |
| | | bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); |
| | | MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); |
| | | |
| | | void OnTimer(); |
| | | |
| | | // remote hosts records |
| | | std::vector<std::string> FindRemoteSubClients(const Topic &topic); |
| | | |
| | | private: |
| | | void CheckNodes(); |
| | | // True when the node's state flag is kStateNormal or kStateOffline. |
| | | bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } |
| | | // Center-originated publish of `content` on `topic`; failures are logged, not thrown. |
| | | void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); |
| | | // Sends `msg` via `sock` to each subscriber of `topic` whose node is still alive. |
| | | void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); |
| | | // Collects subscribers of `topic`; when from_remote is true the local-only |
| | | // subscription map is skipped and only network subscriptions are searched. |
| | | Clients DoFindClients(const std::string &topic, bool from_remote); |
| | | // True only when the node's state flag is kStateNormal. |
| | | bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } |
| | | bool Valid(const WeakNode &weak) |
| | | { |
| | |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Topic, Clients> local_sub_map_; |
| | | std::unordered_map<Topic, Clients> net_sub_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<ProcId, Address> online_node_addr_map_; |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | |
| | | send_buffer_ = imsg.content(); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | }; |
| | | |
| | | auto &scenter = *pscenter_; |
| | | if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | auto reply = MakeReply(eSuccess); |
| | | auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); |
| | | send_buffer_ = MsgI::Serialize(rep_head, reply); |
| | | async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); |
| | | |
| | | scenter->RemotePublish(head, body_content); |
| | | return; |
| | | } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { |
| | | return; |
| | | } else { |
| | | Close(); |
| | |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content) |
| | | { |
| | | return Request(ip, port, std::move(content), ReplyCB()); |
| | | } |
| | |
| | | TcpProxy(io_service_t &io) : |
| | | io_(io) {} |
| | | bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb); |
| | | bool Publish(const std::string &ip, int port, std::string &&content); |
| | | |
| | | private: |
| | | io_service_t &io_; |
| | |
| | | |
| | | // Request to subscribe the sender to a list of topics. |
| | | message MsgSubscribe { |
| | | // Topics to subscribe to. |
| | | MsgTopicList topics = 1; |
| | | // true: file the subscription under the node's network records (net_sub); |
| | | // false: file it under the local records (local_sub). |
| | | bool network = 2; |
| | | } |
| | | // Request to remove the sender's subscription to a list of topics. |
| | | message MsgUnsubscribe { |
| | | // Topics to unsubscribe from. |
| | | MsgTopicList topics = 1; |
| | | // true: remove from the network records (net_sub); false: from the local |
| | | // records (local_sub). |
| | | bool network = 2; |
| | | } |
| | | message MsgRegisterRPC { |
| | | MsgTopicList topics = 1; |
| | |
| | | message Info { |
| | | ProcInfo proc = 1; |
| | | bool online = 2; |
| | | MsgTopicList topics = 3; |
| | | MsgTopicList service = 3; |
| | | MsgTopicList local_sub = 4; |
| | | MsgTopicList net_sub = 5; |
| | | } |
| | | repeated Info proc_list = 2; |
| | | } |
| | |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | head.set_topic(pub.topic()); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BusAddr(), head, pub); |
| | |
| | | printf("proc [%d] %s, %s, %s\n\ttopics\n", i, |
| | | (info.online() ? "online" : "offline"), |
| | | info.proc().proc_id().c_str(), info.proc().name().c_str()); |
| | | for (auto &t : info.topics().topic_list()) { |
| | | printf("\t\t %s\n", t.c_str()); |
| | | auto PrintTopics = [](std::string const &name, auto &topic_list) { |
| | | printf("%s:[", name.c_str()); |
| | | for (auto &t : topic_list) { |
| | | printf("%s,", t.c_str()); |
| | | } |
| | | printf("]\n"); |
| | | }; |
| | | PrintTopics("service", info.service().topic_list()); |
| | | PrintTopics("local_sub", info.local_sub().topic_list()); |
| | | PrintTopics("net_sub", info.net_sub().topic_list()); |
| | | printf("\n"); |
| | | } |
| | | printf("\n"); |
| | | }; |
| | | if (0) { |
| | | if (1) { |
| | | // query procs |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | | MsgQueryProc query; |