From d9ffa50c7e8d6b8c3157690aef8e2a70af1d1695 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 09 八月 2017 13:58:01 +0800
Subject: [PATCH] rtps server (not ok)

---
 RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp |   50 +++-
 RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp          |   19 +
 RtspFace/FFmpegRTSPServer/LiveRTSPServer.h              |    8 
 RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp            |   36 +-
 RtspFace/PipeLine.h                                     |    8 
 RtspFace/MediaHelper.cpp                                |   15 +
 RtspFace/PreAllocBufferQueue.h                          |   56 ++++
 RtspFace/PL_RTSPServer2.cpp                             |  305 +++++++++++++++++++++++++
 RtspFace/PL_AndroidMediaCodecDecoder.cpp                |    2 
 RtspFace/PL_RTSPServer2.h                               |   36 +++
 RtspFace/PL_AndroidMediaCodecEncoder.cpp                |   21 +
 RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h   |   33 +-
 RtspFace/MaterialBuffer.h                               |   11 
 RtspFace/PreAllocBufferQueue.cpp                        |   82 ++++++
 RtspFace/MediaHelper.h                                  |   14 +
 15 files changed, 642 insertions(+), 54 deletions(-)

diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
index c2d5ab5..68fd872 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
@@ -49,13 +49,18 @@
 		static unsigned newFrameSize = 0;
 
 		/* get the data frame from the Encoding thread.. */
