#include "PipeLinePool.h"
#include "logger.h"
#include <pthread.h>

// Locks `mut` if it is non-null. On failure the pthread error code is logged
// and the ENCLOSING function returns `_ret` (leave `_ret` empty in functions
// that return nothing). Wrapped in do/while(0) so it behaves as one statement.
#define PLP_MUTEX_LOCK(mut,_ret) \
    do { \
        if ((mut) != nullptr) { \
            int plp_rc_ = pthread_mutex_lock(static_cast<pthread_mutex_t*>(mut)); \
            if (plp_rc_ != 0) { \
                LOG_ERROR << "pthread_mutex_lock " << #mut << ": " << plp_rc_ << std::endl; \
                return _ret; \
            } \
        } \
    } while (0)

// Unlocks `mut` if it is non-null. Same failure handling as PLP_MUTEX_LOCK.
#define PLP_MUTEX_UNLOCK(mut,_ret) \
    do { \
        if ((mut) != nullptr) { \
            int plp_rc_ = pthread_mutex_unlock(static_cast<pthread_mutex_t*>(mut)); \
            if (plp_rc_ != 0) { \
                LOG_ERROR << "pthread_mutex_unlock " << #mut << ": " << plp_rc_ << std::endl; \
                return _ret; \
            } \
        } \
    } while (0)

// Scoped lock guard over an optional pthread mutex.
// A null mutex turns the guard into a no-op, which is how the pool runs when
// it was constructed without thread safety.
struct MutexLocker
{
    pthread_mutex_t* mut;
    bool locked; // true only when the constructor really acquired `mut`

    explicit MutexLocker(void* _mut)
        : mut(static_cast<pthread_mutex_t*>(_mut)), locked(false)
    {
        PLP_MUTEX_LOCK(mut,);       // returns early (locked stays false) on error
        locked = (mut != nullptr);
    }

    ~MutexLocker()
    {
        // Never unlock a mutex this guard failed to lock.
        if (locked)
        {
            PLP_MUTEX_UNLOCK(mut,);
        }
    }

    // Copying a guard would unlock the same mutex twice.
    MutexLocker(const MutexLocker&) = delete;
    MutexLocker& operator=(const MutexLocker&) = delete;
};

// Allocates and initialises a default pthread mutex.
// Returns nullptr (after logging) when pthread_mutex_init fails, so callers
// never lock or destroy an uninitialised mutex.
static pthread_mutex_t* plp_create_mutex()
{
    pthread_mutex_t* created = new pthread_mutex_t;
    const int rc = pthread_mutex_init(created, nullptr);
    if (rc != 0)
    {
        LOG_ERROR << "pthread_mutex_init: " << rc << std::endl;
        delete created;
        return nullptr;
    }
    return created;
}

// Destroys and frees a mutex obtained from plp_create_mutex(); null is ignored.
static void plp_destroy_mutex(pthread_mutex_t* doomed)
{
    if (doomed == nullptr)
        return;

    const int rc = pthread_mutex_destroy(doomed);
    if (rc != 0)
    {
        LOG_ERROR << "pthread_mutex_destroy: " << rc << std::endl;
    }
    delete doomed;
}

/// Builds an empty pool.
/// @param _multithread_safe when true, two mutexes are created: `tsafe_mutex`
///        guards the pipeline sets and `pl_mutex` is used by
///        wait_free()/notify_free() as a gate. When false both stay null and
///        every locking operation in this file is a no-op.
PipeLinePool::PipeLinePool(bool _multithread_safe)
    : multithread_safe(_multithread_safe),
      tsafe_mutex(nullptr),
      pl_mutex(nullptr),
      pipelines(),
      pipelines_free()
{
    if (!multithread_safe)
        return;

    tsafe_mutex = plp_create_mutex();
    pl_mutex = plp_create_mutex();

    // in ctor pool is empty, so the gate starts out locked
    PLP_MUTEX_LOCK(pl_mutex,);
}

