From b022b91c0c6fa807424b6c12cc92ac5946838083 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期四, 13 七月 2017 16:34:39 +0800
Subject: [PATCH] update pipeline

---
 RtspFace/PL_Queue.cpp |  316 +++++++++++++++++++++++++++++++++-------------------
 1 files changed, 202 insertions(+), 114 deletions(-)

diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp
index 7965877..aa43adb 100644
--- a/RtspFace/PL_Queue.cpp
+++ b/RtspFace/PL_Queue.cpp
@@ -1,17 +1,22 @@
 #include "PL_Queue.h"
+#include "logger.h"
+#include "MaterialBuffer.h"
 #include <set>
 #include <queue>
 #include <string.h>
 
 struct QBlock
 {
+	PipeMaterial lastPM;
+	std::vector<MB_Frame> lastMbfs;
+
 	uint8_t* data;
 	size_t maxSize;
 	size_t size;
 	
-	QBlock() : data(nullptr), maxSize(0), size(0) { }
+	QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { }
 	
-	QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
+	QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { }
 	
 	~QBlock()
 	{
@@ -19,80 +24,113 @@
 		maxSize = 0;
 		size = 0;
 	}
+
+	void reset()
+	{
+		size = 0;
+		lastPM.reset();
+		lastMbfs.clear();
+	}
 };
 
 struct QBlockPool
 {
-	uint8_t* blocksDataPool;
+	uint8_t* blocksDataCache;
 	
-	typedef std::set<QBlock*> qpool_t;
-	qpool_t staticPool;
-	qpool_t freePool;
-	
-	QBlockPool() : blocksDataPool(nullptr)
+	typedef std::set<QBlock*> qblock_set_t;
+	qblock_set_t staticPool;
+	qblock_set_t freePool;
+
+    pthread_mutex_t pool_mutex;
+
+    QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
 	{
 	}
 	
 	QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : 
-		blocksDataPool(nullptr)
+		blocksDataCache(nullptr)
 	{
 		bp_init(maxBlockCount, maxBlockSize);
 	}
 	
 	void bp_init(size_t maxBlockCount, size_t maxBlockSize)
 	{
-		const size_t totalBytes = maxBlockCount * maxBlockSize;
-		blocksDataPool = new uint8_t[totalBytes];
-		printf("QBlockPool allocate byte: %u\n", totalBytes);
+        pthread_mutex_init(&pool_mutex, NULL);
+        pthread_mutex_lock(&pool_mutex);
+
+        const size_t totalBytes = maxBlockCount * maxBlockSize;
+		blocksDataCache = new uint8_t[totalBytes];
+		LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes);
 		
 		for (size_t i = 0; i < maxBlockCount; i++)
 		{
-			uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
+			uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize];
 			QBlock* qb = new QBlock(qbData, maxBlockSize);
 			staticPool.insert(qb);
 			freePool.insert(qb);
 		}