-		if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize)){
-			if (newFrameDataStart!=NULL) {
+		if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize) != 0)
+		{
+			if (newFrameDataStart != NULL && newFrameSize > 0)
+			{
 				/* This should never happen, but check anyway.. */
-				if (newFrameSize > fMaxSize) {
+				if (newFrameSize > fMaxSize)
+				{
 					fFrameSize = fMaxSize;
 					fNumTruncatedBytes = newFrameSize - fMaxSize;
-				} else {
+				}
+				else
+				{
 					fFrameSize = newFrameSize;
 				}
 
@@ -67,12 +72,14 @@
 				
 				Encoding_Source->ReleaseFrame();
 			}
-			else {
+			else
+			{
 				fFrameSize=0;
 				fTo=NULL;
 				handleClosure(this);
 			}
-		}else
+		}
+		else
 		{
 			fFrameSize = 0;
 		}
diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
index 2f6790a..16af86c 100644
--- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
+++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
@@ -6,12 +6,14 @@
 //  Copyright (c) 2015 Mina Saad. All rights reserved.
 //
 
+#include "../logger.h"
 #include "LiveRTSPServer.h"
 
 namespace MESAI
 {
 	LiveRTSPServer::LiveRTSPServer( IEncoder * a_Encoder, int port, int httpPort )
-		: m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
+		: env(nullptr), framedSource(nullptr),
+		m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
 	{
 		quit = 0;
 	}
@@ -21,16 +23,20 @@
 
 	}
 
+	void LiveRTSPServer::init()
+	{
+		TaskScheduler* scheduler = BasicTaskScheduler::createNew();
+		env = BasicUsageEnvironment::createNew(*scheduler);
+	}
+
 	void LiveRTSPServer::run()
 	{
-		TaskScheduler    *scheduler;
-		UsageEnvironment *env ;
+		if (env == nullptr)
+			init();
+
 		char RTSP_Address[1024];
 		RTSP_Address[0]=0x00;
 
-        scheduler = BasicTaskScheduler::createNew();
-        env = BasicUsageEnvironment::createNew(*scheduler);
-        
         UserAuthenticationDatabase* authDB = NULL;
         
         // if (m_Enable_Pass){
@@ -43,11 +49,10 @@
         
         if (rtspServer == NULL)
         {
-            *env <<"LIVE555: Failed to create RTSP server: %s\n", env->getResultMsg();
+            LOG_ERROR <<"LIVE555: Failed to create RTSP server: " << env->getResultMsg() << LOG_ENDL;
         }
-        else {
-            
-            
+        else
+		{
             if(httpTunnelingPort)
             {
                 rtspServer->setUpTunnelingOverHTTP(httpTunnelingPort);
@@ -55,15 +60,16 @@
             
             char const* descriptionString = "MESAI Streaming Session";
             
-            FFmpegH264Source * source = FFmpegH264Source::createNew(*env,m_Encoder);
-            StreamReplicator * inputDevice = StreamReplicator::createNew(*env, source, false);
-            
+			if (framedSource == nullptr)
+				framedSource = FFmpegH264Source::createNew(*env,m_Encoder);
+            StreamReplicator * inputDevice = StreamReplicator::createNew(*env, framedSource, false);
+
             ServerMediaSession* sms = ServerMediaSession::createNew(*env, RTSP_Address, RTSP_Address, descriptionString);
             sms->addSubsession(MESAI::LiveServerMediaSubsession::createNew(*env, inputDevice));
             rtspServer->addServerMediaSession(sms);
             
             char* url = rtspServer->rtspURL(sms);
-            *env << "Play this stream using the URL \"" << url << "\"\n";
+            LOG_INFO << "Play this stream using the URL " << url << LOG_ENDL;
             delete [] url;
             
             //signal(SIGNIT,sighandler);
@@ -74,6 +80,6 @@
         }
         
         env->reclaim();
-        delete scheduler;
+		//delete scheduler; // #todo
 	}
 }
\ No newline at end of file
diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
index 006e459..0c3f6a7 100644
--- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
+++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
@@ -22,17 +22,21 @@
 	class LiveRTSPServer
 	{
 	public:
-
 		LiveRTSPServer(IEncoder  * a_Encoder, int port, int httpPort );
 		~LiveRTSPServer();
+
+		void init();
 		void run();
+
+	public:
+		UsageEnvironment* env;
+		FramedSource* framedSource;
 
 	private:
 		int portNumber;
 		int httpTunnelingPort;
 		IEncoder * m_Encoder;
 		char quit;
-
 	};
 }
 
diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
index 6d16082..d5204f7 100644
--- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
+++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
@@ -7,23 +7,47 @@
 //
 
 #include "LiveServerMediaSubsession.h"
+#include "H264FramedSource.h"
 
 namespace MESAI
 {
-	LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator)
-	{ 
-		return new LiveServerMediaSubsession(env,replicator);
-	}
-					
-	FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
+
+LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator)
+{
+	return new LiveServerMediaSubsession(env,replicator);
+}
+
+FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
+{
+	FramedSource* source = m_replicator->createStreamReplica();
+	estBitrate = 5000;//#todo
+	return H264VideoStreamDiscreteFramer::createNew(envir(), source);
+}
+
+RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock,  unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource)
+{
+	return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic);
+}
+
+char const* LiveServerMediaSubsession::sdpLines()
+{
+	if (m_SDPLines.empty())
 	{
-		FramedSource* source = m_replicator->createStreamReplica();
-		return H264VideoStreamDiscreteFramer::createNew(envir(), source);
+		m_SDPLines.assign(OnDemandServerMediaSubsession::sdpLines());
+
+		H264FramedSource* framedSource = nullptr;
+		{
+			FramedSource* _framedSource = m_replicator->inputSource();
+			framedSource = dynamic_cast<H264FramedSource*>(_framedSource);
+		};
+
+		if (framedSource != nullptr)
+		{
+			m_SDPLines.append(framedSource->getAuxLine());
+		}
 	}
-		
-	RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock,  unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource)
-	{
-		return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic);
-	}
+
+	return m_SDPLines.c_str();
+}
 
 }
diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
index 83c3b2e..263c053 100644
--- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
+++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
@@ -16,24 +16,29 @@
 #include <liveMedia/H264VideoStreamDiscreteFramer.hh>
 #include <UsageEnvironment/UsageEnvironment.hh>
 #include <groupsock/Groupsock.hh>
+#include <string>
 
-namespace MESAI 
+namespace MESAI
 {
 
-  class LiveServerMediaSubsession: public OnDemandServerMediaSubsession
-  {
-    public:
-      static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator);
-    
-    protected:
-      LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator)
-          : OnDemandServerMediaSubsession(env, False), m_replicator(replicator) {};
-      
-      virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate);
-      virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,  unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource);    
+class LiveServerMediaSubsession: public OnDemandServerMediaSubsession
+{
+public:
+	static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator);
 
