From 663104b9be90ed303b87c8acddac8421583a9e39 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 16 八月 2017 12:38:59 +0800
Subject: [PATCH] aaaaa
---
RtspFace/PL_Queue.cpp | 316 +++++++++++++++++++++++++++++++++-------------------
1 files changed, 202 insertions(+), 114 deletions(-)
diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp
index 4641eec..aa43adb 100644
--- a/RtspFace/PL_Queue.cpp
+++ b/RtspFace/PL_Queue.cpp
@@ -1,17 +1,22 @@
#include "PL_Queue.h"
+#include "logger.h"
+#include "MaterialBuffer.h"
#include <set>
#include <queue>
#include <string.h>
struct QBlock
{
+ PipeMaterial lastPM;
+ std::vector<MB_Frame> lastMbfs;
+
uint8_t* data;
size_t maxSize;
size_t size;
- QBlock() : data(nullptr), maxSize(0), size(0) { }
+ QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { }
- QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
+ QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { }
~QBlock()
{
@@ -19,80 +24,113 @@
maxSize = 0;
size = 0;
}
+
+ void reset()
+ {
+ size = 0;
+ lastPM.reset();
+ lastMbfs.clear();
+ }
};
struct QBlockPool
{
- uint8_t* blocksDataPool;
+ uint8_t* blocksDataCache;
- typedef std::set<QBlock*> qpool_t;
- qpool_t staticPool;
- qpool_t freePool;
-
- QBlockPool() : blocksDataPool(nullptr)
+ typedef std::set<QBlock*> qblock_set_t;
+ qblock_set_t staticPool;
+ qblock_set_t freePool;
+
+ pthread_mutex_t pool_mutex;
+
+ QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
{
}
QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
- blocksDataPool(nullptr)
+ blocksDataCache(nullptr)
{
bp_init(maxBlockCount, maxBlockSize);
}
void bp_init(size_t maxBlockCount, size_t maxBlockSize)
{
- const size_t totalBytes = maxBlockCount * maxBlockSize;
- blocksDataPool = new uint8_t[totalBytes];
- printf("QBlockPool allocate byte: %u\n", totalBytes);
+ pthread_mutex_init(&pool_mutex, NULL);
+ pthread_mutex_lock(&pool_mutex);
+
+ const size_t totalBytes = maxBlockCount * maxBlockSize;
+ blocksDataCache = new uint8_t[totalBytes];
+ LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes);
for (size_t i = 0; i < maxBlockCount; i++)
{
- uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
+ uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize];
QBlock* qb = new QBlock(qbData, maxBlockSize);
staticPool.insert(qb);
freePool.insert(qb);
}
- }
-
+
+ pthread_mutex_unlock(&pool_mutex);
+ }
+
+ void bp_reset()
+ {
+ pthread_mutex_lock(&pool_mutex);
+
+ if (freePool.size() != staticPool.size())
+ {
+ LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
+ }
+
+ freePool.clear();
+
+ for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
+ delete *iter;
+ staticPool.clear();
+
+ delete[] blocksDataCache;
+ blocksDataCache = nullptr;
+
+ pthread_mutex_unlock(&pool_mutex);
+ }
+
~QBlockPool()
{
- if (freePool.size() != staticPool.size())
- {
- printf("QBlockPool memory leakage\n");
- }
-
- for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
- {
- delete *iter;
- }
-
- delete[] blocksDataPool;
- blocksDataPool = nullptr;
- }
+ bp_reset();
+ pthread_mutex_destroy(&pool_mutex);
+ }
QBlock* bp_alloc()
{
- if (freePool.empty())
+ pthread_mutex_lock(&pool_mutex);
+
+ if (freePool.empty())
{
- printf("QBlockPool bp_alloc empty\n");
+ pthread_mutex_unlock(&pool_mutex);
+ LOG_WARN << "no free blocks" << LOG_ENDL;
return nullptr;
}
-
- qpool_t::iterator iter = freePool.begin();
+
+ qblock_set_t::iterator iter = freePool.begin();
QBlock* qb = *iter;
freePool.erase(iter);
- return qb;
- }
+
+ pthread_mutex_unlock(&pool_mutex);
+ return qb;
+ }
void bp_free(QBlock* qb)
{
if (qb == nullptr)
return;
-
- qpool_t::iterator iter = staticPool.find(qb);
+
+ pthread_mutex_lock(&pool_mutex);
+
+ qblock_set_t::iterator iter = staticPool.find(qb);
if (iter == staticPool.end())
{
- printf("QBlockPool bp_free not in pool\n");
+ pthread_mutex_unlock(&pool_mutex);
+ LOG_WARN << "block not belongs to pool" << LOG_ENDL;
return;
}
@@ -103,9 +141,10 @@
// return;
//}
- qb->size = 0;
+ qb->reset();
freePool.insert(qb);
- }
+ pthread_mutex_unlock(&pool_mutex);
+ }
bool bp_empty() const
{
@@ -116,32 +155,32 @@
struct PL_Queue_Internal
{
PL_Queue_Config config;
- QBlockPool pool;
+ QBlockPool poolBlocks;
- typedef std::queue<QBlock*> queue_t;
- queue_t que;
+ typedef std::queue<QBlock*> qblock_queue_t;
+ qblock_queue_t queBlocks;
- pthread_mutex_t* queue_pool_mutex;
+ pthread_mutex_t* queue_mutex;
pthread_mutex_t* sync_full_mutex;
pthread_mutex_t* sync_empty_mutex;
PL_Queue_Internal() :
- config(), pool(), que(),
- queue_pool_mutex(new pthread_mutex_t),
+ config(), poolBlocks(), queBlocks(),
+ queue_mutex(new pthread_mutex_t),
sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
{
- pthread_mutex_init(queue_pool_mutex, NULL);
+ pthread_mutex_init(queue_mutex, NULL);
pthread_mutex_init(sync_full_mutex, NULL);
pthread_mutex_init(sync_empty_mutex, NULL);
}
~PL_Queue_Internal()
{
- if (queue_pool_mutex != nullptr)
+ if (queue_mutex != nullptr)
{
- pthread_mutex_destroy(queue_pool_mutex);
- delete queue_pool_mutex;
- queue_pool_mutex = nullptr;
+ pthread_mutex_destroy(queue_mutex);
+ delete queue_mutex;
+ queue_mutex = nullptr;
}
if (sync_full_mutex != nullptr)
@@ -161,15 +200,15 @@
void reset()
{
- if (queue_pool_mutex != nullptr)
+ if (queue_mutex != nullptr)
{
- pthread_mutex_destroy(queue_pool_mutex);
- delete queue_pool_mutex;
- queue_pool_mutex = nullptr;
+ pthread_mutex_destroy(queue_mutex);
+ delete queue_mutex;
+ queue_mutex = nullptr;
}
- queue_pool_mutex = new pthread_mutex_t;
- pthread_mutex_init(queue_pool_mutex, NULL);
+ queue_mutex = new pthread_mutex_t;
+ pthread_mutex_init(queue_mutex, NULL);
if (sync_full_mutex != nullptr)
{
@@ -190,8 +229,8 @@
sync_empty_mutex = new pthread_mutex_t;
pthread_mutex_init(sync_empty_mutex, NULL);
-
- //#todo free que
+
+ poolBlocks.bp_reset();
}
};
@@ -213,28 +252,31 @@
bool PL_Queue::init(void* args)
{
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
- PL_Queue_Config* config = (PL_Queue_Config*)args;
-
+
in->reset();
- in->config = *config;
- in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
+ if (args != nullptr)
+ {
+ PL_Queue_Config* config = (PL_Queue_Config*)args;
+ in->config = *config;
+ }
+ in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);
- if (config->syncQueueFull)
+ if (in->config.syncQueueFull)
{
int ret = pthread_mutex_lock(in->sync_full_mutex);
if(ret != 0)
{
- printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
+ LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
return false;
}
}
- if (config->syncQueueEmpty)
+ if (in->config.syncQueueEmpty)
{
int ret = pthread_mutex_lock(in->sync_empty_mutex);
if(ret != 0)
{
- printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
+ LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
return false;
}
}
@@ -252,63 +294,108 @@
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
QBlock* qb = nullptr;
- if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+ if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
{
// there is no available qb
- if (in->config.queueFullDropBlock)
+ if (in->config.queueFullDropFrontBlock)
{
- printf("PL_Queue::pay queueFullDropBlock\n");
+ LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;
- pthread_mutex_lock(in->queue_pool_mutex);
- qb = in->que.front();
- in->que.pop();
-
- in->pool.bp_free(qb);
+ pthread_mutex_lock(in->queue_mutex);
+ qb = in->queBlocks.front();
+ in->queBlocks.pop();
+ pthread_mutex_unlock(in->queue_mutex);
+
+ in->poolBlocks.bp_free(qb);
qb = nullptr;
- pthread_mutex_unlock(in->queue_pool_mutex);
}
else
{
if (in->config.syncQueueFull)
{
- printf("PL_Queue::pay sync by sync_full_mutex\n");
+ LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
pthread_mutex_lock(in->sync_full_mutex);
}
}
}
- if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+ if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
{
- printf("PL_Queue::pay full\n");
+ LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
return false;
}
- pthread_mutex_lock(in->queue_pool_mutex);
- qb = in->pool.bp_alloc();
- if (qb != nullptr)
- in->que.push(qb);
- pthread_mutex_unlock(in->queue_pool_mutex);
-
+ qb = in->poolBlocks.bp_alloc();
if (qb == nullptr)
{
- printf("PL_Queue::pay bp_alloc nullptr\n");
+ LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
return false;
}
-
- if (in->config.copyData)
- {
- memcpy(qb->data, pm.buffer, pm.buffSize);
- qb->size = pm.buffSize;
- }
-
+
+ bool cacheOk = false;
+
+ if (pm.type == PipeMaterial::PMT_BYTES)
+ {
+ qb->lastPM = pm;
+
+ if (in->config.copyData)
+ {
+ memcpy(qb->data, pm.buffer, pm.buffSize);
+ qb->size = pm.buffSize;
+ qb->lastPM.buffer = qb->data;
+ }
+
+ cacheOk = true;
+ }
+ else if (pm.type == PipeMaterial::PMT_FRAME)
+ {
+ qb->lastPM = pm;
+
+ MB_Frame* mbf = (MB_Frame*)pm.buffer;
+ qb->lastMbfs.push_back(*mbf);
+
+ qb->lastPM.buffer = &(qb->lastMbfs[0]);
+
+ if (in->config.copyData)
+ {
+ memcpy(qb->data, mbf->buffer, mbf->buffSize);
+ qb->size = mbf->buffSize;
+ qb->lastMbfs[0].buffer = qb->data;
+ }
+
+ cacheOk = true;
+ }
+ else if (pm.type == PipeMaterial::PMT_FRAME_LIST)
+ {
+ if (in->config.cacheFrameListFunc != nullptr)
+ {
+ size_t s = qb->maxSize;
+ cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s);
+ if (cacheOk)
+ qb->size = s;
+ }
+ }
+
+ if (! cacheOk)
+ {
+ in->poolBlocks.bp_free(qb);
+
+ LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
+ return false;
+ }
+
+ pthread_mutex_lock(in->queue_mutex);
+ in->queBlocks.push(qb);
+ pthread_mutex_unlock(in->queue_mutex);
+
if (in->config.syncQueueEmpty)
pthread_mutex_unlock(in->sync_empty_mutex);
return true;
}
-void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
+void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
{
if (pm->former == nullptr || pm->args == nullptr)
return;
@@ -316,25 +403,30 @@
PL_Queue* _this = (PL_Queue*)pm->former;
QBlock* qb = (QBlock*)pm->args;
PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
-
- pthread_mutex_lock(in->queue_pool_mutex);
- in->pool.bp_free(qb);
- pthread_mutex_unlock(in->queue_pool_mutex);
+
+ pthread_mutex_lock(in->queue_mutex);//#todo lastRet
+ in->queBlocks.pop();
+ pthread_mutex_unlock(in->queue_mutex);
+
+ in->poolBlocks.bp_free(qb);
+
+ if (in->config.syncQueueFull)
+ pthread_mutex_unlock(in->sync_full_mutex);
}
bool PL_Queue::gain(PipeMaterial& pm)
{
PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
- if (in->que.empty())
+ if (in->queBlocks.empty())
{
if (in->config.syncQueueEmpty)
pthread_mutex_lock(in->sync_empty_mutex);
}
- if (in->que.empty())
+ if (in->queBlocks.empty())
{
- printf("PL_Queue::gain empty\n");
+ LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
pm.buffer = nullptr;
pm.buffSize = 0;
pm.former = this;
@@ -342,19 +434,15 @@
}
QBlock* qb = nullptr;
- pthread_mutex_lock(in->queue_pool_mutex);
- qb = in->que.front();
- in->que.pop();
- pthread_mutex_unlock(in->queue_pool_mutex);
-
- if (in->config.syncQueueFull)
- pthread_mutex_unlock(in->sync_full_mutex);
-
- pm.type = PMT_BYTES;
- pm.buffer = qb->data;
- pm.buffSize = qb->size;
- pm.former = this;
- pm.deleter = pm_deleter_qb;
- pm.args = qb;
+ pthread_mutex_lock(in->queue_mutex);
+ qb = in->queBlocks.front();
+ //in->queBlocks.pop(); // pop in deleter
+ pthread_mutex_unlock(in->queue_mutex);
+
+ pm = qb->lastPM;
+
+ pm.former = this;
+ pm.deleter = pm_deleter_qb;
+ pm.args = qb;
return true;
}
--
Gitblit v1.8.0