lichao
2021-04-09 2197cf91e7a3bd5941327ba630a42946b88f069e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 * =====================================================================================
 *
 *       Filename:  shm_queue.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月25日 10时35分09秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SHM_QUEUE_JE0OEUP3
#define SHM_QUEUE_JE0OEUP3
 
#include "msg.h"
#include "shm.h"
#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
 
namespace bhome_shm
{
 
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
 
typedef boost::uuids::uuid MQId;
 
template <class D>
class SharedQueue : private Circular<D>
{
    typedef Circular<D> Super;
    Mutex mutex_;
    Cond cond_read_;
    Cond cond_write_;
    Mutex &mutex() { return mutex_; }
 
    static boost::posix_time::ptime MSFromNow(const int ms)
    {
        using namespace boost::posix_time;
        ptime cur = boost::posix_time::microsec_clock::universal_time();
        return cur + millisec(ms);
    }
 
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        Super(len, alloc) {}
    using Super::capacity;
    using Super::size;
    template <class Iter, class OnWrite>
    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
    {
        int n = 0;
        if (begin != end) {
            auto endtime = MSFromNow(timeout_ms);
            Guard lock(mutex());
            while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
                onWrite(*begin);
                this->push_back(*begin);
                ++n;
                cond_read_.notify_one();
                if (++begin == end) {
                    break;
                }
            }
        }
        return n;
    }
 
    template <class OnWrite>
    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite)
    {
        return Write(&buf, (&buf) + 1, timeout_ms, onWrite);
    }
    bool Write(const D &buf, const int timeout_ms)
    {
        return Write(buf, timeout_ms, [](const D &buf) {});
    }
 
    template <class OnData>
    bool Read(const int timeout_ms, OnData onData)
    {
        int n = 0;
        auto endtime = MSFromNow(timeout_ms);
        Guard lock(mutex());
        while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
            const bool more = onData(this->front());
            this->pop_front();
            cond_write_.notify_one();
            ++n;
            if (!more) {
                break;
            }
        }
        return n;
    }
 
    bool Read(D &buf, const int timeout_ms)
    {
        auto read1 = [&](D &d) {
            using std::swap;
            swap(buf, d);
            return false;
        };
        return Read(timeout_ms, read1) == 1;
    }
};
 
using namespace bhome_msg;
 
class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
{
    typedef ShmObject<SharedQueue<MsgI>> Super;
    typedef Super::Data Queue;
    typedef std::function<void()> OnSend;
    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
    MQId id_;
 
protected:
    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
public:
    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    const MQId &Id() const { return id_; }
 
    // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
 
    template <class... Rest>
    bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    size_t Pending() const { return data()->size(); }
};
 
} // namespace bhome_shm
 
#endif // end of include guard: SHM_QUEUE_JE0OEUP3