lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
class ShmSocket : private boost::noncopyable
{
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
 
    ShmSocket(Shm &shm, const void *id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
 
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool Start(const RecvBHMsgCB &onData, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
    }
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
protected:
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
    std::mutex &mutex() { return mutex_; }
 
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    Shm &shm_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    std::unique_ptr<Queue> mq_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO