From 663104b9be90ed303b87c8acddac8421583a9e39 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 16 八月 2017 12:38:59 +0800
Subject: [PATCH] aaaaa

---
 RtspFace/PL_RTSPServer2.cpp |   76 ++++++++++++++++++++++++++++++--------
 1 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp
index bd98fa4..adc8d6c 100644
--- a/RtspFace/PL_RTSPServer2.cpp
+++ b/RtspFace/PL_RTSPServer2.cpp
@@ -24,6 +24,7 @@
 	PreAllocBufferQueue* frameQueue;
 	pthread_mutex_t* queue_mutex;
 	pthread_mutex_t* queue_empty_mutex;
+	pthread_mutex_t* queue_full_mutex;
 
 	bool auxLineSet;
 
@@ -31,10 +32,9 @@
 		config(),
 		live_daemon_thid(0), live_daemon_running(false),
 		server(nullptr),
-		frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
+		frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
 		auxLineSet(false)
 	{
-		pthread_mutex_init(queue_mutex, NULL);
 	}
 	
 	~RTSPServer2_Internal()
@@ -59,7 +59,6 @@
 			delete queue_mutex;
 			queue_mutex = nullptr;
 		}
-		
 		queue_mutex = new pthread_mutex_t;
 		pthread_mutex_init(queue_mutex, NULL);
 
@@ -69,9 +68,17 @@
 			delete queue_empty_mutex;
 			queue_empty_mutex = nullptr;
 		}
-
 		queue_empty_mutex = new pthread_mutex_t;
 		pthread_mutex_init(queue_empty_mutex, NULL);
+
+		if (queue_full_mutex != nullptr)
+		{
+			pthread_mutex_destroy(queue_full_mutex);
+			delete queue_full_mutex;
+			queue_full_mutex = nullptr;
+		}
+		queue_full_mutex = new pthread_mutex_t;
+		pthread_mutex_init(queue_full_mutex, NULL);
 
 		live_daemon_thid = 0;
 		live_daemon_running = false;
@@ -120,19 +127,28 @@
 	{
 		DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
 
+		if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
+		{
+			int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
+			if (ret != 0)
+			{
+				LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
+			}
+		}
+
 		if (_this->in->frameQueue->Empty())
 		{
 			int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
 			if (ret != 0)
 			{
-				LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
+				LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
 			}
 		}
 
 		int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
 		if (ret != 0)
 		{
-			LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
+			LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
 		}
 
 		ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
@@ -150,11 +166,13 @@
 
 		buffer = _this->lastBuffer->buffer;
 		buffSize = _this->lastBuffer->buffSize;
+		LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
 
 		//LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
 		//static size_t f = 0;
 		//static FILE *pFile = fopen("/data/bb.264", "wb");
 		//fwrite(buffer, sizeof(char), buffSize, pFile);
+		//fflush(pFile);
 		//if (++f > 30){
 		//	fclose(pFile);
 		//	exit(0);
@@ -212,14 +230,14 @@
 	qcfg.multithreadSafe = false;
 	qcfg.fullQueueDropFront = true;
 	qcfg.fullQueueSync = false;
-	qcfg.count = 32;
-	qcfg.maxBuffSize = 200000;
+	qcfg.count = 20;
+	qcfg.maxBuffSize = 524288; // 512KB
 	in->frameQueue = new PreAllocBufferQueue(qcfg);
 
 	int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
 	if(ret != 0)
 	{
-		LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
+		LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
 		return false;
 	}
 
@@ -242,7 +260,7 @@
 	
 	if (pm.type != PipeMaterial::PMT_FRAME)
 	{
-		LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << std::endl;
+		LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
 		return false;
 	}
 
@@ -263,28 +281,48 @@
 		}
 	}
 
+	while (in->config.payBlockFullQueue && in->frameQueue->Full())
+	{
+		int ret = pthread_mutex_lock(in->queue_full_mutex);
+		if (ret != 0)
+		{
+			LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
+		}
+
+		//if (in->frameQueue->Full())
+		//{
+		//	LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
+		//	return false;
+		//}
+	}
+
 	MB_Frame* frame = (MB_Frame*)pm.buffer;
 	if (frame->buffer == nullptr || frame->buffSize == 0)
 		return false;
 
+	LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
+
 	ScopeLocker<pthread_mutex_t>(in->queue_mutex);
 	//if (in->frameQueue->Full())
-	//	LOG_WARN << "PL_RTSPServer2::pay may lost data" << std::endl;
+	//	LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
 
 	PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
 	if (qbuff == nullptr)
 	{
-		LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << std::endl;
+		LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
 		int ret = pthread_mutex_unlock(in->queue_empty_mutex);
 		if (ret != 0)
 		{
-			LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+			LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
 		}
 		return false;
 	}
 
-	memcpy(qbuff->buffer, frame->buffer, frame->buffSize);//#todo size min
-	qbuff->buffSize = frame->buffSize;
+	const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
+	size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
+
+	memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
+	qbuff->buffSize = copySize;
 
 	//static size_t f = 0;
 	//static FILE *pFile = fopen("/data/aa.264", "wb");
@@ -297,8 +335,14 @@
 	int ret = pthread_mutex_unlock(in->queue_empty_mutex);
 	if (ret != 0)
 	{
-		LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+		LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
 	}
+
+	if (copySize < frame->buffSize)
+	{
+		LOG_WARN << "copy frame truncated" << LOG_ENDL;
+	}
+
 	return true;
 }
 

--
Gitblit v1.8.0