//
|
// Created by ps on 2/26/18.
|
//
|
|
#ifndef COMPARETEST_COMPARESERVER_H
|
#define COMPARETEST_COMPARESERVER_H
|
|
#include <atomic>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

#include <basic/debug/Debug.h>
#include <basic/util/thread/RWLock.hpp>
#include <basic/util/thread/MultiThread.h>
#include <basic/util/BASE64/Base64.h>
|
|
/**
 * Plain aggregate describing one comparison request.
 *
 * NOTE(review): the field meanings below are inferred from their names only;
 * confirm against the code that fills this struct in.
 * The scalar members have no default value, so initialise every field before
 * reading it.
 */
struct AlarmData {
    int num;                             // presumably a result/request count - TODO confirm
    std::string tableName;               // presumably the data set to compare against - TODO confirm
    std::vector<unsigned char> feature;  // raw feature bytes of the probe
    float threshold;                     // presumably the minimum accepted score - TODO confirm
};
|
|
/**
 * Plain aggregate holding one stored record and its feature vectors.
 *
 * NOTE(review): the field meanings below are inferred from their names only;
 * confirm against the loader that populates this struct.
 * face_id has no default value, so initialise it before reading it.
 */
struct FeatureData {
    long face_id;      // numeric identifier of the record
    std::string uuid;  // string identifier of the record

    // Feature blobs: one byte vector per stored feature.
    std::vector<std::vector<unsigned char>> features;

    std::string faceUrl;  // presumably the location of the source image - TODO confirm
    std::string idcard;   // presumably an identity-card number - TODO confirm
};
|
|
/**
 * Splits a comma-separated list into its non-empty items.
 *
 * Empty items are skipped, so leading, trailing and consecutive commas
 * produce nothing: "a,,b," yields {"a", "b"} and "" or ",,," yield an empty
 * vector. Items are returned verbatim; surrounding whitespace is not trimmed.
 *
 * The input is only read. The previous implementation tokenised the buffer
 * returned by c_str() in place (undefined behaviour) and fell back to the
 * non-reentrant strtok() whenever the `linux` macro was not defined, which
 * made it unsafe to call from several threads.
 *
 * @param str_list comma-separated text.
 * @return the non-empty items in their original order.
 */