-      StreamReplicator * m_replicator;
-  };
+protected:
+	LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator)
+			: OnDemandServerMediaSubsession(env, False), m_replicator(replicator), m_SDPLines()
+	{}
+
+	virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate);
+	virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,  unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource);
+	virtual char const* sdpLines();
+	//virtual char const* getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource);
+
+	StreamReplicator * m_replicator;
+	std::string m_SDPLines;
+};
 
 }
 #endif
\ No newline at end of file
diff --git a/RtspFace/MaterialBuffer.h b/RtspFace/MaterialBuffer.h
index 715b49e..fa2d12d 100644
--- a/RtspFace/MaterialBuffer.h
+++ b/RtspFace/MaterialBuffer.h
@@ -48,6 +48,17 @@
 		MBFT__LAST
 	};
 
+	enum MBFUsage
+	{
+		MBFU__FIRST,
+
+		MBFU_ORIGIN_IMAGE,
+		MBFU_PROCESSED_IMAGE,
+		MBFU_INFORMATION,
+
+		MBFU__LAST
+	};
+
 	MBFType type;
 	void* buffer;
 	size_t buffSize;
diff --git a/RtspFace/MediaHelper.cpp b/RtspFace/MediaHelper.cpp
index 42f7467..5081788 100644
--- a/RtspFace/MediaHelper.cpp
+++ b/RtspFace/MediaHelper.cpp
@@ -3,6 +3,21 @@
 #include <liveMedia/liveMedia.hh>
 #include <liveMedia/Base64.hh>
 
+uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros)
+{
+	unsigned _resultSize = resultSize;
+	Boolean _trimTrailingZeros = trimTrailingZeros;
+	unsigned char* ret = base64Decode(in, inSize, _resultSize, _trimTrailingZeros);
+	resultSize = _resultSize;
+	return ret;
+}
+
+char* base64_encode(char const* orig, size_t origLength)
+{
+	unsigned _origLength = origLength;
+	return base64Encode(orig, _origLength);
+}
+
 SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords)
 {  
   // Make a copy of the input string, so we can replace the commas with '\0's:  
diff --git a/RtspFace/MediaHelper.h b/RtspFace/MediaHelper.h
index 2c40bd4..f2f4d23 100644
--- a/RtspFace/MediaHelper.h
+++ b/RtspFace/MediaHelper.h
@@ -64,6 +64,20 @@
 	}
 };
 
+template<typename T>
+struct ScopeLocker;
+
+template<>
+struct ScopeLocker<pthread_mutex_t>
+{
+    pthread_mutex_t* mut;
+	ScopeLocker(pthread_mutex_t* _mut) : mut(_mut) { if (mut) pthread_mutex_lock(mut); }
+    ~ScopeLocker(){ if (mut) pthread_mutex_unlock(mut); }
+};
+
+uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros = true);
+char* base64_encode(char const* orig, size_t origLength);
+
 class SPropRecord;
 SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords);
 
diff --git a/RtspFace/PL_AndroidMediaCodecDecoder.cpp b/RtspFace/PL_AndroidMediaCodecDecoder.cpp
index 65f6b35..82d19a2 100644
--- a/RtspFace/PL_AndroidMediaCodecDecoder.cpp
+++ b/RtspFace/PL_AndroidMediaCodecDecoder.cpp
@@ -56,7 +56,7 @@
 		PL_AndroidMediaCodecDecoder_Config _config;
 		config = _config;
 		
-		codec = nullptr;
+		codec = nullptr;//#todo destory
 	}
 };
 
diff --git a/RtspFace/PL_AndroidMediaCodecEncoder.cpp b/RtspFace/PL_AndroidMediaCodecEncoder.cpp
index ca2b98b..481ce39 100644
--- a/RtspFace/PL_AndroidMediaCodecEncoder.cpp
+++ b/RtspFace/PL_AndroidMediaCodecEncoder.cpp
@@ -91,6 +91,7 @@
 	AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, config->ak_bit_rate);
 	AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_FRAME_RATE, config->ak_frame_rate);
 	AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_I_FRAME_INTERVAL, config->ak_i_frame_interval);
