// // Created by ps on 2/26/18. // #ifndef COMPARETEST_COMPARESERVER_H #define COMPARETEST_COMPARESERVER_H #include #include #include #include #include #include struct AlarmData { int num; std::string tableName; std::vector feature; float threshold; }; struct FeatureData { long face_id; std::string uuid; //feature std::vector> features; std::string faceUrl; std::string idcard; }; static std::vector AlarmServerPropertyAnalyseByComma(std::string str_list) { std::vector result; char *property_list = const_cast(str_list.c_str()); const char *c_Analyse = ","; char *t_property; #ifdef linux char *t_save = NULL; t_property = strtok_r(property_list, c_Analyse, &t_save); #else t_property = strtok(property_list,c_Analyse); #endif while (t_property) { std::string str_pro(t_property); result.push_back(str_pro); #ifdef linux t_property = strtok_r(t_save, c_Analyse, &t_save); #else t_property = strtok(NULL,c_Analyse); #endif } return result; } #define ParalleFunc std::function enum ParallelForThreardSize { CPU_Number = 1 }; class AlarmServer { public: AlarmServer() : m_dbSet(false), m_dbReady(false) { } ~AlarmServer() { for (auto item : dataMap) { delete &(item.second); // item.second = NULL; } dataMap.clear(); dataAddBuffer.clear(); dataRemoveBuffer.clear(); } void initDB(std::string str_config) { std::thread thd(loadDBCahce, this, str_config); thd.detach(); } void removeData(std::string key) { std::lock_guard guard(dataRemoveBufferMtx); dataRemoveBuffer.insert(key); dataRemoveBufferUpdated = true; } void addData(std::string key, FeatureData &value) { std::lock_guard guard(dataAddBufferMtx); dataAddBuffer[key] = value; dataAddBufferUpdated = true; } bool getDBReady() { return m_dbReady; } //m_dbReady is false return,true go on //use parallelFor virtual bool compare(std::thread::id key, AlarmData *, int topN) = 0; private: //init data,this is thread body static void loadDBCahce(AlarmServer *compareServer, std::string str_config) { std::lock_guard guard(compareServer->dataRemoveBufferMtx); std::lock_guard guard2(compareServer->dataAddBufferMtx); std::lock_guard dataGuard(compareServer->dataMtx); compareServer->dataRemoveBuffer.clear(); compareServer->dataAddBuffer.clear(); compareServer->m_rwLock.wrlock(); compareServer->loadDBData(str_config); compareServer->m_dbReady = true; compareServer->m_rwLock.unlock(); INFO("m_dbReady is true"); } virtual void loadDBData(std::string tableName) = 0; bool m_dbSet; bool m_dbReady; bool dataAddBufferUpdated; bool dataRemoveBufferUpdated; bool dateResetUpdated; RWLock m_rwLock; void updateDataRemove() { if (dataRemoveBufferUpdated) { std::lock_guard dataRemoveGuard(dataRemoveBufferMtx); std::lock_guard dataGuard(dataMtx); if (!dataRemoveBufferUpdated)return; for (auto key: dataRemoveBuffer) { this->dataMap.erase(key); } dataRemoveBuffer.clear(); dataRemoveBufferUpdated = false; } } void updateDataAdd() { if (dataAddBufferUpdated) { std::lock_guard dataRemoveGuard(dataAddBufferMtx); std::lock_guard dataGuard(dataMtx); if (!dataAddBufferUpdated)return; for (auto addData: dataAddBuffer) { this->dataMap.insert(addData); } dataAddBuffer.clear(); dataAddBufferUpdated = false; } } protected: Base64 base64; //#todo value is vector not is struct std::map dataMap; std::map dataAddBuffer; std::set dataRemoveBuffer; std::mutex dataMtx; std::mutex dataAddBufferMtx; std::mutex dataRemoveBufferMtx; std::mutex dataRestMtx; void parallelFor(int threads, ParalleFunc func) { updateDataRemove(); updateDataAdd(); // std::lock_guard dataGuard(dataMtx); m_rwLock.rdlock(); MultiThread mthd(threads, [&func, this](int idx, int num) { int size = dataMap.size(); int step = size / num; if (step < 1) { step = 1; if (idx >= size)return; } auto iter = dataMap.begin(); for (int i = idx * step; i > 0; i--) { iter++; } for (int i = 0; i < step && iter != dataMap.end(); iter++, i++) { auto &data = iter->second; std::string key = iter->first; func(key, data); } }); mthd.join(); m_rwLock.unlock(); } }; #endif //COMPARETEST_COMPARESERVER_H