| | |
| | | #include "PL_Queue.h"
|
| | | #include "logger.h"
|
| | | #include "MaterialBuffer.h"
|
| | | #include <set>
|
| | | #include <queue>
|
| | | #include <string.h>
|
| | |
|
| | | struct QBlock
|
| | | {
|
| | | PipeMaterial lastPM;
|
| | | std::vector<MB_Frame> lastMbfs;
|
| | |
|
| | | uint8_t* data;
|
| | | size_t maxSize;
|
| | | size_t size;
|
| | |
|
| | | QBlock() : data(nullptr), maxSize(0), size(0) { }
|
| | | QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { }
|
| | |
|
| | | QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
|
| | | QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { }
|
| | |
|
| | | ~QBlock()
|
| | | {
|
| | |
| | | maxSize = 0;
|
| | | size = 0;
|
| | | }
|
| | |
|
| | | void reset()
|
| | | {
|
| | | size = 0;
|
| | | lastPM.reset();
|
| | | lastMbfs.clear();
|
| | | }
|
| | | };
|
| | |
|
| | | struct QBlockPool
|
| | | {
|
| | | uint8_t* blocksDataPool;
|
| | | uint8_t* blocksDataCache;
|
| | |
|
| | | typedef std::set<QBlock*> qpool_t;
|
| | | qpool_t staticPool;
|
| | | qpool_t freePool;
|
| | | |
| | | QBlockPool() : blocksDataPool(nullptr)
|
| | | typedef std::set<QBlock*> qblock_set_t;
|
| | | qblock_set_t staticPool;
|
| | | qblock_set_t freePool;
|
| | |
|
| | | pthread_mutex_t pool_mutex;
|
| | |
|
| | | QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
|
| | | {
|
| | | }
|
| | |
|
| | | QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
|
| | | blocksDataPool(nullptr)
|
| | | blocksDataCache(nullptr)
|
| | | {
|
| | | bp_init(maxBlockCount, maxBlockSize);
|
| | | }
|
| | |
|
| | | void bp_init(size_t maxBlockCount, size_t maxBlockSize)
|
| | | {
|
| | | const size_t totalBytes = maxBlockCount * maxBlockSize;
|
| | | blocksDataPool = new uint8_t[totalBytes];
|
| | | printf("QBlockPool allocate byte: %u\n", totalBytes);
|
| | | pthread_mutex_init(&pool_mutex, NULL);
|
| | | pthread_mutex_lock(&pool_mutex);
|
| | |
|
| | | const size_t totalBytes = maxBlockCount * maxBlockSize;
|
| | | blocksDataCache = new uint8_t[totalBytes];
|
| | | LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes);
|
| | |
|
| | | for (size_t i = 0; i < maxBlockCount; i++)
|
| | | {
|
| | | uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
|
| | | uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize];
|
| | | QBlock* qb = new QBlock(qbData, maxBlockSize);
|
| | | staticPool.insert(qb);
|
| | | freePool.insert(qb);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | }
|
| | |
|
| | | void bp_reset()
|
| | | {
|
| | | pthread_mutex_lock(&pool_mutex);
|
| | |
|
| | | if (freePool.size() != staticPool.size())
|
| | | {
|
| | | LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
|
| | | }
|
| | |
|
| | | freePool.clear();
|
| | |
|
| | | for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
|
| | | delete *iter;
|
| | | staticPool.clear();
|
| | |
|
| | | delete[] blocksDataCache;
|
| | | blocksDataCache = nullptr;
|
| | |
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | }
|
| | |
|
| | | ~QBlockPool()
|
| | | {
|
| | | if (freePool.size() != staticPool.size())
|
| | | {
|
| | | printf("QBlockPool memory leakage\n");
|
| | | }
|
| | | |
| | | for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
|
| | | {
|
| | | delete *iter;
|
| | | }
|
| | | |
| | | delete[] blocksDataPool;
|
| | | blocksDataPool = nullptr;
|
| | | }
|
| | | bp_reset();
|
| | | pthread_mutex_destroy(&pool_mutex);
|
| | | }
|
| | |
|
| | | QBlock* bp_alloc()
|
| | | {
|
| | | if (freePool.empty())
|
| | | pthread_mutex_lock(&pool_mutex);
|
| | |
|
| | | if (freePool.empty())
|
| | | {
|
| | | printf("QBlockPool bp_alloc empty\n");
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | LOG_WARN << "no free blocks" << LOG_ENDL;
|
| | | return nullptr;
|
| | | }
|
| | | |
| | | qpool_t::iterator iter = freePool.begin();
|
| | |
|
| | | qblock_set_t::iterator iter = freePool.begin();
|
| | | QBlock* qb = *iter;
|
| | | freePool.erase(iter);
|
| | | return qb;
|
| | | }
|
| | |
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | return qb;
|
| | | }
|
| | |
|
| | | void bp_free(QBlock* qb)
|
| | | {
|
| | | if (qb == nullptr)
|
| | | return;
|
| | | |
| | | qpool_t::iterator iter = staticPool.find(qb);
|
| | |
|
| | | pthread_mutex_lock(&pool_mutex);
|
| | |
|
| | | qblock_set_t::iterator iter = staticPool.find(qb);
|
| | | if (iter == staticPool.end())
|
| | | {
|
| | | printf("QBlockPool bp_free not in pool\n");
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | LOG_WARN << "block not belongs to pool" << LOG_ENDL;
|
| | | return;
|
| | | }
|
| | |
|
| | |
| | | // return;
|
| | | //}
|
| | |
|
| | | qb->size = 0;
|
| | | qb->reset();
|
| | | freePool.insert(qb);
|
| | | }
|
| | | pthread_mutex_unlock(&pool_mutex);
|
| | | }
|
| | |
|
| | | bool bp_empty() const
|
| | | {
|
| | |
| | | struct PL_Queue_Internal
|
| | | {
|
| | | PL_Queue_Config config;
|
| | | QBlockPool pool;
|
| | | QBlockPool poolBlocks;
|
| | |
|
| | | typedef std::queue<QBlock*> queue_t;
|
| | | queue_t que;
|
| | | typedef std::queue<QBlock*> qblock_queue_t;
|
| | | qblock_queue_t queBlocks;
|
| | |
|
| | | pthread_mutex_t* queue_pool_mutex;
|
| | | pthread_mutex_t* queue_mutex;
|
| | | pthread_mutex_t* sync_full_mutex;
|
| | | pthread_mutex_t* sync_empty_mutex;
|
| | |
|
| | | PL_Queue_Internal() :
|
| | | config(), pool(), que(), |
| | | queue_pool_mutex(new pthread_mutex_t), |
| | | config(), poolBlocks(), queBlocks(),
|
| | | queue_mutex(new pthread_mutex_t),
|
| | | sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
|
| | | {
|
| | | pthread_mutex_init(queue_pool_mutex, NULL);
|
| | | pthread_mutex_init(queue_mutex, NULL);
|
| | | pthread_mutex_init(sync_full_mutex, NULL);
|
| | | pthread_mutex_init(sync_empty_mutex, NULL);
|
| | | }
|
| | |
|
| | | ~PL_Queue_Internal()
|
| | | {
|
| | | if (queue_pool_mutex != nullptr)
|
| | | if (queue_mutex != nullptr)
|
| | | {
|
| | | pthread_mutex_destroy(queue_pool_mutex);
|
| | | delete queue_pool_mutex;
|
| | | queue_pool_mutex = nullptr;
|
| | | pthread_mutex_destroy(queue_mutex);
|
| | | delete queue_mutex;
|
| | | queue_mutex = nullptr;
|
| | | }
|
| | |
|
| | | if (sync_full_mutex != nullptr)
|
| | |
| | |
|
| | | void reset()
|
| | | {
|
| | | if (queue_pool_mutex != nullptr)
|
| | | if (queue_mutex != nullptr)
|
| | | {
|
| | | pthread_mutex_destroy(queue_pool_mutex);
|
| | | delete queue_pool_mutex;
|
| | | queue_pool_mutex = nullptr;
|
| | | pthread_mutex_destroy(queue_mutex);
|
| | | delete queue_mutex;
|
| | | queue_mutex = nullptr;
|
| | | }
|
| | |
|
| | | queue_pool_mutex = new pthread_mutex_t;
|
| | | pthread_mutex_init(queue_pool_mutex, NULL);
|
| | | queue_mutex = new pthread_mutex_t;
|
| | | pthread_mutex_init(queue_mutex, NULL);
|
| | |
|
| | | if (sync_full_mutex != nullptr)
|
| | | {
|
| | |
| | |
|
| | | sync_empty_mutex = new pthread_mutex_t;
|
| | | pthread_mutex_init(sync_empty_mutex, NULL);
|
| | | |
| | | //#todo free que
|
| | |
|
| | | poolBlocks.bp_reset();
|
| | | }
|
| | | };
|
| | |
|
| | |
| | | bool PL_Queue::init(void* args)
|
| | | {
|
| | | PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
| | | PL_Queue_Config* config = (PL_Queue_Config*)args;
|
| | | |
| | |
|
| | | in->reset();
|
| | | in->config = *config;
|
| | | in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
|
| | | if (args != nullptr)
|
| | | {
|
| | | PL_Queue_Config* config = (PL_Queue_Config*)args;
|
| | | in->config = *config;
|
| | | }
|
| | | in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);
|
| | |
|
| | | if (config->syncQueueFull)
|
| | | if (in->config.syncQueueFull)
|
| | | {
|
| | | int ret = pthread_mutex_lock(in->sync_full_mutex);
|
| | | if(ret != 0)
|
| | | {
|
| | | printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
|
| | | LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
|
| | | return false;
|
| | | }
|
| | | }
|
| | |
|
| | | if (config->syncQueueEmpty)
|
| | | if (in->config.syncQueueEmpty)
|
| | | {
|
| | | int ret = pthread_mutex_lock(in->sync_empty_mutex);
|
| | | if(ret != 0)
|
| | | {
|
| | | printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
|
| | | LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
|
| | | return false;
|
| | | }
|
| | | }
|
| | |
| | | PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
| | |
|
| | | QBlock* qb = nullptr;
|
| | | if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
|
| | | if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
|
| | | {
|
| | | // there is no available qb
|
| | |
|
| | | if (in->config.queueFullDropBlock)
|
| | | if (in->config.queueFullDropFrontBlock)
|
| | | {
|
| | | printf("PL_Queue::pay queueFullDropBlock\n");
|
| | | LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;
|
| | |
|
| | | pthread_mutex_lock(in->queue_pool_mutex);
|
| | | qb = in->que.front();
|
| | | in->que.pop();
|
| | | |
| | | in->pool.bp_free(qb);
|
| | | pthread_mutex_lock(in->queue_mutex);
|
| | | qb = in->queBlocks.front();
|
| | | in->queBlocks.pop();
|
| | | pthread_mutex_unlock(in->queue_mutex);
|
| | |
|
| | | in->poolBlocks.bp_free(qb);
|
| | | qb = nullptr;
|
| | | pthread_mutex_unlock(in->queue_pool_mutex);
|
| | | }
|
| | | else
|
| | | {
|
| | | if (in->config.syncQueueFull)
|
| | | {
|
| | | printf("PL_Queue::pay sync by sync_full_mutex\n");
|
| | | LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
|
| | | pthread_mutex_lock(in->sync_full_mutex);
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
|
| | | if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
|
| | | {
|
| | | printf("PL_Queue::pay full\n");
|
| | | LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
|
| | | return false;
|
| | | }
|
| | |
|
| | | pthread_mutex_lock(in->queue_pool_mutex);
|
| | | qb = in->pool.bp_alloc();
|
| | | if (qb != nullptr)
|
| | | in->que.push(qb);
|
| | | pthread_mutex_unlock(in->queue_pool_mutex);
|
| | | |
| | | qb = in->poolBlocks.bp_alloc();
|
| | | if (qb == nullptr)
|
| | | {
|
| | | printf("PL_Queue::pay bp_alloc nullptr\n");
|
| | | LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
|
| | | return false;
|
| | | }
|
| | | |
| | | if (in->config.copyData)
|
| | | {
|
| | | memcpy(qb->data, pm.buffer, pm.buffSize);
|
| | | qb->size = pm.buffSize;
|
| | | }
|
| | | |
| | |
|
| | | bool cacheOk = false;
|
| | |
|
| | | if (pm.type == PipeMaterial::PMT_BYTES)
|
| | | {
|
| | | qb->lastPM = pm;
|
| | |
|
| | | if (in->config.copyData)
|
| | | {
|
| | | memcpy(qb->data, pm.buffer, pm.buffSize);
|
| | | qb->size = pm.buffSize;
|
| | | qb->lastPM.buffer = qb->data;
|
| | | }
|
| | |
|
| | | cacheOk = true;
|
| | | }
|
| | | else if (pm.type == PipeMaterial::PMT_FRAME)
|
| | | {
|
| | | qb->lastPM = pm;
|
| | |
|
| | | MB_Frame* mbf = (MB_Frame*)pm.buffer;
|
| | | qb->lastMbfs.push_back(*mbf);
|
| | |
|
| | | qb->lastPM.buffer = &(qb->lastMbfs[0]);
|
| | |
|
| | | if (in->config.copyData)
|
| | | {
|
| | | memcpy(qb->data, mbf->buffer, mbf->buffSize);
|
| | | qb->size = mbf->buffSize;
|
| | | qb->lastMbfs[0].buffer = qb->data;
|
| | | }
|
| | |
|
| | | cacheOk = true;
|
| | | }
|
| | | else if (pm.type == PipeMaterial::PMT_FRAME_LIST)
|
| | | {
|
| | | if (in->config.cacheFrameListFunc != nullptr)
|
| | | {
|
| | | size_t s = qb->maxSize;
|
| | | cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s);
|
| | | if (cacheOk)
|
| | | qb->size = s;
|
| | | }
|
| | | }
|
| | |
|
| | | if (! cacheOk)
|
| | | {
|
| | | in->poolBlocks.bp_free(qb);
|
| | |
|
| | | LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
|
| | | return false;
|
| | | }
|
| | |
|
| | | pthread_mutex_lock(in->queue_mutex);
|
| | | in->queBlocks.push(qb);
|
| | | pthread_mutex_unlock(in->queue_mutex);
|
| | |
|
| | | if (in->config.syncQueueEmpty)
|
| | | pthread_mutex_unlock(in->sync_empty_mutex);
|
| | |
|
| | | return true;
|
| | | }
|
| | |
|
| | | void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
|
| | | void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
|
| | | {
|
| | | if (pm->former == nullptr || pm->args == nullptr)
|
| | | return;
|
| | |
| | | PL_Queue* _this = (PL_Queue*)pm->former;
|
| | | QBlock* qb = (QBlock*)pm->args;
|
| | | PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
|
| | | |
| | | pthread_mutex_lock(in->queue_pool_mutex);
|
| | | in->pool.bp_free(qb);
|
| | | pthread_mutex_unlock(in->queue_pool_mutex);
|
| | |
|
| | | pthread_mutex_lock(in->queue_mutex);//#todo lastRet
|
| | | in->queBlocks.pop();
|
| | | pthread_mutex_unlock(in->queue_mutex);
|
| | |
|
| | | in->poolBlocks.bp_free(qb);
|
| | |
|
| | | if (in->config.syncQueueFull)
|
| | | pthread_mutex_unlock(in->sync_full_mutex);
|
| | | }
|
| | |
|
| | | bool PL_Queue::gain(PipeMaterial& pm)
|
| | | {
|
| | | PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
|
| | |
|
| | | if (in->que.empty())
|
| | | if (in->queBlocks.empty())
|
| | | {
|
| | | if (in->config.syncQueueEmpty)
|
| | | pthread_mutex_lock(in->sync_empty_mutex);
|
| | | }
|
| | |
|
| | | if (in->que.empty())
|
| | | if (in->queBlocks.empty())
|
| | | {
|
| | | printf("PL_Queue::gain empty\n");
|
| | | LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
|
| | | pm.buffer = nullptr;
|
| | | pm.buffSize = 0;
|
| | | pm.former = this;
|
| | |
| | | }
|
| | |
|
| | | QBlock* qb = nullptr;
|
| | | pthread_mutex_lock(in->queue_pool_mutex);
|
| | | qb = in->que.front();
|
| | | in->que.pop();
|
| | | pthread_mutex_unlock(in->queue_pool_mutex);
|
| | | |
| | | if (in->config.syncQueueFull)
|
| | | pthread_mutex_unlock(in->sync_full_mutex);
|
| | | |
| | | pm.type = PipeMaterial::PMT_BYTES;
|
| | | pm.buffer = qb->data;
|
| | | pm.buffSize = qb->size;
|
| | | pm.former = this;
|
| | | pm.deleter = pm_deleter_qb;
|
| | | pm.args = qb;
|
| | | pthread_mutex_lock(in->queue_mutex);
|
| | | qb = in->queBlocks.front();
|
| | | //in->queBlocks.pop(); // pop in deleter
|
| | | pthread_mutex_unlock(in->queue_mutex);
|
| | |
|
| | | pm = qb->lastPM;
|
| | |
|
| | | pm.former = this;
|
| | | pm.deleter = pm_deleter_qb;
|
| | | pm.args = qb;
|
| | | return true;
|
| | | }
|