#include "PL_Queue.h"
#include "logger.h"
#include "MaterialBuffer.h"

#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include <queue>
#include <set>
#include <vector>

/**
 * One cache slot of the queue.
 *
 * `data` points into memory owned by QBlockPool (it is never freed here);
 * `lastPM` / `lastMbfs` hold the material descriptor cached for this slot.
 */
struct QBlock
{
    PipeMaterial lastPM;
    std::vector<MB_Frame> lastMbfs;

    uint8_t* data;   // start of this block's slice of the pool buffer (not owned)
    size_t maxSize;  // capacity of `data` in bytes
    size_t size;     // number of valid bytes currently stored in `data`

    QBlock() :
        lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0)
    {
    }

    QBlock(uint8_t* _data, size_t _maxSize) :
        lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0)
    {
    }

    ~QBlock()
    {
        data = nullptr;
        maxSize = 0;
        size = 0;
    }

    /// Forget the cached material; the backing storage and capacity are kept.
    void reset()
    {
        size = 0;
        lastPM.reset();
        lastMbfs.clear();
    }
};

/**
 * Fixed-size pool of QBlock objects backed by one contiguous byte buffer.
 *
 * `staticPool` holds every block the pool owns, `freePool` the subset that is
 * currently available. All accesses to both sets are guarded by `pool_mutex`.
 */
struct QBlockPool
{
    uint8_t* blocksDataCache;

    typedef std::set<QBlock*> qblock_set_t;
    qblock_set_t staticPool;
    qblock_set_t freePool;

    // mutable so that the const query bp_empty() can take the lock.
    mutable pthread_mutex_t pool_mutex;

    QBlockPool() :
        blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
    {
        // The destructor locks and destroys the mutex, so it must always be
        // initialised, even for a pool that never gets bp_init()-ed.
        pthread_mutex_init(&pool_mutex, nullptr);
    }

    QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
        blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
    {
        pthread_mutex_init(&pool_mutex, nullptr);
        bp_init(maxBlockCount, maxBlockSize);
    }

    // The pool owns raw memory and a mutex; copying would double-free.
    QBlockPool(const QBlockPool&) = delete;
    QBlockPool& operator=(const QBlockPool&) = delete;

    ~QBlockPool()
    {
        bp_reset();
        pthread_mutex_destroy(&pool_mutex);
    }

    /**
     * (Re)build the pool with `maxBlockCount` blocks of `maxBlockSize` bytes.
     * Any blocks from a previous bp_init() are released first.
     */
    void bp_init(size_t maxBlockCount, size_t maxBlockSize)
    {
        pthread_mutex_lock(&pool_mutex);

        // Calling bp_init() twice used to leak the previous allocation.
        release_locked();

        const size_t totalBytes = maxBlockCount * maxBlockSize;
        blocksDataCache = new uint8_t[totalBytes];
        // Cast so the argument matches the "%u" conversion.
        LOGP(WARN, "QBlockPool allocate byte: %u", (unsigned int)totalBytes);

        for (size_t i = 0; i < maxBlockCount; i++)
        {
            QBlock* qb = new QBlock(blocksDataCache + i * maxBlockSize, maxBlockSize);
            staticPool.insert(qb);
            freePool.insert(qb);
        }

        pthread_mutex_unlock(&pool_mutex);
    }

    /// Destroy every block and the backing buffer; warns if blocks are still in use.
    void bp_reset()
    {
        pthread_mutex_lock(&pool_mutex);
        release_locked();
        pthread_mutex_unlock(&pool_mutex);
    }

    /// @return a free block, or nullptr (with a warning) when none is available.
    QBlock* bp_alloc()
    {
        pthread_mutex_lock(&pool_mutex);
        if (freePool.empty())
        {
            pthread_mutex_unlock(&pool_mutex);
            LOG_WARN << "no free blocks" << LOG_ENDL;
            return nullptr;
        }

        qblock_set_t::iterator iter = freePool.begin();
        QBlock* qb = *iter;
        freePool.erase(iter);
        pthread_mutex_unlock(&pool_mutex);

        return qb;
    }

