lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.h
 *
 *    Description:  topic request/reply sockets
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时36分06秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef REQREP_ACEH09NK
#define REQREP_ACEH09NK
 
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "socket.h"
#include <functional>
#include <unordered_map>
 
using bhome::msg::ProcInfo;
 
class SocketRequest : private ShmSocket
{
    typedef ShmSocket Socket;
 
public:
    SocketRequest(Socket::Shm &shm) :
        Socket(shm, 64) { StartWorker(); }
    SocketRequest() :
        SocketRequest(BHomeShm()) {}
    ~SocketRequest() { Stop(); }
 
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
    bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
    bool Stop() { return Socket::Stop(); }
    bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
 
    bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
    }
    bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
 
private:
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
    bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
 
    typedef bhome_msg::BHAddress Address;
    class TopicCache
    {
        class Impl
        {
            typedef std::unordered_map<Topic, Address> Store;
            Store store_;
 
        public:
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Update(const Topic &topic, const Address &addr)
            {
                store_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
        // Impl &impl()
        // {
        //     thread_local Impl impl;
        //     return impl;
        // }
 
    public:
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
    };
    TopicCache topic_cache_;
};
 
class SocketReply : private ShmSocket
{
    typedef ShmSocket Socket;
 
public:
    SocketReply(Socket::Shm &shm) :
        Socket(shm, 64) {}
    SocketReply() :
        SocketReply(BHomeShm()) {}
    ~SocketReply() { Stop(); }
 
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool StartWorker(const OnRequest &rcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
    bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
    bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
 
private:
};
 
#endif // end of include guard: REQREP_ACEH09NK