-	}
-	
+
+        pthread_mutex_unlock(&pool_mutex);
+    }
+
+	void bp_reset()
+	{
+        pthread_mutex_lock(&pool_mutex);
+
+        if (freePool.size() != staticPool.size())
+		{
+			LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
+		}
+
+		freePool.clear();
+
+		for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
+			delete *iter;
+		staticPool.clear();
+
+		delete[] blocksDataCache;
+		blocksDataCache = nullptr;
+
+        pthread_mutex_unlock(&pool_mutex);
+    }
+
 	~QBlockPool()
 	{
-		if (freePool.size() != staticPool.size())
-		{
-			printf("QBlockPool memory leakage\n");
-		}
-		
-		for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
-		{
-			delete *iter;
-		}
-		
-		delete[] blocksDataPool;
-		blocksDataPool = nullptr;
-	}
+		bp_reset();
+        pthread_mutex_destroy(&pool_mutex);
+    }
 	
 	QBlock* bp_alloc()
 	{
-		if (freePool.empty())
+        pthread_mutex_lock(&pool_mutex);
+
+        if (freePool.empty())
 		{
-			printf("QBlockPool bp_alloc empty\n");
+            pthread_mutex_unlock(&pool_mutex);
+            LOG_WARN << "no free blocks" << LOG_ENDL;
 			return nullptr;
 		}
-		
-		qpool_t::iterator iter = freePool.begin();
+
+        qblock_set_t::iterator iter = freePool.begin();
 		QBlock* qb = *iter;
 		freePool.erase(iter);
-		return qb;
-	}
+
+        pthread_mutex_unlock(&pool_mutex);
+        return qb;
+    }
 	
 	void bp_free(QBlock* qb)
 	{
 		if (qb == nullptr)
 			return;
-		
-		qpool_t::iterator iter = staticPool.find(qb);
+
+        pthread_mutex_lock(&pool_mutex);
+
+        qblock_set_t::iterator iter = staticPool.find(qb);
 		if (iter == staticPool.end())
 		{
-			printf("QBlockPool bp_free not in pool\n");
+            pthread_mutex_unlock(&pool_mutex);
+            LOG_WARN << "block not belongs to pool" << LOG_ENDL;
 			return;
 		}
 		
@@ -103,9 +141,10 @@
 		//	return;
 		//}
 		
-		qb->size = 0;
+		qb->reset();
 		freePool.insert(qb);
-	}
+        pthread_mutex_unlock(&pool_mutex);
+    }
 	
 	bool bp_empty() const
 	{
@@ -116,32 +155,32 @@
 struct PL_Queue_Internal
 {
 	PL_Queue_Config config;
-	QBlockPool pool;
+	QBlockPool poolBlocks;
 	
-	typedef std::queue<QBlock*> queue_t;
-	queue_t que;
+	typedef std::queue<QBlock*> qblock_queue_t;
+	qblock_queue_t queBlocks;
 	
-	pthread_mutex_t* queue_pool_mutex;
+	pthread_mutex_t* queue_mutex;
 	pthread_mutex_t* sync_full_mutex;
 	pthread_mutex_t* sync_empty_mutex;
 	
 	PL_Queue_Internal() : 
-		config(), pool(), que(), 
-		queue_pool_mutex(new pthread_mutex_t), 
+		config(), poolBlocks(), queBlocks(),
+		queue_mutex(new pthread_mutex_t),
 		sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
 	{
-		pthread_mutex_init(queue_pool_mutex, NULL);
+		pthread_mutex_init(queue_mutex, NULL);
 		pthread_mutex_init(sync_full_mutex, NULL);
 		pthread_mutex_init(sync_empty_mutex, NULL);
 	}
 	
 	~PL_Queue_Internal()
 	{
-		if (queue_pool_mutex != nullptr)
+		if (queue_mutex != nullptr)
 		{
-			pthread_mutex_destroy(queue_pool_mutex);
-			delete queue_pool_mutex;
-			queue_pool_mutex = nullptr;
+			pthread_mutex_destroy(queue_mutex);
+			delete queue_mutex;
+			queue_mutex = nullptr;
 		}
 		
 		if (sync_full_mutex != nullptr)
@@ -161,15 +200,15 @@
 
 	void reset()
 	{
-		if (queue_pool_mutex != nullptr)
+		if (queue_mutex != nullptr)
 		{
-			pthread_mutex_destroy(queue_pool_mutex);
-			delete queue_pool_mutex;
-			queue_pool_mutex = nullptr;
+			pthread_mutex_destroy(queue_mutex);
+			delete queue_mutex;
+			queue_mutex = nullptr;
 		}
 		
-		queue_pool_mutex = new pthread_mutex_t;
-		pthread_mutex_init(queue_pool_mutex, NULL);
+		queue_mutex = new pthread_mutex_t;
+		pthread_mutex_init(queue_mutex, NULL);
 		
 		if (sync_full_mutex != nullptr)
 		{
@@ -190,8 +229,8 @@
 		
 		sync_empty_mutex = new pthread_mutex_t;
 		pthread_mutex_init(sync_empty_mutex, NULL);
-		
-		//#todo free que
+
+		poolBlocks.bp_reset();
 	}
 };
 
@@ -213,28 +252,31 @@
 bool PL_Queue::init(void* args)
 {
 	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
-	PL_Queue_Config* config = (PL_Queue_Config*)args;
-	
+
 	in->reset();
-	in->config = *config;
-	in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
+    if (args != nullptr)
+    {
+        PL_Queue_Config* config = (PL_Queue_Config*)args;
+        in->config = *config;
+    }
+	in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);
 	
-	if (config->syncQueueFull)
+	if (in->config.syncQueueFull)
 	{
 		int ret = pthread_mutex_lock(in->sync_full_mutex);
 		if(ret != 0)
 		{
-			printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
+			LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
 			return false;
 		}
 	}
 	
-	if (config->syncQueueEmpty)
+	if (in->config.syncQueueEmpty)
 	{
 		int ret = pthread_mutex_lock(in->sync_empty_mutex);
 		if(ret != 0)
 		{
-			printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
+			LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
 			return false;
 		}
 	}
@@ -252,63 +294,108 @@
 	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
 	
 	QBlock* qb = nullptr;
-	if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+	if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
 	{
 		// there is no available qb
 		
-		if (in->config.queueFullDropBlock)
+		if (in->config.queueFullDropFrontBlock)
 		{
-			printf("PL_Queue::pay queueFullDropBlock\n");
+			LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;
 		
-			pthread_mutex_lock(in->queue_pool_mutex);
-			qb = in->que.front();
-			in->que.pop();
-			
-			in->pool.bp_free(qb);
+			pthread_mutex_lock(in->queue_mutex);
+			qb = in->queBlocks.front();
+			in->queBlocks.pop();
+            pthread_mutex_unlock(in->queue_mutex);
+
+            in->poolBlocks.bp_free(qb);
 			qb = nullptr;
-			pthread_mutex_unlock(in->queue_pool_mutex);
 		}
 		else
 		{
 			if (in->config.syncQueueFull)
 			{
-				printf("PL_Queue::pay sync by sync_full_mutex\n");
+                LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
 				pthread_mutex_lock(in->sync_full_mutex);
 			}
 		}
 	}
 	
-	if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+	if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
 	{
-		printf("PL_Queue::pay full\n");
+        LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
 		return false;
 	}
 	
-	pthread_mutex_lock(in->queue_pool_mutex);
-	qb = in->pool.bp_alloc();
-	if (qb != nullptr)
-		in->que.push(qb);
-	pthread_mutex_unlock(in->queue_pool_mutex);
-	
+	qb = in->poolBlocks.bp_alloc();
 	if (qb == nullptr)
 	{
-		printf("PL_Queue::pay bp_alloc nullptr\n");
+        LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
 		return false;
 	}
-	
-	if (in->config.copyData)
-	{
-		memcpy(qb->data, pm.buffer, pm.buffSize);
-		qb->size = pm.buffSize;
-	}
-	
+
+    bool cacheOk = false;
+
+    if (pm.type == PipeMaterial::PMT_BYTES)
+    {
+        qb->lastPM = pm;
+
+        if (in->config.copyData)
+        {
+            memcpy(qb->data, pm.buffer, pm.buffSize);
+            qb->size = pm.buffSize;
+            qb->lastPM.buffer = qb->data;
+        }
+
+        cacheOk = true;
+    }
+    else if (pm.type == PipeMaterial::PMT_FRAME)
+    {
+        qb->lastPM = pm;
+
+        MB_Frame* mbf = (MB_Frame*)pm.buffer;
+        qb->lastMbfs.push_back(*mbf);
+
+        qb->lastPM.buffer = &(qb->lastMbfs[0]);
+
+        if (in->config.copyData)
+        {
+            memcpy(qb->data, mbf->buffer, mbf->buffSize);
+            qb->size = mbf->buffSize;
+            qb->lastMbfs[0].buffer = qb->data;
+        }
+
+        cacheOk = true;
+    }
+    else if (pm.type == PipeMaterial::PMT_FRAME_LIST)
+    {
+        if (in->config.cacheFrameListFunc != nullptr)
+        {
+            size_t s = qb->maxSize;
+            cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s);
+            if (cacheOk)
+                qb->size = s;
+        }
+    }
+
+    if (! cacheOk)
+    {
+        in->poolBlocks.bp_free(qb);
+
+        LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
+        return false;
+    }
+
+    pthread_mutex_lock(in->queue_mutex);
+    in->queBlocks.push(qb);
+    pthread_mutex_unlock(in->queue_mutex);
+
 	if (in->config.syncQueueEmpty)
 		pthread_mutex_unlock(in->sync_empty_mutex);
 
 	return true;
 }
 
-void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
+void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
 {
 	if (pm->former == nullptr || pm->args == nullptr)
 		return;
@@ -316,25 +403,30 @@
 	PL_Queue* _this = (PL_Queue*)pm->former;
 	QBlock* qb = (QBlock*)pm->args;
 	PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
-	
-	pthread_mutex_lock(in->queue_pool_mutex);
-	in->pool.bp_free(qb);
-	pthread_mutex_unlock(in->queue_pool_mutex);
+
+	pthread_mutex_lock(in->queue_mutex);//#todo lastRet
+    in->queBlocks.pop();
+    pthread_mutex_unlock(in->queue_mutex);
+
+    in->poolBlocks.bp_free(qb);
+
+    if (in->config.syncQueueFull)
+        pthread_mutex_unlock(in->sync_full_mutex);
 }
 
 bool PL_Queue::gain(PipeMaterial& pm)
 {
 	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
 	
-	if (in->que.empty())
+	if (in->queBlocks.empty())
 	{
 		if (in->config.syncQueueEmpty)
 			pthread_mutex_lock(in->sync_empty_mutex);
 	}
 	
-	if (in->que.empty())
+	if (in->queBlocks.empty())
 	{
-		printf("PL_Queue::gain empty\n");
+		LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
 		pm.buffer = nullptr;
 		pm.buffSize = 0;
 		pm.former = this;
@@ -342,19 +434,15 @@
 	}
 
 	QBlock* qb = nullptr;
-	pthread_mutex_lock(in->queue_pool_mutex);
-	qb = in->que.front();
-	in->que.pop();
-	pthread_mutex_unlock(in->queue_pool_mutex);
-	
-	if (in->config.syncQueueFull)
-		pthread_mutex_unlock(in->sync_full_mutex);
-	
-	pm.type = PipeMaterial::PMT_BYTES;
-	pm.buffer = qb->data;
-	pm.buffSize = qb->size;
-	pm.former = this;
-	pm.deleter = pm_deleter_qb;
-	pm.args = qb;
+	pthread_mutex_lock(in->queue_mutex);
+	qb = in->queBlocks.front();
+	//in->queBlocks.pop(); // pop in deleter
+	pthread_mutex_unlock(in->queue_mutex);
+
+    pm = qb->lastPM;
+
+    pm.former = this;
+    pm.deleter = pm_deleter_qb;
+    pm.args = qb;
 	return true;
 }

--
Gitblit v1.8.0