From d9ffa50c7e8d6b8c3157690aef8e2a70af1d1695 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 09 八月 2017 13:58:01 +0800
Subject: [PATCH] rtps server (not ok)
---
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp | 50 +++-
RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp | 19 +
RtspFace/FFmpegRTSPServer/LiveRTSPServer.h | 8
RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp | 36 +-
RtspFace/PipeLine.h | 8
RtspFace/MediaHelper.cpp | 15 +
RtspFace/PreAllocBufferQueue.h | 56 ++++
RtspFace/PL_RTSPServer2.cpp | 305 +++++++++++++++++++++++++
RtspFace/PL_AndroidMediaCodecDecoder.cpp | 2
RtspFace/PL_RTSPServer2.h | 36 +++
RtspFace/PL_AndroidMediaCodecEncoder.cpp | 21 +
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h | 33 +-
RtspFace/MaterialBuffer.h | 11
RtspFace/PreAllocBufferQueue.cpp | 82 ++++++
RtspFace/MediaHelper.h | 14 +
15 files changed, 642 insertions(+), 54 deletions(-)
diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
index c2d5ab5..68fd872 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
@@ -49,13 +49,18 @@
static unsigned newFrameSize = 0;
/* get the data frame from the Encoding thread.. */
- if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize)){
- if (newFrameDataStart!=NULL) {
+ if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize) != 0)
+ {
+ if (newFrameDataStart != NULL && newFrameSize > 0)
+ {
/* This should never happen, but check anyway.. */
- if (newFrameSize > fMaxSize) {
+ if (newFrameSize > fMaxSize)
+ {
fFrameSize = fMaxSize;
fNumTruncatedBytes = newFrameSize - fMaxSize;
- } else {
+ }
+ else
+ {
fFrameSize = newFrameSize;
}
@@ -67,12 +72,14 @@
Encoding_Source->ReleaseFrame();
}
- else {
+ else
+ {
fFrameSize=0;
fTo=NULL;
handleClosure(this);
}
- }else
+ }
+ else
{
fFrameSize = 0;
}
diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
index 2f6790a..16af86c 100644
--- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
+++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
@@ -6,12 +6,14 @@
// Copyright (c) 2015 Mina Saad. All rights reserved.
//
+#include "../logger.h"
#include "LiveRTSPServer.h"
namespace MESAI
{
LiveRTSPServer::LiveRTSPServer( IEncoder * a_Encoder, int port, int httpPort )
- : m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
+ : env(nullptr), framedSource(nullptr),
+ m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
{
quit = 0;
}
@@ -21,16 +23,20 @@
}
+ void LiveRTSPServer::init()
+ {
+ TaskScheduler* scheduler = BasicTaskScheduler::createNew();
+ env = BasicUsageEnvironment::createNew(*scheduler);
+ }
+
void LiveRTSPServer::run()
{
- TaskScheduler *scheduler;
- UsageEnvironment *env ;
+ if (env == nullptr)
+ init();
+
char RTSP_Address[1024];
RTSP_Address[0]=0x00;
- scheduler = BasicTaskScheduler::createNew();
- env = BasicUsageEnvironment::createNew(*scheduler);
-
UserAuthenticationDatabase* authDB = NULL;
// if (m_Enable_Pass){
@@ -43,11 +49,10 @@
if (rtspServer == NULL)
{
- *env <<"LIVE555: Failed to create RTSP server: %s\n", env->getResultMsg();
+ LOG_ERROR <<"LIVE555: Failed to create RTSP server: " << env->getResultMsg() << LOG_ENDL;
}
- else {
-
-
+ else
+ {
if(httpTunnelingPort)
{
rtspServer->setUpTunnelingOverHTTP(httpTunnelingPort);
@@ -55,15 +60,16 @@
char const* descriptionString = "MESAI Streaming Session";
- FFmpegH264Source * source = FFmpegH264Source::createNew(*env,m_Encoder);
- StreamReplicator * inputDevice = StreamReplicator::createNew(*env, source, false);
-
+ if (framedSource == nullptr)
+ framedSource = FFmpegH264Source::createNew(*env,m_Encoder);
+ StreamReplicator * inputDevice = StreamReplicator::createNew(*env, framedSource, false);
+
ServerMediaSession* sms = ServerMediaSession::createNew(*env, RTSP_Address, RTSP_Address, descriptionString);
sms->addSubsession(MESAI::LiveServerMediaSubsession::createNew(*env, inputDevice));
rtspServer->addServerMediaSession(sms);
char* url = rtspServer->rtspURL(sms);
- *env << "Play this stream using the URL \"" << url << "\"\n";
+ LOG_INFO << "Play this stream using the URL " << url << LOG_ENDL;
delete [] url;
//signal(SIGNIT,sighandler);
@@ -74,6 +80,6 @@
}
env->reclaim();
- delete scheduler;
+ //delete scheduler; // #todo
}
}
\ No newline at end of file
diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
index 006e459..0c3f6a7 100644
--- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
+++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
@@ -22,17 +22,21 @@
class LiveRTSPServer
{
public:
-
LiveRTSPServer(IEncoder * a_Encoder, int port, int httpPort );
~LiveRTSPServer();
+
+ void init();
void run();
+
+ public:
+ UsageEnvironment* env;
+ FramedSource* framedSource;
private:
int portNumber;
int httpTunnelingPort;
IEncoder * m_Encoder;
char quit;
-
};
}
diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
index 6d16082..d5204f7 100644
--- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
+++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
@@ -7,23 +7,47 @@
//
#include "LiveServerMediaSubsession.h"
+#include "H264FramedSource.h"
namespace MESAI
{
- LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator)
- {
- return new LiveServerMediaSubsession(env,replicator);
- }
-
- FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
+
+LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator)
+{
+ return new LiveServerMediaSubsession(env,replicator);
+}
+
+FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
+{
+ FramedSource* source = m_replicator->createStreamReplica();
+ estBitrate = 5000;//#todo
+ return H264VideoStreamDiscreteFramer::createNew(envir(), source);
+}
+
+RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource)
+{
+ return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic);
+}
+
+char const* LiveServerMediaSubsession::sdpLines()
+{
+ if (m_SDPLines.empty())
{
- FramedSource* source = m_replicator->createStreamReplica();
- return H264VideoStreamDiscreteFramer::createNew(envir(), source);
+ m_SDPLines.assign(OnDemandServerMediaSubsession::sdpLines());
+
+ H264FramedSource* framedSource = nullptr;
+ {
+ FramedSource* _framedSource = m_replicator->inputSource();
+ framedSource = dynamic_cast<H264FramedSource*>(_framedSource);
+ };
+
+ if (framedSource != nullptr)
+ {
+ m_SDPLines.append(framedSource->getAuxLine());
+ }
}
-
- RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource)
- {
- return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic);
- }
+
+ return m_SDPLines.c_str();
+}
}
diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
index 83c3b2e..263c053 100644
--- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
+++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
@@ -16,24 +16,29 @@
#include <liveMedia/H264VideoStreamDiscreteFramer.hh>
#include <UsageEnvironment/UsageEnvironment.hh>
#include <groupsock/Groupsock.hh>
+#include <string>
-namespace MESAI
+namespace MESAI
{
- class LiveServerMediaSubsession: public OnDemandServerMediaSubsession
- {
- public:
- static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator);
-
- protected:
- LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator)
- : OnDemandServerMediaSubsession(env, False), m_replicator(replicator) {};
-
- virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate);
- virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource);
+class LiveServerMediaSubsession: public OnDemandServerMediaSubsession
+{
+public:
+ static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator);
- StreamReplicator * m_replicator;
- };
+protected:
+ LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator)
+ : OnDemandServerMediaSubsession(env, False), m_replicator(replicator), m_SDPLines()
+ {}
+
+ virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate);
+ virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource);
+ virtual char const* sdpLines();
+ //virtual char const* getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource);
+
+ StreamReplicator * m_replicator;
+ std::string m_SDPLines;
+};
}
#endif
\ No newline at end of file
diff --git a/RtspFace/MaterialBuffer.h b/RtspFace/MaterialBuffer.h
index 715b49e..fa2d12d 100644
--- a/RtspFace/MaterialBuffer.h
+++ b/RtspFace/MaterialBuffer.h
@@ -48,6 +48,17 @@
MBFT__LAST
};
+ enum MBFUsage
+ {
+ MBFU__FIRST,
+
+ MBFU_ORIGIN_IMAGE,
+ MBFU_PROCESSED_IMAGE,
+ MBFU_INFORMATION,
+
+ MBFU__LAST
+ };
+
MBFType type;
void* buffer;
size_t buffSize;
diff --git a/RtspFace/MediaHelper.cpp b/RtspFace/MediaHelper.cpp
index 42f7467..5081788 100644
--- a/RtspFace/MediaHelper.cpp
+++ b/RtspFace/MediaHelper.cpp
@@ -3,6 +3,21 @@
#include <liveMedia/liveMedia.hh>
#include <liveMedia/Base64.hh>
+uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros)
+{
+ unsigned _resultSize = resultSize;
+ Boolean _trimTrailingZeros = trimTrailingZeros;
+ unsigned char* ret = base64Decode(in, inSize, _resultSize, _trimTrailingZeros);
+ resultSize = _resultSize;
+ return ret;
+}
+
+char* base64_encode(char const* orig, size_t origLength)
+{
+ unsigned _origLength = origLength;
+ return base64Encode(orig, _origLength);
+}
+
SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords)
{
// Make a copy of the input string, so we can replace the commas with '\0's:
diff --git a/RtspFace/MediaHelper.h b/RtspFace/MediaHelper.h
index 2c40bd4..f2f4d23 100644
--- a/RtspFace/MediaHelper.h
+++ b/RtspFace/MediaHelper.h
@@ -64,6 +64,20 @@
}
};
+template<typename T>
+struct ScopeLocker;
+
+template<>
+struct ScopeLocker<pthread_mutex_t>
+{
+ pthread_mutex_t* mut;
+ ScopeLocker(pthread_mutex_t* _mut) : mut(_mut) { if (mut) pthread_mutex_lock(mut); }
+ ~ScopeLocker(){ if (mut) pthread_mutex_unlock(mut); }
+};
+
+uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros = true);
+char* base64_encode(char const* orig, size_t origLength);
+
class SPropRecord;
SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords);
diff --git a/RtspFace/PL_AndroidMediaCodecDecoder.cpp b/RtspFace/PL_AndroidMediaCodecDecoder.cpp
index 65f6b35..82d19a2 100644
--- a/RtspFace/PL_AndroidMediaCodecDecoder.cpp
+++ b/RtspFace/PL_AndroidMediaCodecDecoder.cpp
@@ -56,7 +56,7 @@
PL_AndroidMediaCodecDecoder_Config _config;
config = _config;
- codec = nullptr;
+ codec = nullptr;//#todo destory
}
};
diff --git a/RtspFace/PL_AndroidMediaCodecEncoder.cpp b/RtspFace/PL_AndroidMediaCodecEncoder.cpp
index ca2b98b..481ce39 100644
--- a/RtspFace/PL_AndroidMediaCodecEncoder.cpp
+++ b/RtspFace/PL_AndroidMediaCodecEncoder.cpp
@@ -91,6 +91,7 @@
AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, config->ak_bit_rate);
AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_FRAME_RATE, config->ak_frame_rate);
AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_I_FRAME_INTERVAL, config->ak_i_frame_interval);
+ //AMediaFormat_setInt32(format, "profile", 0x00000100);
// see: https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities.html#COLOR_FormatYUV420Flexible
#define AMEDIA_COLOR_FormatYUV420Flexible 0x7f420888
@@ -231,7 +232,7 @@
pm.buffSize = 0;
//static size_t f = 0;
- //static FILE *pFile = fopen("/sdcard/aa.264", "wb");
+ //static FILE *pFile = fopen("/data/aa.264", "wb");
//fwrite(in->buffer, sizeof(char), in->buffSize, pFile);
//if (++f > 400){
// fclose(pFile);
@@ -251,6 +252,24 @@
{
auto format = AMediaCodec_getOutputFormat(in->codec);
LOGP(INFO, "format changed to: %s", AMediaFormat_toString(format));
+
+ uint8_t* sps = nullptr;
+ size_t spsSize = 0;
+ uint8_t* pps = nullptr;
+ size_t ppsSize = 0;
+
+ AMediaFormat_getBuffer(format, "csd-0", (void**)&sps, &spsSize); // sps
+ AMediaFormat_getBuffer(format, "csd-1", (void**)&pps, &ppsSize); // pps
+
+ if (spsSize != 0)
+ {
+ std::string spsStr = base64_encode(((const char*)sps) + 4, spsSize - 4);//#todo aux
+ std::string ppsStr = base64_encode(((const char*)pps) + 4, ppsSize - 4);
+
+ this->manager->set_param(PLGP_ENC_SPS_B64, spsStr);
+ this->manager->set_param(PLGP_ENC_PPS_B64, ppsStr);
+ }
+
AMediaFormat_delete(format);
}
else if (outputBuffIdx == AMEDIACODEC_INFO_TRY_AGAIN_LATER)
diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp
new file mode 100644
index 0000000..bda31d5
--- /dev/null
+++ b/RtspFace/PL_RTSPServer2.cpp
@@ -0,0 +1,305 @@
+#include "PL_RTSPServer.h"
+#include "MaterialBuffer.h"
+#include "logger.h"
+
+#include <liveMedia/liveMedia.hh>
+#include <BasicUsageEnvironment/BasicUsageEnvironment.hh>
+
+#include "FFmpegRTSPServer/IEncoder.h"
+#include "FFmpegRTSPServer/LiveRTSPServer.h"
+#include "FFmpegRTSPServer/H264FramedSource.h"
+#include "FFmpegRTSPServer/LiveServerMediaSubsession.h"
+#include "PreAllocBufferQueue.h"
+#include "MediaHelper.h"
+
+struct RTSPServer_Internal
+{
+ RTSPServerConfig config;
+
+ pthread_t live_daemon_thid;
+ bool live_daemon_running;
+
+ MESAI::LiveRTSPServer* server;
+
+ PreAllocBufferQueue* frameQueue;
+ pthread_mutex_t* queue_mutex;
+ pthread_mutex_t* queue_empty_mutex;
+
+ bool auxLineSet;
+
+ RTSPServer_Internal() :
+ config(),
+ live_daemon_thid(0), live_daemon_running(false),
+ server(nullptr),
+ frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
+ auxLineSet(false)
+ {
+ pthread_mutex_init(queue_mutex, NULL);
+ }
+
+ ~RTSPServer_Internal()
+ {
+ reset();
+ }
+
+ void reset()
+ {
+ RTSPServerConfig _config;
+ config =_config;
+
+ if (frameQueue != nullptr)
+ {
+ delete frameQueue;
+ frameQueue = nullptr;
+ }
+
+ if (queue_mutex != nullptr)
+ {
+ pthread_mutex_destroy(queue_mutex);
+ delete queue_mutex;
+ queue_mutex = nullptr;
+ }
+
+ queue_mutex = new pthread_mutex_t;
+ pthread_mutex_init(queue_mutex, NULL);
+
+ if (queue_empty_mutex != nullptr)
+ {
+ pthread_mutex_destroy(queue_empty_mutex);
+ delete queue_empty_mutex;
+ queue_empty_mutex = nullptr;
+ }
+
+ queue_empty_mutex = new pthread_mutex_t;
+ pthread_mutex_init(queue_empty_mutex, NULL);
+
+ live_daemon_thid = 0;
+ live_daemon_running = false;
+
+ server = nullptr; //#todo delete
+
+ auxLineSet = false;
+ }
+};
+
+PipeLineElem* create_PL_RTSPServer()
+{
+ return new PL_RTSPServer;
+}
+
+PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal)
+{
+}
+
+PL_RTSPServer::~PL_RTSPServer()
+{
+ delete (RTSPServer_Internal*)internal;
+ internal = nullptr;
+}
+
+struct DeliverFrameCallback
+{
+ RTSPServer_Internal* in;
+ PreAllocBufferQueue::Buffer* lastBuffer;
+
+ DeliverFrameCallback(RTSPServer_Internal* _in)
+ : in(_in) , lastBuffer(nullptr)
+ {
+ }
+
+ ~DeliverFrameCallback()
+ {
+ if (lastBuffer != nullptr)
+ {
+ in->frameQueue->Release(lastBuffer);
+ lastBuffer = nullptr;
+ }
+ }
+
+ static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts)
+ {
+ DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
+
+ if (_this->in->frameQueue->Empty())
+ {
+ int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
+ }
+ }
+
+ ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
+
+ if (_this->lastBuffer != nullptr)
+ {
+ // this can not happen
+ _this->in->frameQueue->Release(_this->lastBuffer);
+ _this->lastBuffer = nullptr;
+ }
+
+ _this->lastBuffer = _this->in->frameQueue->Dequeue();
+ if (_this->lastBuffer == nullptr)
+ return false;
+
+ buffer = _this->lastBuffer->buffer;
+ buffSize = _this->lastBuffer->buffSize;
+
+ LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
+ //static size_t f = 0;
+ //static FILE *pFile = fopen("/data/bb.264", "wb");
+ //fwrite(buffer, sizeof(char), buffSize, pFile);
+ //if (++f > 30){
+ // fclose(pFile);
+ // exit(0);
+ //}
+
+ gettimeofday(&pts, NULL);
+ return (_this->lastBuffer != nullptr);
+ }
+
+ static void releaseFrame(void* args)
+ {
+ DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
+
+ if (_this->lastBuffer != nullptr)
+ {
+ ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
+ _this->in->frameQueue->Release(_this->lastBuffer);
+ _this->lastBuffer = nullptr;
+ }
+ }
+};
+
+static void* live_daemon_thd(void* arg)
+{
+ RTSPServer_Internal* in = (RTSPServer_Internal*)arg;
+
+ in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
+
+ in->server->init();
+
+ MESAI::H264FramedSource::FrameCallbacks cbs;
+ cbs.args = new DeliverFrameCallback(in);//#todo delete
+ cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame;
+ cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame;
+ in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs);
+
+ in->live_daemon_running = true;
+ in->server->run(); // does not return
+ //#todo delete framedSource
+ in->live_daemon_running = false;
+}
+
+bool PL_RTSPServer::init(void* args)
+{
+ RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+ if (args)
+ {
+ RTSPServerConfig* config = (RTSPServerConfig*)args;
+ in->config = *config;
+ }
+
+ PreAllocBufferQueue::Config qcfg;
+ qcfg.multithreadSafe = false;
+ qcfg.fullQueueDropFront = true;
+ qcfg.fullQueueSync = false;
+ qcfg.count = 32;
+ qcfg.maxBuffSize = 100000;
+ in->frameQueue = new PreAllocBufferQueue(qcfg);
+
+ int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
+ if(ret != 0)
+ {
+ LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
+ return false;
+ }
+
+ return true;
+}
+
+void PL_RTSPServer::finit()
+{
+ RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+ pthread_join(in->live_daemon_thid, NULL);
+}
+
+bool PL_RTSPServer::pay(const PipeMaterial& pm)
+{
+ RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+ if (pm.buffer == nullptr)
+ return false;
+
+ if (pm.type != PipeMaterial::PMT_FRAME)
+ {
+ LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl;
+ return false;
+ }
+
+ if (!in->auxLineSet)
+ {
+ std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64));
+ std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64));
+
+ if (!spsStr.empty() && !ppsStr.empty())
+ {
+ MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource);
+ framedSource->spsBase64 = spsStr;
+ framedSource->ppsBase64 = ppsStr;
+
+ in->auxLineSet = true;
+ }
+ }
+
+ MB_Frame* frame = (MB_Frame*)pm.buffer;
+ if (frame->buffer == nullptr || frame->buffSize == 0)
+ return false;
+
+ ScopeLocker<pthread_mutex_t>(in->queue_mutex);
+ //if (in->frameQueue->Full())
+ // LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl;
+
+ PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
+ if (qbuff == nullptr)
+ {
+ LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl;
+ int ret = pthread_mutex_unlock(in->queue_empty_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+ }
+ return false;
+ }
+
+ memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
+ qbuff->buffSize = frame->buffSize;
+
+ //static size_t f = 0;
+ //static FILE *pFile = fopen("/data/aa.264", "wb");
+ //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
+ //if (++f > 400){
+ // fclose(pFile);
+ // exit(0);
+ //}
+
+ int ret = pthread_mutex_unlock(in->queue_empty_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+ }
+ return true;
+}
+
+bool PL_RTSPServer::gain(PipeMaterial& pm)
+{
+ RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+
+ pm.type = PipeMaterial::PMT_NONE;
+ pm.buffer = nullptr;
+ pm.buffSize = 0;
+ pm.former = this;
+ return true;
+}
diff --git a/RtspFace/PL_RTSPServer2.h b/RtspFace/PL_RTSPServer2.h
new file mode 100644
index 0000000..89e2eca
--- /dev/null
+++ b/RtspFace/PL_RTSPServer2.h
@@ -0,0 +1,36 @@
+#ifndef _PL_RTSPSERVER_H_
+#define _PL_RTSPSERVER_H_
+
+#include "PipeLine.h"
+
+struct RTSPServerConfig
+{
+ bool syncDeliverFrame;
+ bool payWithAux;
+ bool sendWithAux;
+
+ RTSPServerConfig() :
+ syncDeliverFrame(true), payWithAux(true), sendWithAux(false)
+ {
+ }
+};
+
+class PL_RTSPServer : public PipeLineElem
+{
+public:
+ PL_RTSPServer();
+ virtual ~PL_RTSPServer();
+
+ virtual bool init(void* args);
+ virtual void finit();
+
+ virtual bool pay(const PipeMaterial& pm);
+ virtual bool gain(PipeMaterial& pm);
+
+private:
+ void* internal;
+};
+
+PipeLineElem* create_PL_RTSPServer();
+
+#endif
diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h
index 393b2a5..4bf3c43 100644
--- a/RtspFace/PipeLine.h
+++ b/RtspFace/PipeLine.h
@@ -11,6 +11,10 @@
#define PLGP_RTSP_WIDTH "RTSP_WIDTH"
#define PLGP_RTSP_HEIGHT "RTSP_HEIGHT"
#define PLGP_RTSP_FPS "RTSP_FPS"
+#define PLGP_DEC_SPS_B64 "DEC_SPS_B64"
+#define PLGP_DEC_PPS_B64 "DEC_PPS_B64"
+#define PLGP_ENC_SPS_B64 "ENC_SPS_B64"
+#define PLGP_ENC_PPS_B64 "ENC_PPS_B64"
#define ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER
@@ -53,8 +57,8 @@
*this = _temp;
}
- int breake(PipeMaterialBufferType selectPmType, int _selectMbfType,
- pm_breaker_func breaker, void* args = nullptr) const;
+ int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const;
+ int breake(int _selectMbfUsage, pm_breaker_func breaker, void* args = nullptr) const;
//#todo assemble pm/mbf into this pm
void assemble();
diff --git a/RtspFace/PreAllocBufferQueue.cpp b/RtspFace/PreAllocBufferQueue.cpp
new file mode 100644
index 0000000..f06e9d5
--- /dev/null
+++ b/RtspFace/PreAllocBufferQueue.cpp
@@ -0,0 +1,82 @@
+#include "PreAllocBufferQueue.h"
+
+PreAllocBufferQueue::PreAllocBufferQueue(const PreAllocBufferQueue::Config& _cfg)
+ : cfg(_cfg), mtsMux(_cfg.multithreadSafe ? nullptr : nullptr)
+{
+//#todo mtsMux
+
+ for (size_t i = 0; i < cfg.count; i++)
+ {
+ Buffer* qbuff = new Buffer();
+ qbuff->buffer = new uint8_t[cfg.maxBuffSize];
+
+ allBuffers.push_back(qbuff);
+ freeBuffers.push_back(qbuff);
+ }
+}
+
+PreAllocBufferQueue::~PreAllocBufferQueue()
+{
+ if (!usedBuffers.empty())
+ {
+ //#todo warning used
+ }
+
+ freeBuffers.clear();
+ while (!usedBuffers.empty()) usedBuffers.pop();
+
+ for (buffers_vec_t::iterator iter = allBuffers.begin(); iter != allBuffers.end(); ++iter)
+ {
+ Buffer* qbuff = *iter;
+ delete qbuff->buffer;
+ delete qbuff;
+ }
+
+ allBuffers.clear();
+}
+
+PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Dequeue()
+{
+ if (usedBuffers.empty())
+ return nullptr;
+
+ Buffer* qbuff = usedBuffers.front();
+ usedBuffers.pop();
+
+ return qbuff;
+}
+
+void PreAllocBufferQueue::Release(PreAllocBufferQueue::Buffer* buffer)
+{
+ if (buffer == nullptr)
+ return;
+
+ buffer->buffSize = 0;
+ freeBuffers.push_back(buffer);
+}
+
+PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Enqueue()
+{
+ if (cfg.fullQueueDropFront && Full())
+ Release(Dequeue());
+
+ if (freeBuffers.empty())
+ return nullptr;
+
+ Buffer* qbuff = freeBuffers.back();
+ freeBuffers.pop_back();
+
+ usedBuffers.push(qbuff);
+
+ return qbuff;
+}
+
+bool PreAllocBufferQueue::Empty() const
+{
+ return usedBuffers.empty();
+}
+
+bool PreAllocBufferQueue::Full() const
+{
+ return freeBuffers.empty();
+}
diff --git a/RtspFace/PreAllocBufferQueue.h b/RtspFace/PreAllocBufferQueue.h
new file mode 100644
index 0000000..9a76649
--- /dev/null
+++ b/RtspFace/PreAllocBufferQueue.h
@@ -0,0 +1,56 @@
+#ifndef _PREALLOCBUFFERQUEUE_H_
+#define _PREALLOCBUFFERQUEUE_H_
+
+#include <cstdint>
+#include <vector>
+#include <queue>
+
+class PreAllocBufferQueue
+{
+public:
+ struct Config
+ {
+ bool multithreadSafe;
+ bool fullQueueDropFront;
+ bool fullQueueSync;
+ size_t count;
+ size_t maxBuffSize;
+
+ Config()
+ : multithreadSafe(false), fullQueueDropFront(false), fullQueueSync(false), count(0), maxBuffSize(0)
+ { }
+ };
+
+ struct Buffer
+ {
+ uint8_t* buffer;
+ size_t buffSize;
+
+ Buffer() : buffer(nullptr), buffSize(0) { }
+ };
+
+ PreAllocBufferQueue(const Config& _cfg);
+
+ ~PreAllocBufferQueue();
+
+ Buffer* Dequeue();
+ void Release(Buffer* buffer);
+
+ Buffer* Enqueue();
+
+ bool Empty() const;
+ bool Full() const;
+
+private:
+ const Config cfg;
+ void* mtsMux;
+
+ typedef std::vector<Buffer*> buffers_vec_t;
+ buffers_vec_t allBuffers;
+ buffers_vec_t freeBuffers;
+
+ typedef std::queue<Buffer*> buffers_que_t;
+ buffers_que_t usedBuffers;
+};
+
+#endif
--
Gitblit v1.8.0