lichao
2021-04-20 1f3729698a131b3f701f67adb6a1258aa1235dce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
 * =====================================================================================
 *
 *       Filename:  topic_node.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 09时05分26秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef TOPIC_NODE_YVKWA6TF
#define TOPIC_NODE_YVKWA6TF
 
#include "msg.h"
#include "socket.h"
#include <memory>
 
using namespace bhome_shm;
using namespace bhome_msg;
 
// a node is a client.
class TopicNode
{
    SharedMemory &shm_;
    ProcInfo info_;
 
    SharedMemory &shm() { return shm_; }
 
public:
    TopicNode(SharedMemory &shm);
    ~TopicNode();
 
    // topic node
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
 
    // topic rpc server
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
    typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
    bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
    bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
 
    // topic client
    typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
 
    // publish
    bool Publish(const MsgPublish &pub, const int timeout_ms);
 
    // subscribe
    typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
    bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
    bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
    bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
 
    void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
    void Stop();
 
private:
    bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc_id(); }
 
    typedef BHAddress Address;
    class TopicQueryCache
    {
        class Impl
        {
            typedef std::unordered_map<Topic, Address> Records;
            Records records_;
 
        public:
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = records_.find(topic);
                if (pos != records_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Store(const Topic &topic, const Address &addr)
            {
                records_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
 
    public:
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); }
    };
 
    // some sockets may be the same one, using functions make it easy to change.
 
    ShmSocket &SockNode() { return sock_node_; }
    ShmSocket &SockPub() { return SockNode(); }
    ShmSocket &SockSub() { return sock_sub_; }
    ShmSocket &SockClient() { return sock_client_; }
    ShmSocket &SockServer() { return sock_server_; }
    bool IsRegistered() const { return registered_.load(); }
 
    ShmSocket sock_node_;
    ShmSocket sock_client_;
    ShmSocket sock_server_;
    ShmSocket sock_sub_;
    std::atomic<bool> registered_;
    std::atomic<bool> registered_ever_;
 
    TopicQueryCache topic_query_cache_;
};
 
#endif // end of include guard: TOPIC_NODE_YVKWA6TF