/// Releases both mutexes and deletes every pipeline still managed by the pool.
PipeLinePool::~PipeLinePool()
{
    if (pl_mutex != nullptr)
    {
        pthread_mutex_t* gate = static_cast<pthread_mutex_t*>(pl_mutex);

        // The gate may be locked (ctor / wait_free) or already unlocked
        // (notify_free). trylock leaves it locked in either case, so the
        // unlock below is never applied to an unlocked mutex, and the mutex
        // is unlocked before it is destroyed.
        // NOTE(review): if another thread holds the gate this unlocks a mutex
        // owned elsewhere, exactly as notify_free() does -- confirm intended.
        (void)pthread_mutex_trylock(gate);
        const int rc = pthread_mutex_unlock(gate);
        if (rc != 0)
        {
            // Log only: returning here would leak the mutexes and pipelines.
            LOG_ERROR << "pthread_mutex_unlock " << "pl_mutex" << ": " << rc << std::endl;
        }
    }

    // Both helpers tolerate null, which is the state of a non-thread-safe pool.
    plp_destroy_mutex(static_cast<pthread_mutex_t*>(tsafe_mutex));
    tsafe_mutex = nullptr;
    plp_destroy_mutex(static_cast<pthread_mutex_t*>(pl_mutex));
    pl_mutex = nullptr;

    pipelines_free.clear();
    for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter)
        delete *iter;
    pipelines.clear();
}

/// Registers `pl` with the pool and marks it free.
/// Null pointers and pipelines that are already managed are ignored. The
/// destructor deletes every pipeline that is still registered.
void PipeLinePool::manage(PipeLine* pl)
{
    if (pl == nullptr)
        return;

    MutexLocker _ml(tsafe_mutex);
    if (pipelines.find(pl) != pipelines.end())
        return;

    pipelines.insert(pl);
    pipelines_free.insert(pl);
}

/// Removes `pl` from the pool (both the managed and the free set) without
/// deleting it. Unknown pointers are ignored.
void PipeLinePool::unmanage(PipeLine* pl)
{
    MutexLocker _ml(tsafe_mutex);
    pipelines.erase(pl);
    pipelines_free.erase(pl);
}

/// Takes one pipeline out of the free set.
/// @return the pipeline, or nullptr when none is currently free.
PipeLine* PipeLinePool::get_free()
{
    MutexLocker _ml(tsafe_mutex);
    if (pipelines_free.empty())
        return nullptr;

    pl_set_t::iterator iter = pipelines_free.begin();
    PipeLine* pl = *iter;
    pipelines_free.erase(iter);
    return pl;
}

/// Puts a managed pipeline back into the free set.
/// Pipelines that are not managed by this pool, or that are already free,
/// are ignored.
void PipeLinePool::release(PipeLine* pl)
{
    MutexLocker _ml(tsafe_mutex);
    if (pipelines.find(pl) == pipelines.end())
        return;
    // Compare against the end of the container that was searched.
    if (pipelines_free.find(pl) != pipelines_free.end())
        return;

    pipelines_free.insert(pl);
}

/// Blocks on the `pl_mutex` gate when no pipeline is free.
/// @return false if locking the gate failed, true otherwise.
/// NOTE(review): the emptiness check and the gate lock are two separate steps,
/// so a release() in between is not observed here -- confirm callers cope.
bool PipeLinePool::wait_free()
{
    bool none_free;
    {
        // Read the set under the same lock every other accessor uses; the
        // lock is dropped again before blocking on the gate.
        MutexLocker _ml(tsafe_mutex);
        none_free = pipelines_free.empty();
    }

    if (none_free)
    {
        PLP_MUTEX_LOCK(pl_mutex, false);
    }
    return true;
}

/// Opens the `pl_mutex` gate so a thread blocked in wait_free() can proceed.
/// @return false if unlocking the gate failed, true otherwise.
bool PipeLinePool::notify_free()
{
    PLP_MUTEX_UNLOCK(pl_mutex, false);
    return true;
}