lichao
2021-03-31 3c2b6739208d961cf8b86460d7f05516d044960c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>
 
class ShmSocket : private boost::noncopyable
{
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    enum Type {
        eSockRequest,
        eSockReply,
        eSockSubscribe,
        eSockPublish,
        eSockBus,
    };
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
    typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
 
    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
    ShmSocket(Type type);
    ~ShmSocket();
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
 
    // bool HandleRequest(onData);
    bool ReadRequest(); // exclude with HandleRequest
    bool SendReply();   // exclude with HandleRequest
 
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
 
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool StartRaw(const RecvRawCB &onData, int nworker = 1);
    bool StartAsync(int nworker = 2);
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
private:
    bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    bool StopNoLock();
    bhome_shm::SharedMemory &shm_;
    const Type type_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    std::unique_ptr<Queue> mq_;
    std::unordered_map<std::string, RecvCB> async_cbs_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO