lichao
2021-03-31 6eefba812ede29549af3633c490f2e85a4805524
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "shm_queue.h"
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
class ShmSocket
{
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    enum Type {
        eSockRequest,
        eSockReply,
        eSockSubscribe,
        eSockPublish,
    };
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
 
    ShmSocket(Type type);
    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
    ~ShmSocket();
 
    // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
    bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
 
    // bool HandleRequest(onData);
    bool ReadRequest(); // exclude with HandleRequest
    bool SendReply();   // exclude with HandleRequest
 
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
    bool SetRecvCallback(const RecvCB &onRecv);
 
private:
    bool HasRecvCB();
    void Stop();
 
    bhome_shm::SharedMemory &shm_;
    Type type_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::condition_variable cv_recv_cb_;
    std::atomic<bool> run_;
    RecvCB onRecv_;
 
    std::unique_ptr<Queue> mq_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO