lichao
2021-06-03 8967e7f2f8b94dc032135707e16c8a9f233d0db6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
 * =====================================================================================
 *
 *       Filename:  sendq.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年04月14日 09时22分59秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef SENDQ_IWKMSK7M
#define SENDQ_IWKMSK7M
 
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
 
class ShmMsgQueue;
 
class SendQ
{
public:
    typedef MQId Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::string Content;
    typedef int64_t Data;
    typedef std::function<void(const Data &)> OnMsgEvent;
    struct MsgInfo {
        MQInfo mq_;
        Data data_;
        OnMsgEvent on_expire_;
    };
    typedef TimedData<MsgInfo> TimedMsg;
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    SendQ(SharedMemory &shm) :
        shm_(shm) {}
 
    bool Append(const MQInfo &mq, MsgI msg)
    {
        msg.AddRef();
        auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
            return true;
        } catch (...) {
            msg.Release();
            return false;
        }
    }
 
    bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
    {
        msg.AddRef();
        auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
            onExpire(d);
            msg.Release();
        };
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
            return true;
        } catch (...) {
            msg.Release();
            return false;
        }
    }
 
    bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
    {
        try {
            AppendData(mq, command, DefaultExpire(), onExpire);
            return true;
        } catch (...) {
            return false;
        }
    }
    bool TrySend();
 
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(NodeTimeoutSec()); }
    void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
 
    typedef std::deque<TimedMsg> Array;
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
 
    int DoSend1Remote(const Remote remote, Array &arr);
    int DoSend1Remote(const Remote remote, ArrayList &arr);
 
    bool TooFast();
 
    SharedMemory &shm_;
    std::mutex mutex_in_;
    std::mutex mutex_out_;
    Store in_;
    Store out_;
 
    struct Counter {
        std::atomic<int64_t> count_;
        std::atomic<int64_t> count_1sec_;
        std::atomic<int64_t> last_time_;
        Counter() :
            count_(0), count_1sec_(0), last_time_(0) {}
        void Count1()
        {
            CheckTime();
            ++count_1sec_;
            ++count_;
        }
        void Count(int n)
        {
            CheckTime();
            count_1sec_ += n;
            count_ += n;
        }
        void CheckTime()
        {
            auto cur = NowSec();
            if (cur > last_time_) {
                count_1sec_ = 0;
                last_time_ = cur;
            }
        }
        int64_t GetCount() const { return count_.load(); }
        int64_t LastSec() const { return count_1sec_.load(); }
    };
    Counter count_in_;
    Counter count_out_;
};
 
#endif // end of include guard: SENDQ_IWKMSK7M