+	//AMediaFormat_setInt32(format, "profile", 0x00000100);
 
 // see: https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities.html#COLOR_FormatYUV420Flexible
 #define AMEDIA_COLOR_FormatYUV420Flexible 0x7f420888
@@ -231,7 +232,7 @@
 			pm.buffSize = 0;
 
 			//static size_t f = 0;
-			//static FILE *pFile = fopen("/sdcard/aa.264", "wb");
+			//static FILE *pFile = fopen("/data/aa.264", "wb");
 			//fwrite(in->buffer, sizeof(char), in->buffSize, pFile);
 			//if (++f > 400){
 			//	fclose(pFile);
@@ -251,6 +252,24 @@
 	{
 		auto format = AMediaCodec_getOutputFormat(in->codec);
 		LOGP(INFO, "format changed to: %s", AMediaFormat_toString(format));
+
+		uint8_t* sps = nullptr;
+		size_t spsSize = 0;
+		uint8_t* pps = nullptr;
+		size_t ppsSize = 0;
+
+		AMediaFormat_getBuffer(format, "csd-0", (void**)&sps, &spsSize); // sps
+		AMediaFormat_getBuffer(format, "csd-1", (void**)&pps, &ppsSize); // pps
+
+		if (spsSize != 0)
+		{
+			std::string spsStr = base64_encode(((const char*)sps) + 4, spsSize - 4);//#todo aux
+			std::string ppsStr = base64_encode(((const char*)pps) + 4, ppsSize - 4);
+
+			this->manager->set_param(PLGP_ENC_SPS_B64, spsStr);
+			this->manager->set_param(PLGP_ENC_PPS_B64, ppsStr);
+		}
+
 		AMediaFormat_delete(format);
 	}
 	else if (outputBuffIdx == AMEDIACODEC_INFO_TRY_AGAIN_LATER)
diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp
new file mode 100644
index 0000000..bda31d5
--- /dev/null
+++ b/RtspFace/PL_RTSPServer2.cpp
@@ -0,0 +1,305 @@
+#include "PL_RTSPServer.h"
+#include "MaterialBuffer.h"
+#include "logger.h"
+
+#include <liveMedia/liveMedia.hh>
+#include <BasicUsageEnvironment/BasicUsageEnvironment.hh>
+
+#include "FFmpegRTSPServer/IEncoder.h"
+#include "FFmpegRTSPServer/LiveRTSPServer.h"
+#include "FFmpegRTSPServer/H264FramedSource.h"
+#include "FFmpegRTSPServer/LiveServerMediaSubsession.h"
+#include "PreAllocBufferQueue.h"
+#include "MediaHelper.h"
+
+struct RTSPServer_Internal
+{
+	RTSPServerConfig config;
+
+	pthread_t live_daemon_thid;
+	bool live_daemon_running;
+	
+	MESAI::LiveRTSPServer* server;
+
+	PreAllocBufferQueue* frameQueue;
+	pthread_mutex_t* queue_mutex;
+	pthread_mutex_t* queue_empty_mutex;
+
+	bool auxLineSet;
+
+	RTSPServer_Internal() : 
+		config(),
+		live_daemon_thid(0), live_daemon_running(false),
+		server(nullptr),
+		frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
+		auxLineSet(false)
+	{
+		pthread_mutex_init(queue_mutex, NULL);
+	}
+	
+	~RTSPServer_Internal()
+	{
+		reset();
+	}
+	
+	void reset()
+	{
+		RTSPServerConfig _config;
+		config =_config;
+
+		if (frameQueue != nullptr)
+		{
+			delete frameQueue;
+			frameQueue = nullptr;
+		}
+
+		if (queue_mutex != nullptr)
+		{
+			pthread_mutex_destroy(queue_mutex);
+			delete queue_mutex;
+			queue_mutex = nullptr;
+		}
+		
+		queue_mutex = new pthread_mutex_t;
+		pthread_mutex_init(queue_mutex, NULL);
+
+		if (queue_empty_mutex != nullptr)
+		{
+			pthread_mutex_destroy(queue_empty_mutex);
+			delete queue_empty_mutex;
+			queue_empty_mutex = nullptr;
+		}
+
+		queue_empty_mutex = new pthread_mutex_t;
+		pthread_mutex_init(queue_empty_mutex, NULL);
+
+		live_daemon_thid = 0;
+		live_daemon_running = false;
+		
+		server = nullptr; //#todo delete
+
+		auxLineSet = false;
+	}
+};
+
+PipeLineElem* create_PL_RTSPServer()
+{
+	return new PL_RTSPServer;
+}
+
+PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal)
+{
+}
+
+PL_RTSPServer::~PL_RTSPServer()
+{
+	delete (RTSPServer_Internal*)internal;
+	internal = nullptr;
+}
+
+struct DeliverFrameCallback
+{
+	RTSPServer_Internal* in;
+	PreAllocBufferQueue::Buffer* lastBuffer;
+
+	DeliverFrameCallback(RTSPServer_Internal* _in)
+			: in(_in) , lastBuffer(nullptr)
+	{
+	}
+
+	~DeliverFrameCallback()
+	{
+		if (lastBuffer != nullptr)
+		{
+			in->frameQueue->Release(lastBuffer);
+			lastBuffer = nullptr;
+		}
+	}
+
+	static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts)
+	{
+		DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
+
+		if (_this->in->frameQueue->Empty())
+		{
+			int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
+			if (ret != 0)
+			{
+				LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
+			}
+		}
+
+		ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
+
+		if (_this->lastBuffer != nullptr)
+		{
+			// this can not happen
+			_this->in->frameQueue->Release(_this->lastBuffer);
+			_this->lastBuffer = nullptr;
+		}
+
+		_this->lastBuffer = _this->in->frameQueue->Dequeue();
+		if (_this->lastBuffer == nullptr)
+			return false;
+
+		buffer = _this->lastBuffer->buffer;
+		buffSize = _this->lastBuffer->buffSize;
+
+		LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
+		//static size_t f = 0;
+		//static FILE *pFile = fopen("/data/bb.264", "wb");
+		//fwrite(buffer, sizeof(char), buffSize, pFile);
+		//if (++f > 30){
+		//	fclose(pFile);
+		//	exit(0);
+		//}
+
+		gettimeofday(&pts, NULL);
+		return (_this->lastBuffer != nullptr);
+	}
+
+	static void releaseFrame(void* args)
+	{
+		DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
+
+		if (_this->lastBuffer != nullptr)
+		{
+			ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
+			_this->in->frameQueue->Release(_this->lastBuffer);
+			_this->lastBuffer = nullptr;
+		}
+	}
+};
+
+static void* live_daemon_thd(void* arg)
+{
+	RTSPServer_Internal* in = (RTSPServer_Internal*)arg;
+
+	in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
+
+	in->server->init();
+
+	MESAI::H264FramedSource::FrameCallbacks cbs;
+	cbs.args = new DeliverFrameCallback(in);//#todo delete
+	cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame;
+	cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame;
+	in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs);
+	
+	in->live_daemon_running = true;
+	in->server->run(); // does not return
+	//#todo delete framedSource
+	in->live_daemon_running = false;
+}
+
+bool PL_RTSPServer::init(void* args)
+{
+	RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+	if (args)
+	{
+		RTSPServerConfig* config = (RTSPServerConfig*)args;
+		in->config = *config;
+	}
+
+	PreAllocBufferQueue::Config qcfg;
+	qcfg.multithreadSafe = false;
+	qcfg.fullQueueDropFront = true;
+	qcfg.fullQueueSync = false;
+	qcfg.count = 32;
+	qcfg.maxBuffSize = 100000;
+	in->frameQueue = new PreAllocBufferQueue(qcfg);
+
+	int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
+	if(ret != 0)
+	{
+		LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
+		return false;
+	}
+
+	return true;
+}
+
+void PL_RTSPServer::finit()
+{
+	RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+	pthread_join(in->live_daemon_thid, NULL);
+}
+
+bool PL_RTSPServer::pay(const PipeMaterial& pm)
+{
+	RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+	if (pm.buffer == nullptr)
+		return false;
+	
+	if (pm.type != PipeMaterial::PMT_FRAME)
+	{
+		LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl;
+		return false;
+	}
+
+	if (!in->auxLineSet)
+	{
+		std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64));
+		std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64));
+
+		if (!spsStr.empty() && !ppsStr.empty())
+		{
+			MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource);
+			framedSource->spsBase64 = spsStr;
+			framedSource->ppsBase64 = ppsStr;
+
+			in->auxLineSet = true;
+		}
+	}
+
+	MB_Frame* frame = (MB_Frame*)pm.buffer;
+	if (frame->buffer == nullptr || frame->buffSize == 0)
+		return false;
+
+	ScopeLocker<pthread_mutex_t>(in->queue_mutex);
+	//if (in->frameQueue->Full())
+	//	LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl;
+
+	PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
+	if (qbuff == nullptr)
+	{
+		LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl;
+		int ret = pthread_mutex_unlock(in->queue_empty_mutex);
+		if (ret != 0)
+		{
+			LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+		}
+		return false;
+	}
+
+	memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
+	qbuff->buffSize = frame->buffSize;
+
+	//static size_t f = 0;
+	//static FILE *pFile = fopen("/data/aa.264", "wb");
+	//fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
+	//if (++f > 400){
+	//	fclose(pFile);
+	//	exit(0);
+	//}
+
+	int ret = pthread_mutex_unlock(in->queue_empty_mutex);
+	if (ret != 0)
+	{
+		LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+	}
+	return true;
+}
+
+bool PL_RTSPServer::gain(PipeMaterial& pm)
+{
+	RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+	pm.type = PipeMaterial::PMT_NONE;
+	pm.buffer = nullptr;
+	pm.buffSize = 0;
+	pm.former = this;
+	return true;
+}
diff --git a/RtspFace/PL_RTSPServer2.h b/RtspFace/PL_RTSPServer2.h
new file mode 100644
index 0000000..89e2eca
--- /dev/null
+++ b/RtspFace/PL_RTSPServer2.h
@@ -0,0 +1,36 @@
+#ifndef _PL_RTSPSERVER_H_
+#define _PL_RTSPSERVER_H_
+
+#include "PipeLine.h"
+
+struct RTSPServerConfig
+{
+	bool syncDeliverFrame;
+	bool payWithAux;
+	bool sendWithAux;
+	
+	RTSPServerConfig() : 
+		syncDeliverFrame(true), payWithAux(true), sendWithAux(false)
+	{
+	}
+};
+
+class PL_RTSPServer : public PipeLineElem
+{
+public:
+	PL_RTSPServer();
+	virtual ~PL_RTSPServer();
+
+	virtual bool init(void* args);
+	virtual void finit();
+
+	virtual bool pay(const PipeMaterial& pm);
+	virtual bool gain(PipeMaterial& pm);
+	
+private:
+	void* internal;
+};
+
+PipeLineElem* create_PL_RTSPServer();
+
+#endif
diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h
index 393b2a5..4bf3c43 100644
--- a/RtspFace/PipeLine.h
+++ b/RtspFace/PipeLine.h
@@ -11,6 +11,10 @@
 #define PLGP_RTSP_WIDTH "RTSP_WIDTH"
 #define PLGP_RTSP_HEIGHT "RTSP_HEIGHT"
 #define PLGP_RTSP_FPS "RTSP_FPS"
