派生自 development/c++

pansen
2019-03-07 979bc003bce710bf300bc2bd87a8278585678763
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
//
// Created by ps on 2/26/18.
//
 
#ifndef COMPARETEST_COMPARESERVER_H
#define COMPARETEST_COMPARESERVER_H
 
#include <atomic>
#include <cstddef>
#include <functional>
#include <iterator>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

#include <basic/debug/Debug.h>
#include <basic/util/thread/RWLock.hpp>
#include <basic/util/thread/MultiThread.h>
#include <basic/util/BASE64/Base64.h>
 
/**
 * Input record handed to AlarmServer::compare().
 *
 * The scalar members carry default initializers so that a default-constructed
 * AlarmData never holds indeterminate values. Aggregate initialisation
 * (`AlarmData{n, table, feature, threshold}`) keeps working from C++14 on.
 *
 * NOTE(review): the field descriptions below are inferred from the names only;
 * confirm against the concrete compare() implementations.
 */
struct AlarmData {
    int num = 0;                        // presumably a count / result limit
    std::string tableName;              // presumably the table (data set) to compare against
    std::vector<unsigned char> feature; // raw feature bytes
    float threshold = 0.0f;             // presumably the minimum score for a match
};
 
/**
 * One entry of the in-memory cache kept by AlarmServer (the mapped type of
 * dataMap and dataAddBuffer).
 *
 * face_id carries a default initializer so that a default-constructed
 * FeatureData never holds an indeterminate value. Aggregate initialisation
 * keeps working from C++14 on.
 *
 * NOTE(review): the field descriptions below are inferred from the names only;
 * confirm against the code that fills the cache.
 */
struct FeatureData {
    long face_id = 0;
    std::string uuid;
    // Feature blobs; each inner vector is one raw feature.
    std::vector<std::vector<unsigned char>> features;
    std::string faceUrl;
    std::string idcard;
};
 
/**
 * Splits a comma-separated list into its non-empty fields.
 *
 * Leading commas, trailing commas and runs of consecutive commas produce no
 * empty entries (the tokenisation strtok() performs). Fields are returned
 * verbatim: surrounding whitespace is not trimmed, and embedded NUL characters
 * are treated as ordinary data.
 *
 * Implemented with std::string searching rather than strtok()/strtok_r():
 * tokenising in place would require writing into the buffer returned by
 * c_str(), which is undefined behaviour, and strtok() is not reentrant.
 *
 * @param str_list comma-separated text, e.g. "a,b,c"
 * @return the fields in order of appearance; empty when there are none
 */
static std::vector<std::string> AlarmServerPropertyAnalyseByComma(std::string str_list) {
    std::vector<std::string> result;
    const char delimiter = ',';

    // Skip any leading delimiters to find the start of the first field.
    std::string::size_type begin = str_list.find_first_not_of(delimiter);
    while (begin != std::string::npos) {
        const std::string::size_type end = str_list.find(delimiter, begin);
        if (end == std::string::npos) {
            // Last field: runs to the end of the string.
            result.push_back(str_list.substr(begin));
            break;
        }
        result.push_back(str_list.substr(begin, end - begin));
        // Skip the delimiter run that follows this field.
        begin = str_list.find_first_not_of(delimiter, end);
    }
    return result;
}
 
struct FeatureData; // redundant re-declaration (defined earlier in this header); keeps the alias self-contained

/**
 * Callback type invoked by AlarmServer::parallelFor() for every cache entry.
 * It receives a mutable copy of the entry's key and a reference to the stored
 * FeatureData.
 *
 * Declared as a type alias instead of a preprocessor macro so that the name
 * obeys normal scoping rules and shows up in diagnostics and tooling.
 */
using ParalleFunc = std::function<void(std::string &, FeatureData &)>;

/**
 * Named thread-count constant.
 * NOTE(review): presumably passed as the `threads` argument of
 * AlarmServer::parallelFor(); no use is visible in this header.
 * Kept as an unscoped enum so CPU_Number still converts implicitly to int.
 */
enum ParallelForThreardSize {
    CPU_Number = 1
};
 
class AlarmServer {
 
public:
    AlarmServer() : m_dbSet(false), m_dbReady(false) {
 
    }
 
