lichao
2021-05-13 db322f33ba13592f2492317e3f1a070454c97059
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
 
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
#include "shm_msg_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
using namespace bhome_msg;
class ShmSocket : private boost::noncopyable
{
 
protected:
    typedef ShmMsgQueue Queue;
 
public:
    typedef ShmMsgQueue::MQId MQId;
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
 
    ShmSocket(Shm &shm, const MQId id, const int len);
    ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    MQId id() const { return mq().Id(); }
    void SetNodeProc(const int proc_index, const int socket_index)
    {
        node_proc_index_ = proc_index;
        socket_index_ = socket_index;
        LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
    }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
 
    template <class Body>
    bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
    {
        try {
            //TODO alloc outsiez and use send.
            MsgI msg;
            if (!msg.Make(head, body)) { return false; }
            DEFER1(msg.Release());
 
            return Send(remote, msg);
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    }
 
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
    template <class Body>
    bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        std::string msg_id(head.msg_id());
        std::string content(MsgI::Serialize(head, body));
        size_t size = content.size();
        auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
            if (!msg.Fill(content)) { return; }
 
            try {
                if (!cb) {
                    Send(remote, msg);
                } else {
                    per_msg_cbs_->Store(msg_id, std::move(cb));
                    auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                        RecvCB cb_no_use;
                        per_msg_cbs_->Pick(msg_id, cb_no_use);
                    };
                    Send(remote, msg, onExpireRemoveCB);
                }
            } catch (...) {
                SetLastError(eError, "Send internal error.");
            }
        };
 
        return RequestAlloc(size, OnResult);
    }
    template <class... T>
    bool Send(const MQId remote, const MsgI &imsg, T &&...t)
    {
        return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
    }
    template <class... T>
    bool Send(const MQId remote, const int64_t cmd, T &&...t)
    {
        return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
    }
    bool SyncRecv(int64_t &cmd, const int timeout_ms);
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
    template <class Body>
    bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
            std::condition_variable cv;
            bool canceled = false;
        };
 
        try {
            std::shared_ptr<State> st(new State);
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
                std::unique_lock<std::mutex> lk(st->mutex);
                if (!st->canceled) {
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else { // ignore
                }
            };
 
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, std::move(OnRecv));
            if (!sendok) {
                LOG_DEBUG() << "send timeout";
            }
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
                st->canceled = true;
                SetLastError(ETIMEDOUT, "timeout");
                return false;
            }
        } catch (...) {
            return false;
        }
    }
 
    Shm &shm() const { return mq().shm(); }
 
protected:
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    template <class... Rest>
    bool SendImpl(const MQId remote, Rest &&...rest)
    {
        // TODO send alloc request, and pack later, higher bit means alloc?
        send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
        return true;
    }
 
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    Queue mq_;
    template <class Key, class CB>
    class CallbackRecords
    {
        std::unordered_map<Key, CB> store_;
 
    public:
        bool empty() const { return store_.empty(); }
        bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const Key &id, CB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
                cb.swap(pos->second);
                store_.erase(pos);
                return true;
            } else {
                return false;
            }
        }
    };
 
    Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
    Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
 
    SendQ send_buffer_;
    // node request center alloc memory.
    int node_proc_index_ = -1;
    int socket_index_ = -1;
    std::atomic<int> alloc_id_;
};
 
#endif // end of include guard: SOCKET_GWTJHBPO