+#define PLGP_DEC_SPS_B64 "DEC_SPS_B64"
+#define PLGP_DEC_PPS_B64 "DEC_PPS_B64"
+#define PLGP_ENC_SPS_B64 "ENC_SPS_B64"
+#define PLGP_ENC_PPS_B64 "ENC_PPS_B64"
 
 #define ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER
 
@@ -53,8 +57,8 @@
 		*this = _temp;
 	}
 
-	int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, 
-		pm_breaker_func breaker, void* args = nullptr) const;
+	int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const;
+	int breake(int _selectMbfUsage, pm_breaker_func breaker, void* args = nullptr) const;
 
 	//#todo assemble pm/mbf into this pm
 	void assemble();
diff --git a/RtspFace/PreAllocBufferQueue.cpp b/RtspFace/PreAllocBufferQueue.cpp
new file mode 100644
index 0000000..f06e9d5
--- /dev/null
+++ b/RtspFace/PreAllocBufferQueue.cpp
@@ -0,0 +1,82 @@
+#include "PreAllocBufferQueue.h"
+
+PreAllocBufferQueue::PreAllocBufferQueue(const PreAllocBufferQueue::Config& _cfg)
+	: cfg(_cfg), mtsMux(_cfg.multithreadSafe ? nullptr : nullptr)
+{
+//#todo mtsMux
+
+	for (size_t i = 0; i < cfg.count; i++)
+	{
+		Buffer* qbuff = new Buffer();
+		qbuff->buffer = new uint8_t[cfg.maxBuffSize];
+
+		allBuffers.push_back(qbuff);
+		freeBuffers.push_back(qbuff);
+	}
+}
+
+PreAllocBufferQueue::~PreAllocBufferQueue()
+{
+	if (!usedBuffers.empty())
+	{
+		//#todo warning used
+	}
+
+	freeBuffers.clear();
+	while (!usedBuffers.empty()) usedBuffers.pop();
+
+	for (buffers_vec_t::iterator iter = allBuffers.begin(); iter != allBuffers.end(); ++iter)
+	{
+		Buffer* qbuff = *iter;
+		delete qbuff->buffer;
+		delete qbuff;
+	}
+
+	allBuffers.clear();
+}
+
+PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Dequeue()
+{
+	if (usedBuffers.empty())
+		return nullptr;
+
+	Buffer* qbuff = usedBuffers.front();
+	usedBuffers.pop();
+
+	return qbuff;
+}
+
+void PreAllocBufferQueue::Release(PreAllocBufferQueue::Buffer* buffer)
+{
+	if (buffer == nullptr)
+		return;
+
+	buffer->buffSize = 0;
+	freeBuffers.push_back(buffer);
+}
+
+PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Enqueue()
+{
+	if (cfg.fullQueueDropFront && Full())
+		Release(Dequeue());
+
+	if (freeBuffers.empty())
+		return nullptr;
+
+	Buffer* qbuff = freeBuffers.back();
+	freeBuffers.pop_back();
+
+	usedBuffers.push(qbuff);
+
+	return qbuff;
+}
+
+bool PreAllocBufferQueue::Empty() const
+{
+	return usedBuffers.empty();
+}
+
+bool PreAllocBufferQueue::Full() const
+{
+	return freeBuffers.empty();
+}
diff --git a/RtspFace/PreAllocBufferQueue.h b/RtspFace/PreAllocBufferQueue.h
new file mode 100644
index 0000000..9a76649
--- /dev/null
+++ b/RtspFace/PreAllocBufferQueue.h
@@ -0,0 +1,56 @@
+#ifndef _PREALLOCBUFFERQUEUE_H_
+#define _PREALLOCBUFFERQUEUE_H_
+
+#include <cstdint>
+#include <vector>
+#include <queue>
+
+class PreAllocBufferQueue
+{
+public:
+	struct Config
+	{
+		bool multithreadSafe;
+		bool fullQueueDropFront;
+		bool fullQueueSync;
+		size_t count;
+		size_t maxBuffSize;
+
+		Config()
+			: multithreadSafe(false), fullQueueDropFront(false), fullQueueSync(false), count(0), maxBuffSize(0)
+		{ }
+	};
+
+	struct Buffer
+	{
+		uint8_t* buffer;
+		size_t buffSize;
+
+		Buffer() : buffer(nullptr), buffSize(0) { }
+	};
+
+	PreAllocBufferQueue(const Config& _cfg);
+
+	~PreAllocBufferQueue();
+
+	Buffer* Dequeue();
+	void Release(Buffer* buffer);
+
+	Buffer* Enqueue();
+	
+	bool Empty() const;
+	bool Full() const;
+
+private:
+	const Config cfg;
+	void* mtsMux;
+
+	typedef std::vector<Buffer*> buffers_vec_t;
+	buffers_vec_t allBuffers;
+	buffers_vec_t freeBuffers;
+
+	typedef std::queue<Buffer*> buffers_que_t;
+	buffers_que_t usedBuffers;
+};
+
+#endif

--
Gitblit v1.8.0