    /**
     * Return `qb` to the pool. nullptr is ignored; a block that does not
     * belong to this pool is rejected with a warning.
     */
    void bp_free(QBlock* qb)
    {
        if (qb == nullptr)
            return;

        pthread_mutex_lock(&pool_mutex);
        if (staticPool.find(qb) == staticPool.end())
        {
            pthread_mutex_unlock(&pool_mutex);
            LOG_WARN << "block not belongs to pool" << LOG_ENDL;
            return;
        }

        // freePool is a set, so inserting an already-free block is a no-op.
        qb->reset();
        freePool.insert(qb);
        pthread_mutex_unlock(&pool_mutex);
    }

    /// @return true when no free block is available.
    bool bp_empty() const
    {
        pthread_mutex_lock(&pool_mutex);
        const bool noneFree = freePool.empty();
        pthread_mutex_unlock(&pool_mutex);
        return noneFree;
    }

private:
    /// Release all blocks and the buffer. Caller must hold `pool_mutex`.
    void release_locked()
    {
        if (freePool.size() != staticPool.size())
        {
            LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
        }

        freePool.clear();

        for (qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
            delete *iter;
        staticPool.clear();

        delete[] blocksDataCache;
        blocksDataCache = nullptr;
    }
};

/**
 * Private state of PL_Queue: configuration, block pool, the FIFO of filled
 * blocks and the mutexes guarding / signalling it.
 *
 * NOTE(review): sync_full_mutex / sync_empty_mutex are locked in one call and
 * unlocked in another (init/pay/gain/pm_deleter_qb), i.e. they are used as
 * signals rather than as critical-section locks. If those calls run on
 * different threads this relies on behaviour POSIX does not define for
 * default mutexes -- consider a semaphore or condition variable.
 */
struct PL_Queue_Internal
{
    PL_Queue_Config config;
    QBlockPool poolBlocks;

    typedef std::queue<QBlock*> qblock_queue_t;
    qblock_queue_t queBlocks;

    pthread_mutex_t* queue_mutex;
    pthread_mutex_t* sync_full_mutex;
    pthread_mutex_t* sync_empty_mutex;

    PL_Queue_Internal() :
        config(), poolBlocks(), queBlocks(),
        queue_mutex(create_mutex()),
        sync_full_mutex(create_mutex()),
        sync_empty_mutex(create_mutex())
    {
    }

    // Owns raw mutex pointers and the pool; copying would double-free.
    PL_Queue_Internal(const PL_Queue_Internal&) = delete;
    PL_Queue_Internal& operator=(const PL_Queue_Internal&) = delete;

    ~PL_Queue_Internal()
    {
        destroy_mutex(queue_mutex);
        destroy_mutex(sync_full_mutex);
        destroy_mutex(sync_empty_mutex);
    }

    /// Recreate all mutexes, drop queued blocks and tear down the pool.
    void reset()
    {
        destroy_mutex(queue_mutex);
        queue_mutex = create_mutex();

        destroy_mutex(sync_full_mutex);
        sync_full_mutex = create_mutex();

        destroy_mutex(sync_empty_mutex);
        sync_empty_mutex = create_mutex();

        // The queued pointers are about to dangle once the pool is reset.
        while (!queBlocks.empty())
            queBlocks.pop();

        poolBlocks.bp_reset();
    }

    /// Number of queued blocks, read under queue_mutex.
    size_t queue_size()
    {
        pthread_mutex_lock(queue_mutex);
        const size_t count = queBlocks.size();
        pthread_mutex_unlock(queue_mutex);
        return count;
    }

    /// Whether the queue holds no block, read under queue_mutex.
    bool queue_empty()
    {
        pthread_mutex_lock(queue_mutex);
        const bool none = queBlocks.empty();
        pthread_mutex_unlock(queue_mutex);
        return none;
    }

