#include "PipeLinePool.h" #include "logger.h" #include #define PLP_MUTEX_LOCK(mut,_ret) if (mut != nullptr) {\ int ret = pthread_mutex_lock((pthread_mutex_t*)mut); \ if(ret != 0) \ { \ LOG_ERROR << "pthread_mutex_lock " << #mut << ": " << ret; \ return _ret; \ } \ } #define PLP_MUTEX_UNLOCK(mut,_ret) if (mut != nullptr) {\ int ret = pthread_mutex_unlock((pthread_mutex_t*)mut); \ if(ret != 0) \ { \ LOG_ERROR << "pthread_mutex_unlock " << #mut << ": " << ret; \ return _ret; \ } \ } PipeLinePool::PipeLinePool(bool _multithread_safe) : multithread_safe(_multithread_safe), tsafe_mutex(nullptr), pl_mutex(nullptr), pipelines(), pipelines_free() { if (multithread_safe) { tsafe_mutex = new pthread_mutex_t; pthread_mutex_init((pthread_mutex_t*)tsafe_mutex, NULL); pl_mutex = new pthread_mutex_t; pthread_mutex_init((pthread_mutex_t*)pl_mutex, NULL); PLP_MUTEX_LOCK(pl_mutex,); } } PipeLinePool::~PipeLinePool() { if (multithread_safe) { PLP_MUTEX_UNLOCK(pl_mutex,); } pthread_mutex_destroy((pthread_mutex_t*)tsafe_mutex); delete (pthread_mutex_t*)tsafe_mutex; tsafe_mutex = nullptr; pthread_mutex_destroy((pthread_mutex_t*)pl_mutex); delete (pthread_mutex_t*)pl_mutex; pl_mutex = nullptr; pipelines_free.clear(); for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter) delete *iter; pipelines.clear(); } void PipeLinePool::manage(PipeLine* pl) { if (pl == nullptr) return; PLP_MUTEX_LOCK(tsafe_mutex,); if (pipelines.find(pl) != pipelines.end()) return; pipelines.insert(pl); pipelines_free.insert(pl); PLP_MUTEX_UNLOCK(tsafe_mutex,); } void PipeLinePool::unmanage(PipeLine* pl) { PLP_MUTEX_LOCK(tsafe_mutex,); pipelines.erase(pl); pipelines_free.erase(pl); PLP_MUTEX_UNLOCK(tsafe_mutex,); } PipeLine* PipeLinePool::get_free() { if (pipelines_free.empty()) { PLP_MUTEX_LOCK(pl_mutex, nullptr); } PLP_MUTEX_LOCK(tsafe_mutex, nullptr); if (pipelines_free.empty()) return nullptr; pl_set_t::iterator iter = pipelines_free.begin(); PipeLine* pl = *iter; pipelines_free.erase(iter); PLP_MUTEX_UNLOCK(tsafe_mutex, nullptr); return pl; } void PipeLinePool::release(PipeLine* pl) { if (pipelines.find(pl) == pipelines.end()) return; if (pipelines_free.find(pl) != pipelines.end()) return; PLP_MUTEX_LOCK(tsafe_mutex,); pipelines_free.insert(pl); PLP_MUTEX_UNLOCK(tsafe_mutex,); PLP_MUTEX_UNLOCK(pl_mutex,); }