From 4ef430e946e717d72e923c4708a9120f94d55dbd Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 28 十二月 2016 09:35:14 +0800
Subject: [PATCH] test h264 encoder

---
 RtspFace/make.sh                                |    5 
 RtspFace/PL_RTSPServer.cpp                      |   78 +++-
 RtspFace/PipeLine.h                             |   26 +
 RtspFace/PL_RTSPServer.h                        |   12 
 RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp |  114 ++++++-
 RtspFace/PL_Queue.h                             |   44 +++
 RtspFace/main.cpp                               |   71 +++-
 RtspFace/PL_H264Encoder.cpp                     |   62 +++-
 RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h   |    9 
 RtspFace/PL_Queue.cpp                           |  360 +++++++++++++++++++++++++
 RtspFace/PL_AVFrameYUV420.cpp                   |    1 
 RtspFace/PipeLine.cpp                           |   25 +
 12 files changed, 715 insertions(+), 92 deletions(-)

diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
index 0b38140..9ea1bba 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
@@ -16,6 +16,13 @@
 		pthread_mutex_init(&outqueue_mutex,NULL);
 
 	}
+	
+	FFmpegH264Encoder::~FFmpegH264Encoder()
+	{
+		pthread_mutex_init(&inqueue_mutex,NULL);
+		pthread_mutex_init(&outqueue_mutex,NULL);
+
+	}
 
 	void FFmpegH264Encoder::setCallbackFunctionFrameIsReady(std::function<void()> func)
 	{
@@ -45,13 +52,13 @@
 				pthread_mutex_unlock(&inqueue_mutex);
 				if(frame != NULL)
 				{
-					WriteFrame(frame);
+					WriteFrameRGB(frame);
 				}
 			}
         }
 	}
 
-	void FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
+	bool FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
 	{
 		int ret;
 		m_sws_flags = SWS_BICUBIC;
@@ -60,14 +67,17 @@
 		avcodec_register_all();
 		av_register_all();
 		
-		avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
+		if (strlen(filename) == 0)
+			avformat_alloc_output_context2(&m_oc, NULL, "h264", filename);
+		else 
+			avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
 		
 		if (!m_oc) {
 			avformat_alloc_output_context2(&m_oc, NULL, "avi", filename);
 		}
 
 		if (!m_oc) {
-			return;
+			return false;
 		}
 
 		m_fmt = m_oc->oformat;
@@ -79,13 +89,13 @@
 
 		m_video_codec = avcodec_find_encoder(m_fmt->video_codec);
 		if (!(m_video_codec)) {
-				return;
+				return false;
 		}
 
 		st = avformat_new_stream(m_oc, m_video_codec);
 
 		if (!st) {
-				return;
+				return false;
 		}
 
 		st->id = m_oc->nb_streams-1;
@@ -112,7 +122,7 @@
 
 		ret = avcodec_open2(c, m_video_codec, NULL);
 		if (ret < 0) {
-			return;
+			return false;
 		}
 
 		//ret = avpicture_alloc(&m_dst_picture, c->pix_fmt, c->width, c->height);
@@ -126,7 +136,7 @@
 
 		ret = av_image_alloc(m_dst_picture->data, m_dst_picture->linesize, c->width, c->height, (AVPixelFormat)m_dst_picture->format, 32);
 		if (ret < 0) {
-			return;
+			return false;
 		}
 
 		//ret = avpicture_alloc(&m_src_picture, AV_PIX_FMT_BGR24, c->width, c->height);
@@ -135,7 +145,7 @@
 		ret = av_image_alloc(m_src_picture->data, m_src_picture->linesize, c->width, c->height, AV_PIX_FMT_BGR24, 24);
 
 		if (ret < 0) {
-			return;
+			return false;
 		}
 
 		bufferSize = ret;
@@ -145,27 +155,28 @@
 		if (!(m_fmt->flags & AVFMT_NOFILE)) {
 			ret = avio_open(&m_oc->pb, filename, AVIO_FLAG_WRITE);
 			if (ret < 0) {
-				return;
+				return false;
 			}
 		}
 		
 		ret = avformat_write_header(m_oc, NULL);
 		
 		if (ret < 0) {
-			return;
+			return false;
 		}
 
 		sws_ctx = sws_getContext(c->width, c->height, AV_PIX_FMT_BGR24,
 								 c->width, c->height, AV_PIX_FMT_YUV420P,
 								 SWS_BICUBIC, NULL, NULL, NULL);
 		if (!sws_ctx) {
-			return;
+			return false;
 		}
+		
+		return true;
 	}
 
