lichao
2021-04-09 4e5cb7960ce4e7e66d5190be67426aeca8b55c3d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "bh_util.h"
#include "defs.h"
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
using namespace bhome_msg;
 
class ShmSocket : private boost::noncopyable
{
    template <class DoSend>
    inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
    {
        bool r = false;
        DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
        r = doSend(msg);
        return r;
    }
 
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
 
    ShmSocket(Shm &shm, const MQId &id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
    const MQId &id() const { return mq().Id(); }
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    size_t Pending() const { return mq().Pending(); }
 
    bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
    {
        assert(valid_remote);
        return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
    }
    //TODO reimplment, using async.
    bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
 
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
 
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
 
    template <class Body>
    bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
            std::condition_variable cv;
            bool canceled = false;
        };
 
        try {
            std::shared_ptr<State> st(new State);
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
                std::unique_lock<std::mutex> lk(st->mutex);
                if (!st->canceled) {
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else {
                }
            };
 
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
                st->canceled = true;
                return false;
            }
        } catch (...) {
            return false;
        }
    }
 
protected:
    const Shm &shm() const { return shm_; }
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    Shm &shm_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    Queue mq_;
    class AsyncCBs
    {
        std::unordered_map<std::string, RecvCB> store_;
 
    public:
        bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
        bool Find(const std::string &id, RecvCB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
                cb.swap(pos->second);
                store_.erase(pos);
                return true;
            } else {
                return false;
            }
        }
    };
 
    Synced<AsyncCBs> async_cbs_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO