From 628c1c21ffb19d8c96ed9ce89531595f9870ab1a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 18:41:02 +0800 Subject: [PATCH] add msg tag; recv all msgs before remove mq. --- src/topic_node.h | 28 +++++++++++++++++----------- 1 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/topic_node.h b/src/topic_node.h index 8287b4a..3371c35 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -22,6 +22,7 @@ #include "socket.h" #include <atomic> #include <memory> +#include <vector> using namespace bhome_shm; using namespace bhome_msg; @@ -55,8 +56,8 @@ // topic client typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); - bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); + bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish bool Publish(const MsgPublish &pub, const int timeout_ms); @@ -107,17 +108,22 @@ }; // some sockets may be the same one, using functions make it easy to change. + enum { eSockStart, + eSockNode = eSockStart, + eSockPub = eSockNode, + eSockServer, + eSockClient, + eSockSub, + eSockEnd, + }; + std::vector<std::unique_ptr<ShmSocket>> sockets_; - ShmSocket &SockNode() { return sock_node_; } - ShmSocket &SockPub() { return SockNode(); } - ShmSocket &SockSub() { return sock_sub_; } - ShmSocket &SockClient() { return sock_client_; } - ShmSocket &SockServer() { return sock_server_; } + ShmSocket &SockNode() { return *sockets_[eSockNode]; } + ShmSocket &SockPub() { return *sockets_[eSockPub]; } + ShmSocket &SockSub() { return *sockets_[eSockSub]; } + ShmSocket &SockClient() { return *sockets_[eSockClient]; } + ShmSocket &SockServer() { return *sockets_[eSockServer]; } - ShmSocket sock_node_; - ShmSocket sock_client_; - ShmSocket sock_server_; - ShmSocket sock_sub_; enum State { eStateUnregistered, eStateOnline, -- Gitblit v1.8.0