-	void FFmpegH264Encoder::WriteFrame(uint8_t * RGBFrame )
-	{	
-
+	bool FFmpegH264Encoder::WriteFrameRGB(uint8_t * RGBFrame )
+	{
 		memcpy(m_src_picture->data[0], RGBFrame, bufferSize);
 
 		sws_scale(sws_ctx,
@@ -182,7 +193,7 @@
 		ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
 		
 		if (ret < 0) {
-			return;
+			return false;
 		}
 
 		if (!ret && got_packet && pkt.size) 
@@ -215,10 +226,75 @@
 		m_frame_count++;
 		m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
 		
-		onFrame();
+		if (onFrame != nullptr)
+			onFrame();
+		
+		return true;
+	}
+	
+	void copyAVFrame(AVFrame* dest, AVFrame* src)
+	{
+		int height = dest->height;
+		int width = dest->width;
+
+		memcpy(dest->data[0], src->data[0], height * width); // Y
+		memcpy(dest->data[1], src->data[1], height * width / 4); // U
+		memcpy(dest->data[2], src->data[2], height * width / 4); // V
+	}
+	
+	bool FFmpegH264Encoder::WriteFrameYUV420(AVFrame * YUVFrame)
+	{
+		copyAVFrame(m_dst_picture, YUVFrame);
+
+        AVPacket pkt = { 0 };
+		int got_packet;
+		av_init_packet(&pkt);
+
+		int ret = 0;
+
+		ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
+		
+		if (ret < 0) {
+			return false;
+		}
+
+		if (!ret && got_packet && pkt.size) 
+		{
+			pkt.stream_index = m_video_st->index;
+			FrameStructure * frame = new FrameStructure();
+			frame->dataPointer = new uint8_t[pkt.size];
+			frame->dataSize = pkt.size-4;
+			frame->frameID = m_frame_count;
+
+			memcpy(frame->dataPointer,pkt.data+4,pkt.size-4);
+
+			pthread_mutex_lock(&outqueue_mutex);
+
+			if(outqueue.size()<30)
+			{
+				outqueue.push(frame);
+			}
+			else
+			{
+				delete frame;
+			}
+
+			pthread_mutex_unlock(&outqueue_mutex);
+
+		}
+
+		av_free_packet(&pkt);
+
+		m_frame_count++;
+		m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
+		
+		if (onFrame != nullptr)
+			onFrame();
+		
+		return true;
 	}
 
-	void FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
+	bool FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
 	{
 		m_filename = filename;
 		m_AVIMOV_WIDTH=Width;	//Movie width
@@ -227,7 +303,7 @@
 		m_AVIMOV_GOB=GOB;		//I frames per no of P frames, see note below!
 		m_AVIMOV_BPS=BitPerSecond; //Bits per second, if this is too low then movie will become garbled
 		
-		SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);	
+		return SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);	
 	}
 
 	void FFmpegH264Encoder::CloseCodec()
diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
index 31e345d..d07e113 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
@@ -49,18 +49,19 @@
 	{
 	public:
 		FFmpegH264Encoder();
-		~FFmpegH264Encoder();
+		virtual ~FFmpegH264Encoder();
 		
 		virtual void setCallbackFunctionFrameIsReady(std::function<void()> func);
 		
-		void SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
+		bool SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
 		void CloseVideo();
-		void SetupCodec(const char *filename, int codec_id);
+		bool SetupCodec(const char *filename, int codec_id);
 		void CloseCodec();
 		
 
 		void SendNewFrame(uint8_t * RGBFrame);		
-		void WriteFrame(uint8_t * RGBFrame);
+		bool WriteFrameRGB(uint8_t * RGBFrame);
+		bool WriteFrameYUV420(AVFrame * YUVFrame);
 		virtual char ReleaseFrame();
 
 		void run();	
diff --git a/RtspFace/PL_AVFrameYUV420.cpp b/RtspFace/PL_AVFrameYUV420.cpp
index 1384218..fc85e4c 100644
--- a/RtspFace/PL_AVFrameYUV420.cpp
+++ b/RtspFace/PL_AVFrameYUV420.cpp
@@ -104,6 +104,7 @@
 
 	//in->buffer readly
 
+	//#test
 	//static size_t f=0;
 	//char fname[50];
 	//sprintf(fname, "%u.yuv420", ++f);
diff --git a/RtspFace/PL_H264Encoder.cpp b/RtspFace/PL_H264Encoder.cpp
index 9fc0a0b..e2fcdf2 100644
--- a/RtspFace/PL_H264Encoder.cpp
+++ b/RtspFace/PL_H264Encoder.cpp
@@ -19,11 +19,13 @@
 
 	AVCodecContext* pAVCodecContext;
 	AVFrame* pAVFrame;//#todo delete
+	AVStream* pAVStream;
+	AVFormatContext* pAVFormatContext;
 	
 	H264Encoder_Internal() : 
 		buffSize(0), buffSizeMax(sizeof(buffer)), 
 		payError(true), ffmpegInited(false), frameCount(0), 
-		pAVCodecContext(nullptr), pAVFrame(nullptr)
+		pAVCodecContext(nullptr), pAVFrame(nullptr), pAVStream(nullptr), pAVFormatContext(nullptr)
 		
 	{
 	}
@@ -41,6 +43,8 @@
 		
 		pAVCodecContext = nullptr;
 		pAVFrame = nullptr;
+		pAVStream = nullptr;
+		pAVFormatContext = nullptr;
 	}
 };
 
