lichao
2021-04-01 b55ffe89f4b237be5f79232cfddfe22bfdb87c64
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
class ShmSocket : private boost::noncopyable
{
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
 
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
 
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool StartRaw(const RecvRawCB &onData, int nworker = 1);
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
 
protected:
    ShmSocket(Shm &shm, const void *id, const int len);
    Shm &shm() { return shm_; }
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
    std::mutex &mutex() { return mutex_; }
 
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    Shm &shm_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    std::unique_ptr<Queue> mq_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO