From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 23 六月 2021 19:43:29 +0800 Subject: [PATCH] refactor, start tcp pub/sub. --- box/node_center.h | 24 ++++++++++++++++++------ 1 files changed, 18 insertions(+), 6 deletions(-) diff --git a/box/node_center.h b/box/node_center.h index 74dd52f..54f84c0 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -82,6 +82,10 @@ }; typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; + struct NodeInfo; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + struct NodeInfo { NodeCenter ¢er_; SharedMemory &shm_; @@ -89,14 +93,15 @@ std::map<MQId, int64_t> addrs_; // registered mqs ProcInfo proc_; // AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics + AddressTopics local_sub_; // address: topics + AddressTopics net_sub_; // address: topics NodeInfo(NodeCenter ¢er, SharedMemory &shm) : center_(center), shm_(shm) {} void PutOffline(const int64_t offline_time); void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); + void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node); + void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node); }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; struct TopicDest { MQId mq_id_; @@ -121,7 +126,9 @@ void RecordMsg(const MsgI &msg); bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg); bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg); + bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb); + bool RemotePublish(BHMsgHead &head, const std::string &body_content); bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content); void OnAlloc(ShmSocket &socket, const int64_t val); void OnFree(ShmSocket &socket, const int64_t val); @@ -176,15 +183,19 @@ MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); - Clients DoFindClients(const std::string &topic); - bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply); + MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg); void OnTimer(); + + // remote hosts records + std::vector<std::string> FindRemoteSubClients(const Topic &topic); private: void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); + void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); + Clients DoFindClients(const std::string &topic, bool from_remote); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } bool Valid(const WeakNode &weak) { @@ -197,7 +208,8 @@ std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; - std::unordered_map<Topic, Clients> subscribe_map_; + std::unordered_map<Topic, Clients> local_sub_map_; + std::unordered_map<Topic, Clients> net_sub_map_; std::unordered_map<Address, Node> nodes_; std::unordered_map<ProcId, Address> online_node_addr_map_; ProcRecords procs_; // To get a short index for msg alloc. -- Gitblit v1.8.0