@@ -88,7 +92,7 @@
 
 	in->pAVCodecContext = avcodec_alloc_context3(avCodec);
 
-	in->pAVCodecContext->bit_rate = 3*1024*1024*8; // 3MB
+	in->pAVCodecContext->bit_rate = 1*1024*1024*8; // 3MB
     in->pAVCodecContext->width = 1920;
     in->pAVCodecContext->height = 1080;//#todo from config
     in->pAVCodecContext->time_base.num=1;
@@ -96,6 +100,9 @@
     in->pAVCodecContext->gop_size = 20;
     in->pAVCodecContext->max_b_frames = 0;
     in->pAVCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;
+	
+	//av_opt_set(c->priv_data, "preset", "superfast", 0);  
+	//av_opt_set(c->priv_data, "tune", "zerolatency", 0);
 
 	if(avcodec_open2(in->pAVCodecContext, avCodec, NULL) >= 0)
 	{
@@ -119,17 +126,36 @@
 		return false;
 	}
 	
+	//int ret = avformat_alloc_output_context2(&(in->pAVFormatContext), NULL, "avi", "");
+	//if (ret < 0 || in->pAVFormatContext == nullptr)
+	//{
+	//	printf("avformat_alloc_output_context2 error\n");
+	//	return false;
+	//}
+	//
+	//in->pAVStream = avformat_new_stream(in->pAVFormatContext, avCodec);
+	//if (in->pAVStream == nullptr)
+	//{
+	//	printf("avformat_new_stream error\n");
+	//	return false;
+	//}
+	//in->pAVStream->id = in->pAVFormatContext->nb_streams-1;
+	
 	return true;
 }
 
 void copyAVFrame(AVFrame* dest, AVFrame* src)
 {
-	int height = dest->height;
-	int width = dest->width;
+	dest->data[0] = src->data[0];
+	dest->data[1] = src->data[1];
+	dest->data[2] = src->data[2];
 	
-	memcpy(dest->data[0], src->data[0], height * width); // Y
-	memcpy(dest->data[1], src->data[1], height * width / 4); // U
-	memcpy(dest->data[2], src->data[2], height * width / 4); // V
+	//int height = dest->height;
+	//int width = dest->width;
+	//
+	//memcpy(dest->data[0], src->data[0], height * width); // Y
+	//memcpy(dest->data[1], src->data[1], height * width / 4); // U
+	//memcpy(dest->data[2], src->data[2], height * width / 4); // V
 }
 
 bool encodeH264(H264Encoder_Internal* in, AVFrame* pAVFrame, size_t buffSize)  
@@ -138,13 +164,16 @@
 	in->frameCount++;
 
 	copyAVFrame(in->pAVFrame, pAVFrame);
-	in->pAVFrame->pts = in->frameCount;
-	
+
+	//in->pAVFrame->pts = (1.0 / 25) * 90000 * in->frameCount;
+	in->pAVFrame->pts = time(nullptr);
+
 	AVPacket pAVPacket = {0};
 	av_init_packet(&pAVPacket);
-	
+
 	// encode the image
 	int gotPacket = 0;