    /// True when another block can neither be queued nor allocated.
    bool is_full()
    {
        return queue_size() >= config.maxBlockCount || poolBlocks.bp_empty();
    }

private:
    static pthread_mutex_t* create_mutex()
    {
        pthread_mutex_t* m = new pthread_mutex_t;
        pthread_mutex_init(m, nullptr);
        return m;
    }

    static void destroy_mutex(pthread_mutex_t*& m)
    {
        if (m != nullptr)
        {
            pthread_mutex_destroy(m);
            delete m;
            m = nullptr;
        }
    }
};

/// Copy `srcSize` bytes into the block's storage; fails if they do not fit.
static bool copy_payload(QBlock* qb, const void* src, size_t srcSize)
{
    if (srcSize > qb->maxSize)
    {
        LOG_WARN << "PL_Queue::pay payload larger than block" << LOG_ENDL;
        return false;
    }

    if (srcSize > 0)
    {
        if (src == nullptr)
        {
            LOG_WARN << "PL_Queue::pay payload buffer is nullptr" << LOG_ENDL;
            return false;
        }
        memcpy(qb->data, src, srcSize);
    }

    qb->size = srcSize;
    return true;
}

/**
 * Cache the material `pm` into block `qb` according to `config`.
 * @return false when the material type is unsupported or cannot be stored.
 */
static bool cache_material(const PL_Queue_Config& config, const PipeMaterial& pm, QBlock* qb)
{
    if (pm.type == PipeMaterial::PMT_BYTES)
    {
        qb->lastPM = pm;
        if (config.copyData)
        {
            if (!copy_payload(qb, pm.buffer, pm.buffSize))
                return false;
            qb->lastPM.buffer = qb->data;
        }
        return true;
    }

    if (pm.type == PipeMaterial::PMT_FRAME)
    {
        MB_Frame* mbf = (MB_Frame*)pm.buffer;
        if (mbf == nullptr)
        {
            LOG_WARN << "PL_Queue::pay frame is nullptr" << LOG_ENDL;
            return false;
        }

        qb->lastPM = pm;
        qb->lastMbfs.push_back(*mbf);
        // The cached descriptor must reference the block's own frame copy.
        qb->lastPM.buffer = &(qb->lastMbfs[0]);

        if (config.copyData)
        {
            if (!copy_payload(qb, mbf->buffer, mbf->buffSize))
                return false;
            qb->lastMbfs[0].buffer = qb->data;
        }
        return true;
    }

    if (pm.type == PipeMaterial::PMT_FRAME_LIST && config.cacheFrameListFunc != nullptr)
    {
        size_t s = qb->maxSize;
        if (config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s))
        {
            qb->size = s;
            return true;
        }
    }

    LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
    return false;
}

PipeLineElem* create_PL_Queue()
{
    return new PL_Queue;
}

PL_Queue::PL_Queue() : internal(new PL_Queue_Internal)
{
}

PL_Queue::~PL_Queue()
{
    delete (PL_Queue_Internal*)internal;
    internal = nullptr;
}

/**
 * (Re)initialise the queue.
 * @param args optional PL_Queue_Config*; when nullptr the current config is kept.
 * @return false if one of the sync mutexes could not be locked.
 */
bool PL_Queue::init(void* args)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
    in->reset();

    if (args != nullptr)
    {
        PL_Queue_Config* config = (PL_Queue_Config*)args;
        in->config = *config;
    }

    in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);

    // The sync mutexes start out locked so the first waiter blocks until signalled.
    if (in->config.syncQueueFull)
    {
        int ret = pthread_mutex_lock(in->sync_full_mutex);
        if (ret != 0)
        {
            LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
            return false;
        }
    }

    if (in->config.syncQueueEmpty)
    {
        int ret = pthread_mutex_lock(in->sync_empty_mutex);
        if (ret != 0)
        {
            LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
            return false;
        }
    }

    return true;
}