static std::vector<std::string> AlarmServerPropertyAnalyseByComma(const std::string &str_list) {
    std::vector<std::string> result;

    std::string::size_type begin = 0;
    while (begin < str_list.size()) {
        std::string::size_type end = str_list.find(',', begin);
        if (end == std::string::npos) {
            end = str_list.size();  // last item runs to the end of the text
        }
        if (end > begin) {  // skip empty items, as strtok did
            result.push_back(str_list.substr(begin, end - begin));
        }
        begin = end + 1;
    }

    return result;
}
|
|
// Callback type taken by AlarmServer::parallelFor: it is invoked with an
// entry's key and a mutable reference to that entry's FeatureData.
// NOTE(review): this is a textual macro; a `using` alias would be scoped and
// type-checked. std::function requires <functional>.
#define ParalleFunc std::function<void(std::string&,FeatureData&)>
|
// Named thread-count constants.
// NOTE(review): presumably passed as the `threads` argument of
// AlarmServer::parallelFor - confirm against the callers.
enum ParallelForThreardSize { CPU_Number = 1 };
|
|
class AlarmServer {
|
|
public:
|
AlarmServer() : m_dbSet(false), m_dbReady(false) {
|
|
}
|
|
~AlarmServer() {
|
for (auto item : dataMap) {
|
delete &(item.second);
|
// item.second = NULL;
|
}
|
dataMap.clear();
|
dataAddBuffer.clear();
|
dataRemoveBuffer.clear();
|
}
|
|
void initDB(std::string str_config) {
|
std::thread thd(loadDBCahce, this, str_config);
|
thd.detach();
|
}
|
|
void removeData(std::string key) {
|
std::lock_guard<std::mutex> guard(dataRemoveBufferMtx);
|
dataRemoveBuffer.insert(key);
|
dataRemoveBufferUpdated = true;
|
}
|
|
void addData(std::string key, FeatureData &value) {
|
std::lock_guard<std::mutex> guard(dataAddBufferMtx);
|
dataAddBuffer[key] = value;
|
dataAddBufferUpdated = true;
|
}
|
|
bool getDBReady() {
|
return m_dbReady;
|
}
|
|
//m_dbReady is false return,true go on
|
//use parallelFor
|
virtual bool compare(std::thread::id key, AlarmData *, int topN) = 0;
|
|
private:
|
//init data,this is thread body
|
static void loadDBCahce(AlarmServer *compareServer, std::string str_config) {
|
std::lock_guard<std::mutex> guard(compareServer->dataRemoveBufferMtx);
|
std::lock_guard<std::mutex> guard2(compareServer->dataAddBufferMtx);
|
std::lock_guard<std::mutex> dataGuard(compareServer->dataMtx);
|
|
compareServer->dataRemoveBuffer.clear();
|
compareServer->dataAddBuffer.clear();
|
|
compareServer->m_rwLock.wrlock();
|
compareServer->loadDBData(str_config);
|
compareServer->m_dbReady = true;
|
compareServer->m_rwLock.unlock();
|
INFO("m_dbReady is true");
|
}
|
|
virtual void loadDBData(std::string tableName) = 0;
|
|
bool m_dbSet;
|
bool m_dbReady;
|
|
bool dataAddBufferUpdated;
|
bool dataRemoveBufferUpdated;
|
bool dateResetUpdated;
|
|
RWLock m_rwLock;
|
|
void updateDataRemove() {
|
if (dataRemoveBufferUpdated) {
|
std::lock_guard<std::mutex> dataRemoveGuard(dataRemoveBufferMtx);
|
std::lock_guard<std::mutex> dataGuard(dataMtx);
|
if (!dataRemoveBufferUpdated)return;
|
for (auto key: dataRemoveBuffer) {
|
this->dataMap.erase(key);
|
}
|
dataRemoveBuffer.clear();
|
dataRemoveBufferUpdated = false;
|
}
|
}
|
|
void updateDataAdd() {
|
if (dataAddBufferUpdated) {
|
std::lock_guard<std::mutex> dataRemoveGuard(dataAddBufferMtx);
|
std::lock_guard<std::mutex> dataGuard(dataMtx);
|
if (!dataAddBufferUpdated)return;
|
for (auto addData: dataAddBuffer) {
|
this->dataMap.insert(addData);
|
}
|
dataAddBuffer.clear();
|
dataAddBufferUpdated = false;
|
}
|
}
|
|
protected:
|
Base64 base64;
|
//#todo value is vector not is struct
|
std::map<std::string, FeatureData> dataMap;
|
std::map<std::string, FeatureData> dataAddBuffer;
|
std::set<std::string> dataRemoveBuffer;
|
std::mutex dataMtx;
|
std::mutex dataAddBufferMtx;
|
std::mutex dataRemoveBufferMtx;
|
std::mutex dataRestMtx;
|
|
void parallelFor(int threads, ParalleFunc func) {
|
updateDataRemove();
|
updateDataAdd();
|
// std::lock_guard<std::mutex> dataGuard(dataMtx);
|
m_rwLock.rdlock();
|
MultiThread mthd(threads, [&func, this](int idx, int num) {
|
int size = dataMap.size();
|
int step = size / num;
|
if (step < 1) {
|
step = 1;
|
if (idx >= size)return;
|
}
|
auto iter = dataMap.begin();
|
for (int i = idx * step; i > 0; i--) {
|
iter++;
|
}
|
for (int i = 0; i < step && iter != dataMap.end(); iter++, i++) {
|
auto &data = iter->second;
|
std::string key = iter->first;
|
func(key, data);
|
}
|
});
|
mthd.join();
|
m_rwLock.unlock();
|
}
|
};
|
|
|
#endif //COMPARETEST_COMPARESERVER_H
|