#include "PipeLinePool.h"
|
#include "logger.h"
|
#include <pthread.h>
|
|
#define PLP_MUTEX_LOCK(mut,_ret) if (mut != nullptr) {\
|
int ret = pthread_mutex_lock((pthread_mutex_t*)mut); \
|
if(ret != 0) \
|
{ \
|
LOG_ERROR << "pthread_mutex_lock " << #mut << ": " << ret << std::endl; \
|
return _ret; \
|
} \
|
}
|
|
#define PLP_MUTEX_UNLOCK(mut,_ret) if (mut != nullptr) {\
|
int ret = pthread_mutex_unlock((pthread_mutex_t*)mut); \
|
if(ret != 0) \
|
{ \
|
LOG_ERROR << "pthread_mutex_unlock " << #mut << ": " << ret << std::endl; \
|
return _ret; \
|
} \
|
}
|
|
struct MutexLocker
|
{
|
pthread_mutex_t* mut;
|
MutexLocker(void* _mut) : mut((pthread_mutex_t*)_mut)
|
{
|
PLP_MUTEX_LOCK(mut,);
|
}
|
~MutexLocker()
|
{
|
PLP_MUTEX_UNLOCK(mut,);
|
}
|
};
|
|
PipeLinePool::PipeLinePool(bool _multithread_safe) :
|
multithread_safe(_multithread_safe), tsafe_mutex(nullptr), pl_mutex(nullptr),
|
pipelines(), pipelines_free()
|
{
|
if (multithread_safe)
|
{
|
tsafe_mutex = new pthread_mutex_t;
|
pthread_mutex_init((pthread_mutex_t*)tsafe_mutex, NULL);
|
|
pl_mutex = new pthread_mutex_t;
|
pthread_mutex_init((pthread_mutex_t*)pl_mutex, NULL);
|
|
// in ctor pool is empty
|
PLP_MUTEX_LOCK(pl_mutex,);
|
}
|
}
|
|
PipeLinePool::~PipeLinePool()
|
{
|
if (multithread_safe)
|
{
|
PLP_MUTEX_UNLOCK(pl_mutex,);
|
}
|
|
pthread_mutex_destroy((pthread_mutex_t*)tsafe_mutex);
|
delete (pthread_mutex_t*)tsafe_mutex;
|
tsafe_mutex = nullptr;
|
|
pthread_mutex_destroy((pthread_mutex_t*)pl_mutex);
|
delete (pthread_mutex_t*)pl_mutex;
|
pl_mutex = nullptr;
|
|
pipelines_free.clear();
|
|
for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter)
|
delete *iter;
|
|
pipelines.clear();
|
}
|
|
void PipeLinePool::manage(PipeLine* pl)
|
{
|
if (pl == nullptr)
|
return;
|
|
MutexLocker _ml(tsafe_mutex);
|
|
if (pipelines.find(pl) != pipelines.end())
|
return;
|
|
pipelines.insert(pl);
|
pipelines_free.insert(pl);
|
}
|
|
void PipeLinePool::unmanage(PipeLine* pl)
|
{
|
MutexLocker _ml(tsafe_mutex);
|
|
pipelines.erase(pl);
|
pipelines_free.erase(pl);
|
}
|
|
PipeLine* PipeLinePool::get_free()
|
{
|
MutexLocker _ml(tsafe_mutex);
|
|
if (pipelines_free.empty())
|
return nullptr;
|
|
pl_set_t::iterator iter = pipelines_free.begin();
|
PipeLine* pl = *iter;
|
pipelines_free.erase(iter);
|
|
return pl;
|
}
|
|
void PipeLinePool::release(PipeLine* pl)
|
{
|
MutexLocker _ml(tsafe_mutex);
|
|
if (pipelines.find(pl) == pipelines.end())
|
return;
|
if (pipelines_free.find(pl) != pipelines.end())
|
return;
|
|
pipelines_free.insert(pl);
|
}
|
|
bool PipeLinePool::wait_free()
|
{
|
if (pipelines_free.empty())
|
{
|
PLP_MUTEX_LOCK(pl_mutex, false);
|
}
|
|
return true;
|
}
|
|
bool PipeLinePool::notify_free()
|
{
|
PLP_MUTEX_UNLOCK(pl_mutex, false);
|
}
|