lichao
2021-06-03 8967e7f2f8b94dc032135707e16c8a9f233d0db6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
 * =====================================================================================
 *
 *       Filename:  shm_socket.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
 
#ifndef SHM_SOCKET_GWTJHBPO
#define SHM_SOCKET_GWTJHBPO
 
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
#include "shm_msg_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
 
using namespace bhome_msg;
class ShmSocket : private boost::noncopyable
{
 
protected:
    typedef ShmMsgQueue Queue;
 
public:
    typedef ShmMsgQueue::MQId MQId;
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
 
    ShmSocket(Shm &shm, const MQId id, Mode mode) :
        run_(false), mq_(shm, id, mode), alloc_id_(0), send_buffer_(shm) { Start(); }
    ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
        run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); }
 
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    MQId id() const { return mq().Id(); }
    int64_t AbsAddr() const { return mq().AbsAddr(); }
    void SetNodeProc(const int proc_index, const int socket_index)
    {
        node_proc_index_ = proc_index;
        socket_index_ = socket_index;
    }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
 
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
    bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb);
    template <class Body>
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); }
    bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb);
 
    template <class Body>
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); }
    bool Send(const MQInfo &remote, std::string &&content);
    bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); }
 
    template <class... T>
    bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
    {
        return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
    }
 
    template <class Body>
    bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
            std::condition_variable cv;
            bool canceled = false;
        };
 
        try {
            std::shared_ptr<State> st(new State);
 
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
                std::unique_lock<std::mutex> lk(st->mutex);
                if (!st->canceled) {
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else { // ignore
                }
            };
 
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, std::move(OnRecv));
            if (!sendok) {
                LOG_DEBUG() << "send timeout";
            }
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
                st->canceled = true;
                SetLastError(ETIMEDOUT, "timeout");
                return false;
            }
        } catch (...) {
            return false;
        }
    }
 
    Shm &shm() const { return mq().shm(); }
 
protected:
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
 
private:
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
 
    template <class... Rest>
    bool SendImpl(Rest &&...rest)
    {
        return send_buffer_.Append(std::forward<decltype(rest)>(rest)...);
    }
 
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
 
    Queue mq_;
    template <class Key, class CB>
    class CallbackRecords
    {
        std::unordered_map<Key, CB> store_;
 
    public:
        bool empty() const { return store_.empty(); }
        bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const Key &id, CB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
                cb.swap(pos->second);
                store_.erase(pos);
                return true;
            } else {
                return false;
            }
        }
    };
 
    Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
    Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
 
    // node request center alloc memory.
    int node_proc_index_ = -1;
    int socket_index_ = -1;
    std::atomic<int> alloc_id_;
 
    SendQ send_buffer_;
};
 
#endif // end of include guard: SHM_SOCKET_GWTJHBPO