#include "PL_Queue.h"
|
#include "logger.h"
|
#include "MaterialBuffer.h"
|
#include <set>
|
#include <queue>
|
#include <string.h>
|
|
struct QBlock
|
{
|
PipeMaterial lastPM;
|
std::vector<MB_Frame> lastMbfs;
|
|
uint8_t* data;
|
size_t maxSize;
|
size_t size;
|
|
QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { }
|
|
QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { }
|
|
~QBlock()
|
{
|
data = nullptr;
|
maxSize = 0;
|
size = 0;
|
}
|
|
void reset()
|
{
|
size = 0;
|
lastPM.reset();
|
lastMbfs.clear();
|
}
|
};
|
|
struct QBlockPool
|
{
|
uint8_t* blocksDataCache;
|
|
typedef std::set<QBlock*> qblock_set_t;
|
qblock_set_t staticPool;
|
qblock_set_t freePool;
|
|
pthread_mutex_t pool_mutex;
|
|
QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
|
{
|
}
|
|
QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
|
blocksDataCache(nullptr)
|
{
|
bp_init(maxBlockCount, maxBlockSize);
|
}
|
|
void bp_init(size_t maxBlockCount, size_t maxBlockSize)
|
{
|
pthread_mutex_init(&pool_mutex, NULL);
|
pthread_mutex_lock(&pool_mutex);
|
|
const size_t totalBytes = maxBlockCount * maxBlockSize;
|
blocksDataCache = new uint8_t[totalBytes];
|
LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes);
|
|
for (size_t i = 0; i < maxBlockCount; i++)
|
{
|
uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize];
|
QBlock* qb = new QBlock(qbData, maxBlockSize);
|
staticPool.insert(qb);
|
freePool.insert(qb);
|
}
|
|
pthread_mutex_unlock(&pool_mutex);
|
}
|
|
void bp_reset()
|
{
|
pthread_mutex_lock(&pool_mutex);
|
|
if (freePool.size() != staticPool.size())
|
{
|
LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
|
}
|
|
freePool.clear();
|
|
for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
|
delete *iter;
|
staticPool.clear();
|
|
delete[] blocksDataCache;
|
blocksDataCache = nullptr;
|
|
pthread_mutex_unlock(&pool_mutex);
|
}
|
|
~QBlockPool()
|
{
|
bp_reset();
|
pthread_mutex_destroy(&pool_mutex);
|
}
|
|
QBlock* bp_alloc()
|
{
|
pthread_mutex_lock(&pool_mutex);
|
|
if (freePool.empty())
|
{
|
pthread_mutex_unlock(&pool_mutex);
|
LOG_WARN << "no free blocks" << LOG_ENDL;
|
return nullptr;
|
}
|
|
qblock_set_t::iterator iter = freePool.begin();
|
QBlock* qb = *iter;
|
freePool.erase(iter);
|
|
pthread_mutex_unlock(&pool_mutex);
|
return qb;
|
}
|
|
void bp_free(QBlock* qb)
|
{
|
if (qb == nullptr)
|
return;
|
|
pthread_mutex_lock(&pool_mutex);
|
|
qblock_set_t::iterator iter = staticPool.find(qb);
|
if (iter == staticPool.end())
|
{
|
pthread_mutex_unlock(&pool_mutex);
|
LOG_WARN << "block not belongs to pool" << LOG_ENDL;
|
return;
|
}
|
|
//iter = freePool.find(qb);
|
//if (iter != freePool.end())
|
//{
|
// printf("QBlockPool bp_free block is free");
|
// return;
|
//}
|
|
qb->reset();
|
freePool.insert(qb);
|
pthread_mutex_unlock(&pool_mutex);
|
}
|
|
bool bp_empty() const
|
{
|
return freePool.empty();
|
}
|
};
|
|
struct PL_Queue_Internal
|
{
|
PL_Queue_Config config;
|
QBlockPool poolBlocks;
|
|
typedef std::queue<QBlock*> qblock_queue_t;
|
qblock_queue_t queBlocks;
|
|
pthread_mutex_t* queue_mutex;
|
pthread_mutex_t* sync_full_mutex;
|
pthread_mutex_t* sync_empty_mutex;
|
|
PL_Queue_Internal() :
|
config(), poolBlocks(), queBlocks(),
|
queue_mutex(new pthread_mutex_t),
|
sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
|
{
|
pthread_mutex_init(queue_mutex, NULL);
|
pthread_mutex_init(sync_full_mutex, NULL);
|
pthread_mutex_init(sync_empty_mutex, NULL);
|
}
|
|
~PL_Queue_Internal()
|
{
|
if (queue_mutex != nullptr)
|
{
|
pthread_mutex_destroy(queue_mutex);
|
delete queue_mutex;
|
queue_mutex = nullptr;
|
}
|
|
if (sync_full_mutex != nullptr)
|
{
|
pthread_mutex_destroy(sync_full_mutex);
|
delete sync_full_mutex;
|
sync_full_mutex = nullptr;
|
}
|
|
if (sync_empty_mutex != nullptr)
|
{
|
pthread_mutex_destroy(sync_empty_mutex);
|
delete sync_empty_mutex;
|
sync_empty_mutex = nullptr;
|
}
|
}
|
|
void reset()
|
{
|
if (queue_mutex != nullptr)
|
{
|
pthread_mutex_destroy(queue_mutex);
|
delete queue_mutex;
|
queue_mutex = nullptr;
|
}
|
|
queue_mutex = new pthread_mutex_t;
|
pthread_mutex_init(queue_mutex, NULL);
|
|
if (sync_full_mutex != nullptr)
|
{
|
pthread_mutex_destroy(sync_full_mutex);
|
delete sync_full_mutex;
|
sync_full_mutex = nullptr;
|
}
|
|
sync_full_mutex = new pthread_mutex_t;
|
pthread_mutex_init(sync_full_mutex, NULL);
|
|
if (sync_empty_mutex != nullptr)
|
{
|
pthread_mutex_destroy(sync_empty_mutex);
|
delete sync_empty_mutex;
|
sync_empty_mutex = nullptr;
|
}
|
|
sync_empty_mutex = new pthread_mutex_t;
|
pthread_mutex_init(sync_empty_mutex, NULL);
|
|
poolBlocks.bp_reset();
|
}
|
};
|
|
PipeLineElem* create_PL_Queue()
|
{
|
return new PL_Queue;
|
}
|
|
PL_Queue::PL_Queue() : internal(new PL_Queue_Internal)
|
{
|
}
|
|
PL_Queue::~PL_Queue()
|
{
|
delete (PL_Queue_Internal*)internal;
|
internal= nullptr;
|
}
|
|
bool PL_Queue::init(void* args)
|
{
|
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
|
in->reset();
|
if (args != nullptr)
|
{
|
PL_Queue_Config* config = (PL_Queue_Config*)args;
|
in->config = *config;
|
}
|
in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);
|
|
if (in->config.syncQueueFull)
|
{
|
int ret = pthread_mutex_lock(in->sync_full_mutex);
|
if(ret != 0)
|
{
|
LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
|
return false;
|
}
|
}
|
|
if (in->config.syncQueueEmpty)
|
{
|
int ret = pthread_mutex_lock(in->sync_empty_mutex);
|
if(ret != 0)
|
{
|
LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
|
return false;
|
}
|
}
|
|
return true;
|
}
|
|
void PL_Queue::finit()
|
{
|
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete
|
}
|
|
bool PL_Queue::pay(const PipeMaterial& pm)
|
{
|
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
|
QBlock* qb = nullptr;
|
if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
|
{
|
// there is no available qb
|
|
if (in->config.queueFullDropFrontBlock)
|
{
|
LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;
|
|
pthread_mutex_lock(in->queue_mutex);
|
qb = in->queBlocks.front();
|
in->queBlocks.pop();
|
pthread_mutex_unlock(in->queue_mutex);
|
|
in->poolBlocks.bp_free(qb);
|
qb = nullptr;
|
}
|
else
|
{
|
if (in->config.syncQueueFull)
|
{
|
LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
|
pthread_mutex_lock(in->sync_full_mutex);
|
}
|
}
|
}
|
|
if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
|
{
|
LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
|
return false;
|
}
|
|
qb = in->poolBlocks.bp_alloc();
|
if (qb == nullptr)
|
{
|
LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
|
return false;
|
}
|
|
bool cacheOk = false;
|
|
if (pm.type == PipeMaterial::PMT_BYTES)
|
{
|
qb->lastPM = pm;
|
|
if (in->config.copyData)
|
{
|
memcpy(qb->data, pm.buffer, pm.buffSize);
|
qb->size = pm.buffSize;
|
qb->lastPM.buffer = qb->data;
|
}
|
|
cacheOk = true;
|
}
|
else if (pm.type == PipeMaterial::PMT_FRAME)
|
{
|
qb->lastPM = pm;
|
|
MB_Frame* mbf = (MB_Frame*)pm.buffer;
|
qb->lastMbfs.push_back(*mbf);
|
|
qb->lastPM.buffer = &(qb->lastMbfs[0]);
|
|
if (in->config.copyData)
|
{
|
memcpy(qb->data, mbf->buffer, mbf->buffSize);
|
qb->size = mbf->buffSize;
|
qb->lastMbfs[0].buffer = qb->data;
|
}
|
|
cacheOk = true;
|
}
|
else if (pm.type == PipeMaterial::PMT_FRAME_LIST)
|
{
|
if (in->config.cacheFrameListFunc != nullptr)
|
{
|
size_t s = qb->maxSize;
|
cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s);
|
if (cacheOk)
|
qb->size = s;
|
}
|
}
|
|
if (! cacheOk)
|
{
|
in->poolBlocks.bp_free(qb);
|
|
LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
|
return false;
|
}
|
|
pthread_mutex_lock(in->queue_mutex);
|
in->queBlocks.push(qb);
|
pthread_mutex_unlock(in->queue_mutex);
|
|
if (in->config.syncQueueEmpty)
|
pthread_mutex_unlock(in->sync_empty_mutex);
|
|
return true;
|
}
|
|
void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
|
{
|
if (pm->former == nullptr || pm->args == nullptr)
|
return;
|
|
PL_Queue* _this = (PL_Queue*)pm->former;
|
QBlock* qb = (QBlock*)pm->args;
|
PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
|
|
pthread_mutex_lock(in->queue_mutex);//#todo lastRet
|
in->queBlocks.pop();
|
pthread_mutex_unlock(in->queue_mutex);
|
|
in->poolBlocks.bp_free(qb);
|
|
if (in->config.syncQueueFull)
|
pthread_mutex_unlock(in->sync_full_mutex);
|
}
|
|
bool PL_Queue::gain(PipeMaterial& pm)
|
{
|
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
|
if (in->queBlocks.empty())
|
{
|
if (in->config.syncQueueEmpty)
|
pthread_mutex_lock(in->sync_empty_mutex);
|
}
|
|
if (in->queBlocks.empty())
|
{
|
LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
|
pm.buffer = nullptr;
|
pm.buffSize = 0;
|
pm.former = this;
|
return false;
|
}
|
|
QBlock* qb = nullptr;
|
pthread_mutex_lock(in->queue_mutex);
|
qb = in->queBlocks.front();
|
//in->queBlocks.pop(); // pop in deleter
|
pthread_mutex_unlock(in->queue_mutex);
|
|
pm = qb->lastPM;
|
|
pm.former = this;
|
pm.deleter = pm_deleter_qb;
|
pm.args = qb;
|
return true;
|
}
|