lichao
2021-04-20 64bff0caaf665c65125cdab2b144f3594d520002
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
using namespace bhome_msg;
 
class ShmSocket : private boost::noncopyable
{
    template <class... T>
    bool SendImpl(const void *valid_remote, T &&...rest)
    {
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
        return true;
    }
 
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
 
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
 
    ShmSocket(Shm &shm, const MQId &id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    const MQId &id() const { return mq().Id(); }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    size_t Pending() const { return mq().Pending(); }
 
    template <class Body>
    bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        try {
            if (!cb) {
                return SendImpl(valid_remote, MsgI::Serialize(head, body));
            } else {
                std::string msg_id(head.msg_id());
                per_msg_cbs_->Store(msg_id, std::move(cb));
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
            }
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    }
 
    bool Send(const void *valid_remote, const MsgI &imsg)
    {
        return SendImpl(valid_remote, imsg);
    }
 
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
    template <class Body>
    bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
            std::condition_variable cv;
            bool canceled = false;
        };
 
        try {
            std::shared_ptr<State> st(new State);
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
                std::unique_lock<std::mutex> lk(st->mutex);
                if (!st->canceled) {
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else { // ignore
                }
            };
 
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, std::move(OnRecv));
            if (!sendok) {
                printf("send timeout\n");
            }
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
                st->canceled = true;
                SetLastError(ETIMEDOUT, "timeout");
                return false;
            }
        } catch (...) {
            return false;
        }
    }
 
    Shm &shm() const { return mq().shm(); }
 
protected:
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    Queue mq_;
    class AsyncCBs
    {
        std::unordered_map<std::string, RecvCB> store_;
 
    public:
        bool empty() const { return store_.empty(); }
        bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const std::string &id, RecvCB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
                cb.swap(pos->second);
                store_.erase(pos);
                return true;
            } else {
                return false;
            }
        }
    };
 
    Synced<AsyncCBs> per_msg_cbs_;
    SendQ send_buffer_;
    // Synced<SendQ> send_buffer_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO