From b022b91c0c6fa807424b6c12cc92ac5946838083 Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期四, 13 七月 2017 16:34:39 +0800 Subject: [PATCH] update pipeline --- RtspFace/PL_Queue.cpp | 316 +++++++++++++++++++++++++++++++++------------------- 1 files changed, 202 insertions(+), 114 deletions(-) diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp index 7965877..aa43adb 100644 --- a/RtspFace/PL_Queue.cpp +++ b/RtspFace/PL_Queue.cpp @@ -1,17 +1,22 @@ #include "PL_Queue.h" +#include "logger.h" +#include "MaterialBuffer.h" #include <set> #include <queue> #include <string.h> struct QBlock { + PipeMaterial lastPM; + std::vector<MB_Frame> lastMbfs; + uint8_t* data; size_t maxSize; size_t size; - QBlock() : data(nullptr), maxSize(0), size(0) { } + QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { } - QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { } + QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { } ~QBlock() { @@ -19,80 +24,113 @@ maxSize = 0; size = 0; } + + void reset() + { + size = 0; + lastPM.reset(); + lastMbfs.clear(); + } }; struct QBlockPool { - uint8_t* blocksDataPool; + uint8_t* blocksDataCache; - typedef std::set<QBlock*> qpool_t; - qpool_t staticPool; - qpool_t freePool; - - QBlockPool() : blocksDataPool(nullptr) + typedef std::set<QBlock*> qblock_set_t; + qblock_set_t staticPool; + qblock_set_t freePool; + + pthread_mutex_t pool_mutex; + + QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex() { } QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : - blocksDataPool(nullptr) + blocksDataCache(nullptr) { bp_init(maxBlockCount, maxBlockSize); } void bp_init(size_t maxBlockCount, size_t maxBlockSize) { - const size_t totalBytes = maxBlockCount * maxBlockSize; - blocksDataPool = new uint8_t[totalBytes]; - printf("QBlockPool allocate byte: %u\n", totalBytes); + pthread_mutex_init(&pool_mutex, NULL); + pthread_mutex_lock(&pool_mutex); + + const size_t totalBytes = maxBlockCount * maxBlockSize; + blocksDataCache = new uint8_t[totalBytes]; + LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes); for (size_t i = 0; i < maxBlockCount; i++) { - uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize]; + uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize]; QBlock* qb = new QBlock(qbData, maxBlockSize); staticPool.insert(qb); freePool.insert(qb); } - } - + + pthread_mutex_unlock(&pool_mutex); + } + + void bp_reset() + { + pthread_mutex_lock(&pool_mutex); + + if (freePool.size() != staticPool.size()) + { + LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL; + } + + freePool.clear(); + + for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter) + delete *iter; + staticPool.clear(); + + delete[] blocksDataCache; + blocksDataCache = nullptr; + + pthread_mutex_unlock(&pool_mutex); + } + ~QBlockPool() { - if (freePool.size() != staticPool.size()) - { - printf("QBlockPool memory leakage\n"); - } - - for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter) - { - delete *iter; - } - - delete[] blocksDataPool; - blocksDataPool = nullptr; - } + bp_reset(); + pthread_mutex_destroy(&pool_mutex); + } QBlock* bp_alloc() { - if (freePool.empty()) + pthread_mutex_lock(&pool_mutex); + + if (freePool.empty()) { - printf("QBlockPool bp_alloc empty\n"); + pthread_mutex_unlock(&pool_mutex); + LOG_WARN << "no free blocks" << LOG_ENDL; return nullptr; } - - qpool_t::iterator iter = freePool.begin(); + + qblock_set_t::iterator iter = freePool.begin(); QBlock* qb = *iter; freePool.erase(iter); - return qb; - } + + pthread_mutex_unlock(&pool_mutex); + return qb; + } void bp_free(QBlock* qb) { if (qb == nullptr) return; - - qpool_t::iterator iter = staticPool.find(qb); + + pthread_mutex_lock(&pool_mutex); + + qblock_set_t::iterator iter = staticPool.find(qb); if (iter == staticPool.end()) { - printf("QBlockPool bp_free not in pool\n"); + pthread_mutex_unlock(&pool_mutex); + LOG_WARN << "block not belongs to pool" << LOG_ENDL; return; } @@ -103,9 +141,10 @@ // return; //} - qb->size = 0; + qb->reset(); freePool.insert(qb); - } + pthread_mutex_unlock(&pool_mutex); + } bool bp_empty() const { @@ -116,32 +155,32 @@ struct PL_Queue_Internal { PL_Queue_Config config; - QBlockPool pool; + QBlockPool poolBlocks; - typedef std::queue<QBlock*> queue_t; - queue_t que; + typedef std::queue<QBlock*> qblock_queue_t; + qblock_queue_t queBlocks; - pthread_mutex_t* queue_pool_mutex; + pthread_mutex_t* queue_mutex; pthread_mutex_t* sync_full_mutex; pthread_mutex_t* sync_empty_mutex; PL_Queue_Internal() : - config(), pool(), que(), - queue_pool_mutex(new pthread_mutex_t), + config(), poolBlocks(), queBlocks(), + queue_mutex(new pthread_mutex_t), sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t) { - pthread_mutex_init(queue_pool_mutex, NULL); + pthread_mutex_init(queue_mutex, NULL); pthread_mutex_init(sync_full_mutex, NULL); pthread_mutex_init(sync_empty_mutex, NULL); } ~PL_Queue_Internal() { - if (queue_pool_mutex != nullptr) + if (queue_mutex != nullptr) { - pthread_mutex_destroy(queue_pool_mutex); - delete queue_pool_mutex; - queue_pool_mutex = nullptr; + pthread_mutex_destroy(queue_mutex); + delete queue_mutex; + queue_mutex = nullptr; } if (sync_full_mutex != nullptr) @@ -161,15 +200,15 @@ void reset() { - if (queue_pool_mutex != nullptr) + if (queue_mutex != nullptr) { - pthread_mutex_destroy(queue_pool_mutex); - delete queue_pool_mutex; - queue_pool_mutex = nullptr; + pthread_mutex_destroy(queue_mutex); + delete queue_mutex; + queue_mutex = nullptr; } - queue_pool_mutex = new pthread_mutex_t; - pthread_mutex_init(queue_pool_mutex, NULL); + queue_mutex = new pthread_mutex_t; + pthread_mutex_init(queue_mutex, NULL); if (sync_full_mutex != nullptr) { @@ -190,8 +229,8 @@ sync_empty_mutex = new pthread_mutex_t; pthread_mutex_init(sync_empty_mutex, NULL); - - //#todo free que + + poolBlocks.bp_reset(); } }; @@ -213,28 +252,31 @@ bool PL_Queue::init(void* args) { PL_Queue_Internal* in = (PL_Queue_Internal*)internal; - PL_Queue_Config* config = (PL_Queue_Config*)args; - + in->reset(); - in->config = *config; - in->pool.bp_init(config->maxBlockCount, config->maxBlockSize); + if (args != nullptr) + { + PL_Queue_Config* config = (PL_Queue_Config*)args; + in->config = *config; + } + in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize); - if (config->syncQueueFull) + if (in->config.syncQueueFull) { int ret = pthread_mutex_lock(in->sync_full_mutex); if(ret != 0) { - printf("pthread_mutex_lock sync_full_mutex: %d\n", ret); + LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret); return false; } } - if (config->syncQueueEmpty) + if (in->config.syncQueueEmpty) { int ret = pthread_mutex_lock(in->sync_empty_mutex); if(ret != 0) { - printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret); + LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret); return false; } } @@ -252,63 +294,108 @@ PL_Queue_Internal* in = (PL_Queue_Internal*)internal; QBlock* qb = nullptr; - if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) + if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty()) { // there is no available qb - if (in->config.queueFullDropBlock) + if (in->config.queueFullDropFrontBlock) { - printf("PL_Queue::pay queueFullDropBlock\n"); + LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL; - pthread_mutex_lock(in->queue_pool_mutex); - qb = in->que.front(); - in->que.pop(); - - in->pool.bp_free(qb); + pthread_mutex_lock(in->queue_mutex); + qb = in->queBlocks.front(); + in->queBlocks.pop(); + pthread_mutex_unlock(in->queue_mutex); + + in->poolBlocks.bp_free(qb); qb = nullptr; - pthread_mutex_unlock(in->queue_pool_mutex); } else { if (in->config.syncQueueFull) { - printf("PL_Queue::pay sync by sync_full_mutex\n"); + LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL; pthread_mutex_lock(in->sync_full_mutex); } } } - if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) + if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty()) { - printf("PL_Queue::pay full\n"); + LOG_WARN << "PL_Queue::pay full" << LOG_ENDL; return false; } - pthread_mutex_lock(in->queue_pool_mutex); - qb = in->pool.bp_alloc(); - if (qb != nullptr) - in->que.push(qb); - pthread_mutex_unlock(in->queue_pool_mutex); - + qb = in->poolBlocks.bp_alloc(); if (qb == nullptr) { - printf("PL_Queue::pay bp_alloc nullptr\n"); + LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL; return false; } - - if (in->config.copyData) - { - memcpy(qb->data, pm.buffer, pm.buffSize); - qb->size = pm.buffSize; - } - + + bool cacheOk = false; + + if (pm.type == PipeMaterial::PMT_BYTES) + { + qb->lastPM = pm; + + if (in->config.copyData) + { + memcpy(qb->data, pm.buffer, pm.buffSize); + qb->size = pm.buffSize; + qb->lastPM.buffer = qb->data; + } + + cacheOk = true; + } + else if (pm.type == PipeMaterial::PMT_FRAME) + { + qb->lastPM = pm; + + MB_Frame* mbf = (MB_Frame*)pm.buffer; + qb->lastMbfs.push_back(*mbf); + + qb->lastPM.buffer = &(qb->lastMbfs[0]); + + if (in->config.copyData) + { + memcpy(qb->data, mbf->buffer, mbf->buffSize); + qb->size = mbf->buffSize; + qb->lastMbfs[0].buffer = qb->data; + } + + cacheOk = true; + } + else if (pm.type == PipeMaterial::PMT_FRAME_LIST) + { + if (in->config.cacheFrameListFunc != nullptr) + { + size_t s = qb->maxSize; + cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s); + if (cacheOk) + qb->size = s; + } + } + + if (! cacheOk) + { + in->poolBlocks.bp_free(qb); + + LOG_WARN << "pm/mbf type not support" << LOG_ENDL; + return false; + } + + pthread_mutex_lock(in->queue_mutex); + in->queBlocks.push(qb); + pthread_mutex_unlock(in->queue_mutex); + if (in->config.syncQueueEmpty) pthread_mutex_unlock(in->sync_empty_mutex); return true; } -void PL_Queue::pm_deleter_qb(PipeMaterial* pm) +void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet) { if (pm->former == nullptr || pm->args == nullptr) return; @@ -316,25 +403,30 @@ PL_Queue* _this = (PL_Queue*)pm->former; QBlock* qb = (QBlock*)pm->args; PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal; - - pthread_mutex_lock(in->queue_pool_mutex); - in->pool.bp_free(qb); - pthread_mutex_unlock(in->queue_pool_mutex); + + pthread_mutex_lock(in->queue_mutex);//#todo lastRet + in->queBlocks.pop(); + pthread_mutex_unlock(in->queue_mutex); + + in->poolBlocks.bp_free(qb); + + if (in->config.syncQueueFull) + pthread_mutex_unlock(in->sync_full_mutex); } bool PL_Queue::gain(PipeMaterial& pm) { PL_Queue_Internal* in = (PL_Queue_Internal*)internal; - if (in->que.empty()) + if (in->queBlocks.empty()) { if (in->config.syncQueueEmpty) pthread_mutex_lock(in->sync_empty_mutex); } - if (in->que.empty()) + if (in->queBlocks.empty()) { - printf("PL_Queue::gain empty\n"); + LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL; pm.buffer = nullptr; pm.buffSize = 0; pm.former = this; @@ -342,19 +434,15 @@ } QBlock* qb = nullptr; - pthread_mutex_lock(in->queue_pool_mutex); - qb = in->que.front(); - in->que.pop(); - pthread_mutex_unlock(in->queue_pool_mutex); - - if (in->config.syncQueueFull) - pthread_mutex_unlock(in->sync_full_mutex); - - pm.type = PipeMaterial::PMT_BYTES; - pm.buffer = qb->data; - pm.buffSize = qb->size; - pm.former = this; - pm.deleter = pm_deleter_qb; - pm.args = qb; + pthread_mutex_lock(in->queue_mutex); + qb = in->queBlocks.front(); + //in->queBlocks.pop(); // pop in deleter + pthread_mutex_unlock(in->queue_mutex); + + pm = qb->lastPM; + + pm.former = this; + pm.deleter = pm_deleter_qb; + pm.args = qb; return true; } -- Gitblit v1.8.0