lichao
2021-04-08 c338820e4db43ad32c20ff429a038b06bcb980f8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
 * =====================================================================================
 *
 *       Filename:  topic_node.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 09时05分26秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef TOPIC_NODE_YVKWA6TF
#define TOPIC_NODE_YVKWA6TF
 
#include "msg.h"
#include "pubsub.h"
#include "socket.h"
#include <memory>
 
using namespace bhome_shm;
using namespace bhome_msg;
 
// a node is a client.
class TopicNode
{
    SharedMemory &shm_;
    MsgRegister info_;
 
public:
    TopicNode(SharedMemory &shm);
    ~TopicNode();
    bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
    bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
 
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool ServerStart(OnRequest const &cb, const int nworker = 2);
    bool ServerStop();
    bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
 
    // topic client
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientStopWorker();
    bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
    {
        return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
 
    void StopAll();
 
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc().proc_id(); }
 
    typedef bhome_msg::BHAddress Address;
    class TopicQueryCache
    {
        class Impl
        {
            typedef std::unordered_map<Topic, Address> Store;
            Store store_;
 
        public:
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Update(const Topic &topic, const Address &addr)
            {
                store_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
        // Impl &impl()
        // {
        //     thread_local Impl impl;
        //     return impl;
        // }
 
    public:
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
    };
 
    // some sockets may be the same one, using functions make it easy to change.
 
    auto &SockNode() { return sock_node_; }
    auto &SockSub() { return sock_sub_; }
    auto &SockRequest() { return sock_request_; }
    auto &SockReply() { return sock_reply_; }
 
    ShmSocket sock_node_;
    ShmSocket sock_request_;
    ShmSocket sock_reply_;
    SocketSubscribe sock_sub_;
 
    TopicQueryCache topic_query_cache_;
};
 
#endif // end of include guard: TOPIC_NODE_YVKWA6TF