+	
 	int ret = avcodec_encode_video2(in->pAVCodecContext, &pAVPacket, in->pAVFrame, &gotPacket);  
 	if (ret < 0)
 	{
@@ -154,9 +183,9 @@
 	
 	if (gotPacket > 0)
 	{
-		printf("Succeed to encode (1) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
-		memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
-		in->buffSize += pAVPacket.size;
+		printf("Succeed to encode (1) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
+		memcpy(in->buffer, pAVPacket.data, pAVPacket.size);
+		in->buffSize = pAVPacket.size;
 		av_free_packet(&pAVPacket);
 	}
 	
@@ -172,12 +201,13 @@
 	//	}  
 	//	if (gotPacket > 0)
 	//	{  
-	//		printf("Succeed to encode (2) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
+	//		printf("Succeed to encode (2) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
 	//		memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
 	//		in->buffSize += pAVPacket.size; 
 	//		av_free_packet(&pAVPacket);  
 	//	}  
-	//} 
+	//}
+	
 	
 	//#test
 	if (in->buffSize > 0)
@@ -186,7 +216,7 @@
 		fwrite (in->buffer , sizeof(char), in->buffSize, pFile);
 		fflush(pFile);
 	}
-	
+
 	in->payError = (in->buffSize == 0);
 	return !(in->payError);
 }
diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp
new file mode 100644
index 0000000..4641eec
--- /dev/null
+++ b/RtspFace/PL_Queue.cpp
@@ -0,0 +1,360 @@
+#include "PL_Queue.h"
+#include <set>
+#include <queue>
+#include <string.h>
+
+struct QBlock
+{
+	uint8_t* data;
+	size_t maxSize;
+	size_t size;
+	
+	QBlock() : data(nullptr), maxSize(0), size(0) { }
+	
+	QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
+	
+	~QBlock()
+	{
+		data = nullptr;
+		maxSize = 0;
+		size = 0;
+	}
+};
+
+struct QBlockPool
+{
+	uint8_t* blocksDataPool;
+	
+	typedef std::set<QBlock*> qpool_t;
+	qpool_t staticPool;
+	qpool_t freePool;
+	
+	QBlockPool() : blocksDataPool(nullptr)
+	{
+	}
+	
+	QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : 
+		blocksDataPool(nullptr)
+	{
+		bp_init(maxBlockCount, maxBlockSize);
+	}
+	
+	void bp_init(size_t maxBlockCount, size_t maxBlockSize)
+	{
+		const size_t totalBytes = maxBlockCount * maxBlockSize;
+		blocksDataPool = new uint8_t[totalBytes];
+		printf("QBlockPool allocate byte: %u\n", totalBytes);
+		
+		for (size_t i = 0; i < maxBlockCount; i++)
+		{
+			uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
+			QBlock* qb = new QBlock(qbData, maxBlockSize);
+			staticPool.insert(qb);
+			freePool.insert(qb);
+		}
+	}
+	
+	~QBlockPool()
+	{
+		if (freePool.size() != staticPool.size())
+		{
+			printf("QBlockPool memory leakage\n");
+		}
+		
+		for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
+		{
+			delete *iter;
+		}
+		
+		delete[] blocksDataPool;
+		blocksDataPool = nullptr;
+	}
+	
+	QBlock* bp_alloc()
+	{
+		if (freePool.empty())
+		{
+			printf("QBlockPool bp_alloc empty\n");
+			return nullptr;
+		}
+		
+		qpool_t::iterator iter = freePool.begin();
+		QBlock* qb = *iter;
+		freePool.erase(iter);
+		return qb;
+	}
+	
+	void bp_free(QBlock* qb)
+	{
+		if (qb == nullptr)
+			return;
+		
+		qpool_t::iterator iter = staticPool.find(qb);
+		if (iter == staticPool.end())
+		{
+			printf("QBlockPool bp_free not in pool\n");
+			return;
+		}
+		
+		//iter = freePool.find(qb);
+		//if (iter != freePool.end())
+		//{
+		//	printf("QBlockPool bp_free block is free");
+		//	return;
+		//}
+		
+		qb->size = 0;
+		freePool.insert(qb);
+	}
+	
+	bool bp_empty() const
+	{
+		return freePool.empty();
+	}
+};
+
+struct PL_Queue_Internal
+{
+	PL_Queue_Config config;
+	QBlockPool pool;
+	
+	typedef std::queue<QBlock*> queue_t;
+	queue_t que;
+	
+	pthread_mutex_t* queue_pool_mutex;
+	pthread_mutex_t* sync_full_mutex;
+	pthread_mutex_t* sync_empty_mutex;
+	
+	PL_Queue_Internal() : 
+		config(), pool(), que(), 
+		queue_pool_mutex(new pthread_mutex_t), 
+		sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
+	{
+		pthread_mutex_init(queue_pool_mutex, NULL);
+		pthread_mutex_init(sync_full_mutex, NULL);
+		pthread_mutex_init(sync_empty_mutex, NULL);
+	}
+	
+	~PL_Queue_Internal()
+	{
+		if (queue_pool_mutex != nullptr)
+		{
+			pthread_mutex_destroy(queue_pool_mutex);
+			delete queue_pool_mutex;
+			queue_pool_mutex = nullptr;
+		}
+		
+		if (sync_full_mutex != nullptr)
+		{
+			pthread_mutex_destroy(sync_full_mutex);
+			delete sync_full_mutex;
+			sync_full_mutex = nullptr;
+		}
+		
+		if (sync_empty_mutex != nullptr)
+		{
+			pthread_mutex_destroy(sync_empty_mutex);
+			delete sync_empty_mutex;
+			sync_empty_mutex = nullptr;
+		}
+	}
+
+	void reset()
+	{
+		if (queue_pool_mutex != nullptr)
+		{
+			pthread_mutex_destroy(queue_pool_mutex);
+			delete queue_pool_mutex;
+			queue_pool_mutex = nullptr;
+		}
+		
+		queue_pool_mutex = new pthread_mutex_t;
+		pthread_mutex_init(queue_pool_mutex, NULL);
+		
+		if (sync_full_mutex != nullptr)
+		{
+			pthread_mutex_destroy(sync_full_mutex);
+			delete sync_full_mutex;
+			sync_full_mutex = nullptr;
+		}
+		
+		sync_full_mutex = new pthread_mutex_t;
+		pthread_mutex_init(sync_full_mutex, NULL);
+		
+		if (sync_empty_mutex != nullptr)
+		{
+			pthread_mutex_destroy(sync_empty_mutex);
+			delete sync_empty_mutex;
+			sync_empty_mutex = nullptr;
+		}
+		
+		sync_empty_mutex = new pthread_mutex_t;
+		pthread_mutex_init(sync_empty_mutex, NULL);
+		
+		//#todo free que
+	}
+};
+
+PipeLineElem* create_PL_Queue()
+{
+	return new PL_Queue;
+}
+
+PL_Queue::PL_Queue() : internal(new PL_Queue_Internal)
+{
+}
+
+PL_Queue::~PL_Queue()
+{
+	delete (PL_Queue_Internal*)internal;
+	internal= nullptr;
+}
+
+bool PL_Queue::init(void* args)
+{
+	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+	PL_Queue_Config* config = (PL_Queue_Config*)args;
+	
+	in->reset();
+	in->config = *config;
+	in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
+	
+	if (config->syncQueueFull)
+	{
+		int ret = pthread_mutex_lock(in->sync_full_mutex);
+		if(ret != 0)
+		{
+			printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
+			return false;
+		}
+	}
+	
+	if (config->syncQueueEmpty)
+	{
+		int ret = pthread_mutex_lock(in->sync_empty_mutex);
+		if(ret != 0)
+		{
+			printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
+			return false;
+		}
+	}
+
+	return true;
+}
+
+void PL_Queue::finit()
+{
+	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete
+}
+
+bool PL_Queue::pay(const PipeMaterial& pm)
+{
+	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+	
+	QBlock* qb = nullptr;
+	if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+	{
+		// there is no available qb
+		
+		if (in->config.queueFullDropBlock)
+		{
+			printf("PL_Queue::pay queueFullDropBlock\n");
+		
+			pthread_mutex_lock(in->queue_pool_mutex);
+			qb = in->que.front();
+			in->que.pop();
+			
+			in->pool.bp_free(qb);
+			qb = nullptr;
+			pthread_mutex_unlock(in->queue_pool_mutex);
+		}
+		else
+		{
+			if (in->config.syncQueueFull)
+			{
+				printf("PL_Queue::pay sync by sync_full_mutex\n");
+				pthread_mutex_lock(in->sync_full_mutex);
+			}
+		}
+	}
+	
+	if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+	{
+		printf("PL_Queue::pay full\n");
+		return false;
+	}
+	
+	pthread_mutex_lock(in->queue_pool_mutex);
+	qb = in->pool.bp_alloc();
+	if (qb != nullptr)
+		in->que.push(qb);
+	pthread_mutex_unlock(in->queue_pool_mutex);
+	
+	if (qb == nullptr)
+	{
+		printf("PL_Queue::pay bp_alloc nullptr\n");
+		return false;
+	}
+	
+	if (in->config.copyData)
+	{
+		memcpy(qb->data, pm.buffer, pm.buffSize);
+		qb->size = pm.buffSize;
+	}
+	
+	if (in->config.syncQueueEmpty)
+		pthread_mutex_unlock(in->sync_empty_mutex);
+
+	return true;
+}
+
+void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
+{
+	if (pm->former == nullptr || pm->args == nullptr)
+		return;
+	
+	PL_Queue* _this = (PL_Queue*)pm->former;
+	QBlock* qb = (QBlock*)pm->args;
+	PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
+	
+	pthread_mutex_lock(in->queue_pool_mutex);
+	in->pool.bp_free(qb);
+	pthread_mutex_unlock(in->queue_pool_mutex);
+}
+
+bool PL_Queue::gain(PipeMaterial& pm)
+{
+	PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+	
+	if (in->que.empty())
+	{
+		if (in->config.syncQueueEmpty)
+			pthread_mutex_lock(in->sync_empty_mutex);
+	}
+	
+	if (in->que.empty())
+	{
+		printf("PL_Queue::gain empty\n");
+		pm.buffer = nullptr;
+		pm.buffSize = 0;
+		pm.former = this;
+		return false;
+	}
+
+	QBlock* qb = nullptr;
+	pthread_mutex_lock(in->queue_pool_mutex);
+	qb = in->que.front();
+	in->que.pop();
+	pthread_mutex_unlock(in->queue_pool_mutex);
+	
+	if (in->config.syncQueueFull)
+		pthread_mutex_unlock(in->sync_full_mutex);
+	
+	pm.type = PMT_BYTES;
+	pm.buffer = qb->data;
+	pm.buffSize = qb->size;
+	pm.former = this;
+	pm.deleter = pm_deleter_qb;
+	pm.args = qb;
+	return true;
+}
diff --git a/RtspFace/PL_Queue.h b/RtspFace/PL_Queue.h
new file mode 100644
index 0000000..72f01e2
--- /dev/null
+++ b/RtspFace/PL_Queue.h
@@ -0,0 +1,44 @@
+#ifndef _PL_QUEUE_H_
+#define _PL_QUEUE_H_
+
+#include "PipeLine.h"
+
+struct PL_Queue_Config
+{
+	size_t maxBlockCount;
+	size_t maxBlockSize;
+	bool cacheEmptyBlock;//#todo not implement
+	
+	bool syncQueueFull;
+	bool syncQueueEmpty;
+	bool queueFullDropBlock;
+	bool copyData; //#todo not implement (copy ptr)
+	
+	PL_Queue_Config() : 
+		maxBlockCount(100), maxBlockSize(300000), cacheEmptyBlock(false), 
+		syncQueueFull(true), syncQueueEmpty(true), queueFullDropBlock(false), copyData(true)
+	{
+	}
+};
+
+class PL_Queue : public PipeLineElem
+{
+public:
+	PL_Queue();
+	virtual ~PL_Queue();
+
+	virtual bool init(void* args);
+	virtual void finit();
+
+	virtual bool pay(const PipeMaterial& pm);
+	virtual bool gain(PipeMaterial& pm);
+	
+private:
+	static void pm_deleter_qb(PipeMaterial* pm);
+
+	void* internal;
+};
+
+PipeLineElem* create_PL_Queue();
+
+#endif
diff --git a/RtspFace/PL_RTSPServer.cpp b/RtspFace/PL_RTSPServer.cpp
index feb25ac..ad5d1f9 100644
--- a/RtspFace/PL_RTSPServer.cpp
+++ b/RtspFace/PL_RTSPServer.cpp
@@ -12,8 +12,11 @@
 
 struct RTSPServer_Internal
 {
-	uint8_t* buffer;
+	uint8_t buffer[1920*1080*3];
 	size_t buffSize;
+	size_t buffSizeMax;
+	
+	RTSPServerConfig config;
 
 	bool payError;
 	pthread_t live_daemon_thid;
@@ -24,7 +27,7 @@
 	MyEncoderStub * encoderStub;
 
 	RTSPServer_Internal() : 
-		buffer(nullptr), buffSize(0), 
+		buffSize(0), buffSizeMax(sizeof(buffer)), config(), 
 		payError(true), live_daemon_thid(0), frame_mutex(new pthread_mutex_t), live_daemon_running(false), 
 		server(nullptr), encoderStub(nullptr)
 	{
@@ -43,8 +46,10 @@
 	
 	void reset()
 	{
-		buffer = nullptr;
 		buffSize = 0;
+		
+		RTSPServerConfig _config;
+		config =_config;
 
 		payError = true;
 
@@ -84,32 +89,43 @@
 	
 	virtual char GetFrame(u_int8_t** FrameBuffer, unsigned int *FrameSize)
 	{
-		if (in.buffer != nullptr && in.buffSize > 0)
-		{
-			*FrameBuffer = in.buffer;
-			*FrameSize = in.buffSize;
-
-			printf("send frame size=%u\n", in.buffSize);
-			
-			in.buffer = nullptr;
-			in.buffSize = 0;
-			
-			return 1;
-		}
-		else
+		if (in.buffer == nullptr || in.buffSize <= 0)
 		{
 			ReleaseFrame();
 			return 0;
 		}
+		
+		uint8_t* pBuffer = in.buffer;
+		size_t newBufferSize = in.buffSize;
+		if (in.config.payWithAux)
+		{
+			if (newBufferSize <= 4)
+			{
+				ReleaseFrame();
+				return 0;
+			}
+			pBuffer += 4;
+			newBufferSize -= 4;
+		}
+		
+		*FrameBuffer = pBuffer;
+		*FrameSize = newBufferSize;
+
+		printf("send frame size=%u\n", in.buffSize);
 	}
 	
 	virtual char ReleaseFrame()
 	{
-		int ret = pthread_mutex_unlock(in.frame_mutex);
-		if(ret != 0)
+		in.buffSize = 0;
+		
+		if (in.config.syncDeliverFrame)
 		{
-			printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
-			return 0;
+			int ret = pthread_mutex_unlock(in.frame_mutex);
+			if(ret != 0)
+			{
+				printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
+				return 0;
+			}
 		}
 		
 		return 1;
@@ -120,11 +136,14 @@
 		// write frame buffer of RTSPServer_Internal::buffer
 		onFrame();
 
-		int ret = pthread_mutex_lock(in.frame_mutex);
-		if(ret != 0)
+		if (in.config.syncDeliverFrame)
 		{
-			printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
-			return;
+			int ret = pthread_mutex_lock(in.frame_mutex);
+			if(ret != 0)
+			{
+				printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
+				return;
+			}
 		}
 	}
 
@@ -167,6 +186,12 @@
 	RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
 	in->reset();
 
+	if (args)
+	{
+		RTSPServerConfig* config = (RTSPServerConfig*)args;
+		in->config = *config;
+	}
+
 	int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
 	if(ret != 0)
 	{
@@ -191,7 +216,10 @@
 	if (pm.buffer == nullptr || pm.buffSize <= 0)
 		return false;
 	
-	in->buffer = pm.buffer;
+	if (in->buffSize > 0)
+		printf("PL_RTSPServer::pay may lost data size=%u\n", in->buffSize);
+	
+	memcpy(in->buffer, pm.buffer, pm.buffSize);
 	in->buffSize = pm.buffSize;
 	
 	if (in->encoderStub == nullptr)
diff --git a/RtspFace/PL_RTSPServer.h b/RtspFace/PL_RTSPServer.h
index 8b0afec..ea9e154 100644
--- a/RtspFace/PL_RTSPServer.h
+++ b/RtspFace/PL_RTSPServer.h
@@ -3,6 +3,18 @@
 
 #include "PipeLine.h"
 
+struct RTSPServerConfig
+{
+	bool syncDeliverFrame;
+	bool payWithAux;
+	bool sendWithAux;
+	
+	RTSPServerConfig() : 
+		syncDeliverFrame(true), payWithAux(true), sendWithAux(false)
+	{
+	}
+};
+
 class PL_RTSPServer : public PipeLineElem
 {
 public:
diff --git a/RtspFace/PipeLine.cpp b/RtspFace/PipeLine.cpp
index 16f5ee1..8b1ffea 100644
--- a/RtspFace/PipeLine.cpp
+++ b/RtspFace/PipeLine.cpp
@@ -1,7 +1,18 @@
 #include "PipeLine.h"
 
-PipeMaterial::PipeMaterial() : buffer(nullptr), buffSize(0), former(nullptr)
+PipeMaterial::PipeMaterial() : 
+	type(PMT__FIRST), buffer(nullptr), buffSize(0), 
+	former(nullptr), deleter(nullptr), args(nullptr)
 {
+}
+
+void PipeMaterial::exec_deleter()
+{
+	if (deleter != nullptr)
+	{
+		deleter(this);
+		deleter = nullptr;
+	}
 }
 
 PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems()
@@ -81,12 +92,16 @@
 	if (elems.size() == 1)
 	{
 		elem_begin->gain(*pm);
+		pm->exec_deleter();
 		return elem_begin;
 	}
 	else if (elems.size() == 2)
 	{
 		if (elem_begin->gain(*pm))
+		{
 			elem_last->pay(*pm);
+			pm->exec_deleter();
+		}
 		else
 			return elem_begin;
 		return elem_last;
@@ -103,16 +118,22 @@
 		while (elem_begin != elem_last)
 		{
 			if (lastRet && (lastRet = elem_begin->pay(*pm)) )
+			{
+				pm->exec_deleter();
 				lastRet = elem_begin->gain(*pm);
+			}
 			else
-				return elem_begin;
+				return elem_begin;//#todo this may memory leakage in pm
 			
 			++iter;
 			elem_begin = *iter;
 		}
 	
 		if (lastRet)
+		{
 			elem_last->pay(*pm);
+			pm->exec_deleter();
+		}
 		return elem_last;
 	}
 	
diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h
index 898e1f6..2a59df7 100644
--- a/RtspFace/PipeLine.h
+++ b/RtspFace/PipeLine.h
@@ -12,13 +12,32 @@
 class PipeLineElem;
 class PipeLine;
 
+enum PipeMaterialBufferType
+{
+	PMT__FIRST,
+	PMT_BYTES,
+	PMT_TEXT,
+	PMT_IMAGE,
+	PMT_PM_LIST,
+	PMT_PTR_AVFRAME,
+	PMT__LAST
+};
+
+struct PipeMaterial;
+typedef void (* pm_deleter_func)(PipeMaterial* pm);
+
 struct PipeMaterial
 {
+	PipeMaterialBufferType type;
 	uint8_t* buffer;
 	size_t buffSize;
 	PipeLineElem* former;
+	pm_deleter_func deleter;
+	void* args;
 	
 	PipeMaterial();
+	
+	void exec_deleter();
 };
 
 class PipeLineElem
@@ -41,9 +60,10 @@
 typedef PipeLineElem* (*elem_create_func_t)();
 
 // 0 (there is no elem). do nothing
-// 1 (there is one elem). gain
-// 2 (there is two elems). gain --> pay
-// 3 (there is more than two elems). gain --> pay gain --> pay gain --> ... --> pay
+// 1 (there is one elem). gain --> pm.deleter
+// 2 (there is two elems). gain --> pay --> pm.deleter
+// 3 (there is more than two elems). 
+//    gain --> [pay --> pm.deleter --> gain -->] [pay --> pm.deleter --> gain -->] ... --> pay --> pm.deleter
 class PipeLine
 {
 public:
diff --git a/RtspFace/main.cpp b/RtspFace/main.cpp
index ea5505d..ef672a1 100644
--- a/RtspFace/main.cpp
+++ b/RtspFace/main.cpp
@@ -5,6 +5,7 @@
 #include "PL_H264Encoder.h"
 #include "PL_AVFrameYUV420.h"
 #include "PL_AVFrameBGRA.h"
+#include "PL_Queue.h"
 
 #include <iostream>
 using namespace std;
@@ -18,33 +19,61 @@
 	pipeLine.register_elem_creator("PL_H264Decoder", create_PL_H264Decoder);
 	pipeLine.register_elem_creator("PL_AVFrameYUV420", create_PL_AVFrameYUV420);
 	pipeLine.register_elem_creator("PL_H264Encoder", create_PL_H264Encoder);
+	pipeLine.register_elem_creator("PL_Queue", create_PL_Queue);
 	
-	PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
-	RTSPConfig rtspConfig;
-	rtspConfig.progName = argv[0];
-	rtspConfig.rtspURL = argv[1];
-	rtspConfig.aux = false; // ffmpeg need aux
-	rtspConfig.verbosityLevel = 1;
-	rtspConfig.tunnelOverHTTPPortNum = 0;
-	rtspConfig.args = nullptr;
-	bool ret = rtspClient->init(&rtspConfig);
-	if (!ret)
 	{
-		cout << "rtspClient.init error" << endl;
-		exit(EXIT_FAILURE);
+		PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
+		RTSPConfig rtspConfig;
+		rtspConfig.progName = argv[0];
+		rtspConfig.rtspURL = argv[1];
+		rtspConfig.aux = true; // ffmpeg need aux, but live555 not
+		rtspConfig.verbosityLevel = 1;
+		rtspConfig.tunnelOverHTTPPortNum = 0;
+		rtspConfig.args = nullptr;
+		bool ret = rtspClient->init(&rtspConfig);
+		if (!ret)
+		{
+			cout << "rtspClient.init error" << endl;
+			exit(EXIT_FAILURE);
+		}
 	}
 	
-	//PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
-	//h264Decoder->init(nullptr);
+	//{
+	//	PL_Queue_Config config;
+	//	PL_Queue* queue1 = (PL_Queue*)pipeLine.push_elem("PL_Queue");
+	//	bool ret = queue1->init(&config);
+	//	if (!ret)
+	//	{
+	//		cout << "queue1.init error" << endl;
+	//		exit(EXIT_FAILURE);
+	//	}
+	//}
 	
-	//PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
-	//avFrameYUV420->init(nullptr);
+	{
+		PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
+		h264Decoder->init(nullptr);
+	}
+
+	//{
+	//	PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
+	//	avFrameYUV420->init(nullptr);
+	//}
+
+	{
+		PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
+		h264Encoder->init(nullptr);
+	}
 	
-	//PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
-	//h264Encoder->init(nullptr);
-	
-	PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
-	rtspServer->init(nullptr);
+	//{
+	//	RTSPServerConfig config;
+	//	PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
+	//	bool ret = rtspServer->init(&config);
+	//	if (!ret)
+	//	{
+	//		cout << "rtspServer.init error" << endl;
+	//		exit(EXIT_FAILURE);
+	//	}
+	//}
 	
 	while(true)
 	{
diff --git a/RtspFace/make.sh b/RtspFace/make.sh
index 6a122db..704fa7d 100644
--- a/RtspFace/make.sh
+++ b/RtspFace/make.sh
@@ -36,15 +36,16 @@
 g++ -g -c -std=c++11 PL_H264Encoder.cpp $CFLAGS $CPPFLAGS
 g++ -g -c -std=c++11 PL_AVFrameYUV420.cpp $CFLAGS $CPPFLAGS
 g++ -g -c -std=c++11 PL_AVFrameBGRA.cpp $CFLAGS $CPPFLAGS
+g++ -g -c -std=c++11 PL_Queue.cpp $CFLAGS $CPPFLAGS
 g++ -g -c -std=c++11 PipeLine.cpp $CFLAGS $CPPFLAGS
 
-g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
 g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/FFmpegH264Source.cpp $CFLAGS $CPPFLAGS
+g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
 g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS
 
 g++ -g -std=c++11 \
   main.o PipeLine.o \
-  PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o \
+  PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o \
   $FFMPEGRTSPSERVER_OBJ PL_RTSPServer.o \
   $LDFLAGS -o rtsp_face
 

--
Gitblit v1.8.0