void PL_Queue::finit()
{
    //#todo delete
}

/**
 * Push one material into the queue.
 *
 * When the queue is full, depending on the config, either the oldest block is
 * dropped or the call waits on sync_full_mutex.
 * @return true when the material was cached and queued.
 */
bool PL_Queue::pay(const PipeMaterial& pm)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;

    if (in->is_full())
    {
        // there is no available qb
        if (in->config.queueFullDropFrontBlock)
        {
            LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;

            // NOTE(review): the dropped block may currently be lent out by
            // gain(); its consumer would then read recycled data.
            QBlock* dropped = nullptr;
            pthread_mutex_lock(in->queue_mutex);
            if (!in->queBlocks.empty()) // front()/pop() on an empty queue is undefined
            {
                dropped = in->queBlocks.front();
                in->queBlocks.pop();
            }
            pthread_mutex_unlock(in->queue_mutex);

            in->poolBlocks.bp_free(dropped); // no-op for nullptr
        }
        else if (in->config.syncQueueFull)
        {
            LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
            pthread_mutex_lock(in->sync_full_mutex);
        }
    }

    if (in->is_full())
    {
        LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
        return false;
    }

    QBlock* qb = in->poolBlocks.bp_alloc();
    if (qb == nullptr)
    {
        LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
        return false;
    }

    if (!cache_material(in->config, pm, qb))
    {
        in->poolBlocks.bp_free(qb); // also clears any partially cached state
        return false;
    }

    pthread_mutex_lock(in->queue_mutex);
    in->queBlocks.push(qb);
    pthread_mutex_unlock(in->queue_mutex);

    if (in->config.syncQueueEmpty)
        pthread_mutex_unlock(in->sync_empty_mutex);

    return true;
}

/**
 * Deleter installed by gain(): removes the lent block from the queue and
 * returns it to the pool.
 * @param pm      material previously filled by gain() (former = queue, args = block)
 * @param lastRet currently unused
 */
void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
{
    (void)lastRet; //#todo lastRet

    if (pm == nullptr || pm->former == nullptr || pm->args == nullptr)
        return;

    PL_Queue* _this = (PL_Queue*)pm->former;
    QBlock* qb = (QBlock*)pm->args;
    PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;

    // Only pop when the lent block is still the front: otherwise it was
    // already dropped/released and popping would discard an undelivered block.
    bool released = false;
    pthread_mutex_lock(in->queue_mutex);
    if (!in->queBlocks.empty() && in->queBlocks.front() == qb)
    {
        in->queBlocks.pop();
        released = true;
    }
    pthread_mutex_unlock(in->queue_mutex);

    if (released)
        in->poolBlocks.bp_free(qb);
    else
        LOG_WARN << "PL_Queue::pm_deleter_qb block is not queue front" << LOG_ENDL;

    if (in->config.syncQueueFull)
        pthread_mutex_unlock(in->sync_full_mutex);
}

/**
 * Lend the front block to the caller without removing it; the block is
 * popped and recycled when pm.deleter (pm_deleter_qb) is invoked.
 * @return false (with pm.buffer == nullptr) when the queue is empty.
 */
bool PL_Queue::gain(PipeMaterial& pm)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;

    if (in->queue_empty() && in->config.syncQueueEmpty)
        pthread_mutex_lock(in->sync_empty_mutex);

    // Check and fetch under one lock so the front cannot vanish in between.
    QBlock* qb = nullptr;
    pthread_mutex_lock(in->queue_mutex);
    if (!in->queBlocks.empty())
        qb = in->queBlocks.front(); // pop in deleter
    pthread_mutex_unlock(in->queue_mutex);

    if (qb == nullptr)
    {
        LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
        pm.buffer = nullptr;
        pm.buffSize = 0;
        pm.former = this;
        return false;
    }

    pm = qb->lastPM;
    pm.former = this;
    pm.deleter = pm_deleter_qb;
    pm.args = qb;

    return true;
}