    ~AlarmServer() {
        for (auto item : dataMap) {
            delete &(item.second);
//            item.second = NULL;
        }
        dataMap.clear();
        dataAddBuffer.clear();
        dataRemoveBuffer.clear();
    }
 
    void initDB(std::string str_config) {
        std::thread thd(loadDBCahce, this, str_config);
        thd.detach();
    }
 
    void removeData(std::string key) {
        std::lock_guard<std::mutex> guard(dataRemoveBufferMtx);
        dataRemoveBuffer.insert(key);
        dataRemoveBufferUpdated = true;
    }
 
    void addData(std::string key, FeatureData &value) {
        std::lock_guard<std::mutex> guard(dataAddBufferMtx);
        dataAddBuffer[key] = value;
        dataAddBufferUpdated = true;
    }
 
    bool getDBReady() {
        return m_dbReady;
    }
 
    //m_dbReady is false return,true go on
    //use parallelFor
    virtual bool compare(std::thread::id key, AlarmData *, int topN) = 0;
 
private:
    //init data,this is thread body
    static void loadDBCahce(AlarmServer *compareServer, std::string str_config) {
        std::lock_guard<std::mutex> guard(compareServer->dataRemoveBufferMtx);
        std::lock_guard<std::mutex> guard2(compareServer->dataAddBufferMtx);
        std::lock_guard<std::mutex> dataGuard(compareServer->dataMtx);
 
        compareServer->dataRemoveBuffer.clear();
        compareServer->dataAddBuffer.clear();
 
        compareServer->m_rwLock.wrlock();
        compareServer->loadDBData(str_config);
        compareServer->m_dbReady = true;
        compareServer->m_rwLock.unlock();
        INFO("m_dbReady is true");
    }
 
    virtual void loadDBData(std::string tableName) = 0;
 
    bool m_dbSet;
    bool m_dbReady;
 
    bool dataAddBufferUpdated;
    bool dataRemoveBufferUpdated;
    bool dateResetUpdated;
 
    RWLock m_rwLock;
 
    void updateDataRemove() {
        if (dataRemoveBufferUpdated) {
            std::lock_guard<std::mutex> dataRemoveGuard(dataRemoveBufferMtx);
            std::lock_guard<std::mutex> dataGuard(dataMtx);
            if (!dataRemoveBufferUpdated)return;
            for (auto key: dataRemoveBuffer) {
                this->dataMap.erase(key);
            }
            dataRemoveBuffer.clear();
            dataRemoveBufferUpdated = false;
        }
    }
 
    void updateDataAdd() {
        if (dataAddBufferUpdated) {
            std::lock_guard<std::mutex> dataRemoveGuard(dataAddBufferMtx);
            std::lock_guard<std::mutex> dataGuard(dataMtx);
            if (!dataAddBufferUpdated)return;
            for (auto addData: dataAddBuffer) {
                this->dataMap.insert(addData);
            }
            dataAddBuffer.clear();
            dataAddBufferUpdated = false;
        }
    }
 
protected:
    Base64 base64;
    //#todo value is vector not is struct
    std::map<std::string, FeatureData> dataMap;
    std::map<std::string, FeatureData> dataAddBuffer;
    std::set<std::string> dataRemoveBuffer;
    std::mutex dataMtx;
    std::mutex dataAddBufferMtx;
    std::mutex dataRemoveBufferMtx;
    std::mutex dataRestMtx;
 
    void parallelFor(int threads, ParalleFunc func) {
        updateDataRemove();
        updateDataAdd();
//        std::lock_guard<std::mutex> dataGuard(dataMtx);
        m_rwLock.rdlock();
        MultiThread mthd(threads, [&func, this](int idx, int num) {
            int size = dataMap.size();
            int step = size / num;
            if (step < 1) {
                step = 1;
                if (idx >= size)return;
            }
            auto iter = dataMap.begin();
            for (int i = idx * step; i > 0; i--) {
                iter++;
            }
            for (int i = 0; i < step && iter != dataMap.end(); iter++, i++) {
                auto &data = iter->second;
                std::string key = iter->first;
                func(key, data);
            }
        });
        mthd.join();
        m_rwLock.unlock();
    }
};
 
 
#endif //COMPARETEST_COMPARESERVER_H