From d9ffa50c7e8d6b8c3157690aef8e2a70af1d1695 Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期三, 09 八月 2017 13:58:01 +0800 Subject: [PATCH] rtps server (not ok) --- RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp | 50 +++- RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp | 19 + RtspFace/FFmpegRTSPServer/LiveRTSPServer.h | 8 RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp | 36 +- RtspFace/PipeLine.h | 8 RtspFace/MediaHelper.cpp | 15 + RtspFace/PreAllocBufferQueue.h | 56 ++++ RtspFace/PL_RTSPServer2.cpp | 305 +++++++++++++++++++++++++ RtspFace/PL_AndroidMediaCodecDecoder.cpp | 2 RtspFace/PL_RTSPServer2.h | 36 +++ RtspFace/PL_AndroidMediaCodecEncoder.cpp | 21 + RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h | 33 +- RtspFace/MaterialBuffer.h | 11 RtspFace/PreAllocBufferQueue.cpp | 82 ++++++ RtspFace/MediaHelper.h | 14 + 15 files changed, 642 insertions(+), 54 deletions(-) diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp index c2d5ab5..68fd872 100644 --- a/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp +++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp @@ -49,13 +49,18 @@ static unsigned newFrameSize = 0; /* get the data frame from the Encoding thread.. */ - if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize)){ - if (newFrameDataStart!=NULL) { + if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize) != 0) + { + if (newFrameDataStart != NULL && newFrameSize > 0) + { /* This should never happen, but check anyway.. */ - if (newFrameSize > fMaxSize) { + if (newFrameSize > fMaxSize) + { fFrameSize = fMaxSize; fNumTruncatedBytes = newFrameSize - fMaxSize; - } else { + } + else + { fFrameSize = newFrameSize; } @@ -67,12 +72,14 @@ Encoding_Source->ReleaseFrame(); } - else { + else + { fFrameSize=0; fTo=NULL; handleClosure(this); } - }else + } + else { fFrameSize = 0; } diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp index 2f6790a..16af86c 100644 --- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp +++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp @@ -6,12 +6,14 @@ // Copyright (c) 2015 Mina Saad. All rights reserved. // +#include "../logger.h" #include "LiveRTSPServer.h" namespace MESAI { LiveRTSPServer::LiveRTSPServer( IEncoder * a_Encoder, int port, int httpPort ) - : m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort) + : env(nullptr), framedSource(nullptr), + m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort) { quit = 0; } @@ -21,16 +23,20 @@ } + void LiveRTSPServer::init() + { + TaskScheduler* scheduler = BasicTaskScheduler::createNew(); + env = BasicUsageEnvironment::createNew(*scheduler); + } + void LiveRTSPServer::run() { - TaskScheduler *scheduler; - UsageEnvironment *env ; + if (env == nullptr) + init(); + char RTSP_Address[1024]; RTSP_Address[0]=0x00; - scheduler = BasicTaskScheduler::createNew(); - env = BasicUsageEnvironment::createNew(*scheduler); - UserAuthenticationDatabase* authDB = NULL; // if (m_Enable_Pass){ @@ -43,11 +49,10 @@ if (rtspServer == NULL) { - *env <<"LIVE555: Failed to create RTSP server: %s\n", env->getResultMsg(); + LOG_ERROR <<"LIVE555: Failed to create RTSP server: " << env->getResultMsg() << LOG_ENDL; } - else { - - + else + { if(httpTunnelingPort) { rtspServer->setUpTunnelingOverHTTP(httpTunnelingPort); @@ -55,15 +60,16 @@ char const* descriptionString = "MESAI Streaming Session"; - FFmpegH264Source * source = FFmpegH264Source::createNew(*env,m_Encoder); - StreamReplicator * inputDevice = StreamReplicator::createNew(*env, source, false); - + if (framedSource == nullptr) + framedSource = FFmpegH264Source::createNew(*env,m_Encoder); + StreamReplicator * inputDevice = StreamReplicator::createNew(*env, framedSource, false); + ServerMediaSession* sms = ServerMediaSession::createNew(*env, RTSP_Address, RTSP_Address, descriptionString); sms->addSubsession(MESAI::LiveServerMediaSubsession::createNew(*env, inputDevice)); rtspServer->addServerMediaSession(sms); char* url = rtspServer->rtspURL(sms); - *env << "Play this stream using the URL \"" << url << "\"\n"; + LOG_INFO << "Play this stream using the URL " << url << LOG_ENDL; delete [] url; //signal(SIGNIT,sighandler); @@ -74,6 +80,6 @@ } env->reclaim(); - delete scheduler; + //delete scheduler; // #todo } } \ No newline at end of file diff --git a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h index 006e459..0c3f6a7 100644 --- a/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h +++ b/RtspFace/FFmpegRTSPServer/LiveRTSPServer.h @@ -22,17 +22,21 @@ class LiveRTSPServer { public: - LiveRTSPServer(IEncoder * a_Encoder, int port, int httpPort ); ~LiveRTSPServer(); + + void init(); void run(); + + public: + UsageEnvironment* env; + FramedSource* framedSource; private: int portNumber; int httpTunnelingPort; IEncoder * m_Encoder; char quit; - }; } diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp index 6d16082..d5204f7 100644 --- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp +++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp @@ -7,23 +7,47 @@ // #include "LiveServerMediaSubsession.h" +#include "H264FramedSource.h" namespace MESAI { - LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator) - { - return new LiveServerMediaSubsession(env,replicator); - } - - FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) + +LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator) +{ + return new LiveServerMediaSubsession(env,replicator); +} + +FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) +{ + FramedSource* source = m_replicator->createStreamReplica(); + estBitrate = 5000;//#todo + return H264VideoStreamDiscreteFramer::createNew(envir(), source); +} + +RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) +{ + return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic); +} + +char const* LiveServerMediaSubsession::sdpLines() +{ + if (m_SDPLines.empty()) { - FramedSource* source = m_replicator->createStreamReplica(); - return H264VideoStreamDiscreteFramer::createNew(envir(), source); + m_SDPLines.assign(OnDemandServerMediaSubsession::sdpLines()); + + H264FramedSource* framedSource = nullptr; + { + FramedSource* _framedSource = m_replicator->inputSource(); + framedSource = dynamic_cast<H264FramedSource*>(_framedSource); + }; + + if (framedSource != nullptr) + { + m_SDPLines.append(framedSource->getAuxLine()); + } } - - RTPSink* LiveServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) - { - return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic); - } + + return m_SDPLines.c_str(); +} } diff --git a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h index 83c3b2e..263c053 100644 --- a/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h +++ b/RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h @@ -16,24 +16,29 @@ #include <liveMedia/H264VideoStreamDiscreteFramer.hh> #include <UsageEnvironment/UsageEnvironment.hh> #include <groupsock/Groupsock.hh> +#include <string> -namespace MESAI +namespace MESAI { - class LiveServerMediaSubsession: public OnDemandServerMediaSubsession - { - public: - static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator); - - protected: - LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator) - : OnDemandServerMediaSubsession(env, False), m_replicator(replicator) {}; - - virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate); - virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource); +class LiveServerMediaSubsession: public OnDemandServerMediaSubsession +{ +public: + static LiveServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator); - StreamReplicator * m_replicator; - }; +protected: + LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator) + : OnDemandServerMediaSubsession(env, False), m_replicator(replicator), m_SDPLines() + {} + + virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate); + virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource); + virtual char const* sdpLines(); + //virtual char const* getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource); + + StreamReplicator * m_replicator; + std::string m_SDPLines; +}; } #endif \ No newline at end of file diff --git a/RtspFace/MaterialBuffer.h b/RtspFace/MaterialBuffer.h index 715b49e..fa2d12d 100644 --- a/RtspFace/MaterialBuffer.h +++ b/RtspFace/MaterialBuffer.h @@ -48,6 +48,17 @@ MBFT__LAST }; + enum MBFUsage + { + MBFU__FIRST, + + MBFU_ORIGIN_IMAGE, + MBFU_PROCESSED_IMAGE, + MBFU_INFORMATION, + + MBFU__LAST + }; + MBFType type; void* buffer; size_t buffSize; diff --git a/RtspFace/MediaHelper.cpp b/RtspFace/MediaHelper.cpp index 42f7467..5081788 100644 --- a/RtspFace/MediaHelper.cpp +++ b/RtspFace/MediaHelper.cpp @@ -3,6 +3,21 @@ #include <liveMedia/liveMedia.hh> #include <liveMedia/Base64.hh> +uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros) +{ + unsigned _resultSize = resultSize; + Boolean _trimTrailingZeros = trimTrailingZeros; + unsigned char* ret = base64Decode(in, inSize, _resultSize, _trimTrailingZeros); + resultSize = _resultSize; + return ret; +} + +char* base64_encode(char const* orig, size_t origLength) +{ + unsigned _origLength = origLength; + return base64Encode(orig, _origLength); +} + SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords) { // Make a copy of the input string, so we can replace the commas with '\0's: diff --git a/RtspFace/MediaHelper.h b/RtspFace/MediaHelper.h index 2c40bd4..f2f4d23 100644 --- a/RtspFace/MediaHelper.h +++ b/RtspFace/MediaHelper.h @@ -64,6 +64,20 @@ } }; +template<typename T> +struct ScopeLocker; + +template<> +struct ScopeLocker<pthread_mutex_t> +{ + pthread_mutex_t* mut; + ScopeLocker(pthread_mutex_t* _mut) : mut(_mut) { if (mut) pthread_mutex_lock(mut); } + ~ScopeLocker(){ if (mut) pthread_mutex_unlock(mut); } +}; + +uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros = true); +char* base64_encode(char const* orig, size_t origLength); + class SPropRecord; SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords); diff --git a/RtspFace/PL_AndroidMediaCodecDecoder.cpp b/RtspFace/PL_AndroidMediaCodecDecoder.cpp index 65f6b35..82d19a2 100644 --- a/RtspFace/PL_AndroidMediaCodecDecoder.cpp +++ b/RtspFace/PL_AndroidMediaCodecDecoder.cpp @@ -56,7 +56,7 @@ PL_AndroidMediaCodecDecoder_Config _config; config = _config; - codec = nullptr; + codec = nullptr;//#todo destory } }; diff --git a/RtspFace/PL_AndroidMediaCodecEncoder.cpp b/RtspFace/PL_AndroidMediaCodecEncoder.cpp index ca2b98b..481ce39 100644 --- a/RtspFace/PL_AndroidMediaCodecEncoder.cpp +++ b/RtspFace/PL_AndroidMediaCodecEncoder.cpp @@ -91,6 +91,7 @@ AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, config->ak_bit_rate); AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_FRAME_RATE, config->ak_frame_rate); AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_I_FRAME_INTERVAL, config->ak_i_frame_interval); + //AMediaFormat_setInt32(format, "profile", 0x00000100); // see: https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities.html#COLOR_FormatYUV420Flexible #define AMEDIA_COLOR_FormatYUV420Flexible 0x7f420888 @@ -231,7 +232,7 @@ pm.buffSize = 0; //static size_t f = 0; - //static FILE *pFile = fopen("/sdcard/aa.264", "wb"); + //static FILE *pFile = fopen("/data/aa.264", "wb"); //fwrite(in->buffer, sizeof(char), in->buffSize, pFile); //if (++f > 400){ // fclose(pFile); @@ -251,6 +252,24 @@ { auto format = AMediaCodec_getOutputFormat(in->codec); LOGP(INFO, "format changed to: %s", AMediaFormat_toString(format)); + + uint8_t* sps = nullptr; + size_t spsSize = 0; + uint8_t* pps = nullptr; + size_t ppsSize = 0; + + AMediaFormat_getBuffer(format, "csd-0", (void**)&sps, &spsSize); // sps + AMediaFormat_getBuffer(format, "csd-1", (void**)&pps, &ppsSize); // pps + + if (spsSize != 0) + { + std::string spsStr = base64_encode(((const char*)sps) + 4, spsSize - 4);//#todo aux + std::string ppsStr = base64_encode(((const char*)pps) + 4, ppsSize - 4); + + this->manager->set_param(PLGP_ENC_SPS_B64, spsStr); + this->manager->set_param(PLGP_ENC_PPS_B64, ppsStr); + } + AMediaFormat_delete(format); } else if (outputBuffIdx == AMEDIACODEC_INFO_TRY_AGAIN_LATER) diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp new file mode 100644 index 0000000..bda31d5 --- /dev/null +++ b/RtspFace/PL_RTSPServer2.cpp @@ -0,0 +1,305 @@ +#include "PL_RTSPServer.h" +#include "MaterialBuffer.h" +#include "logger.h" + +#include <liveMedia/liveMedia.hh> +#include <BasicUsageEnvironment/BasicUsageEnvironment.hh> + +#include "FFmpegRTSPServer/IEncoder.h" +#include "FFmpegRTSPServer/LiveRTSPServer.h" +#include "FFmpegRTSPServer/H264FramedSource.h" +#include "FFmpegRTSPServer/LiveServerMediaSubsession.h" +#include "PreAllocBufferQueue.h" +#include "MediaHelper.h" + +struct RTSPServer_Internal +{ + RTSPServerConfig config; + + pthread_t live_daemon_thid; + bool live_daemon_running; + + MESAI::LiveRTSPServer* server; + + PreAllocBufferQueue* frameQueue; + pthread_mutex_t* queue_mutex; + pthread_mutex_t* queue_empty_mutex; + + bool auxLineSet; + + RTSPServer_Internal() : + config(), + live_daemon_thid(0), live_daemon_running(false), + server(nullptr), + frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config + auxLineSet(false) + { + pthread_mutex_init(queue_mutex, NULL); + } + + ~RTSPServer_Internal() + { + reset(); + } + + void reset() + { + RTSPServerConfig _config; + config =_config; + + if (frameQueue != nullptr) + { + delete frameQueue; + frameQueue = nullptr; + } + + if (queue_mutex != nullptr) + { + pthread_mutex_destroy(queue_mutex); + delete queue_mutex; + queue_mutex = nullptr; + } + + queue_mutex = new pthread_mutex_t; + pthread_mutex_init(queue_mutex, NULL); + + if (queue_empty_mutex != nullptr) + { + pthread_mutex_destroy(queue_empty_mutex); + delete queue_empty_mutex; + queue_empty_mutex = nullptr; + } + + queue_empty_mutex = new pthread_mutex_t; + pthread_mutex_init(queue_empty_mutex, NULL); + + live_daemon_thid = 0; + live_daemon_running = false; + + server = nullptr; //#todo delete + + auxLineSet = false; + } +}; + +PipeLineElem* create_PL_RTSPServer() +{ + return new PL_RTSPServer; +} + +PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal) +{ +} + +PL_RTSPServer::~PL_RTSPServer() +{ + delete (RTSPServer_Internal*)internal; + internal = nullptr; +} + +struct DeliverFrameCallback +{ + RTSPServer_Internal* in; + PreAllocBufferQueue::Buffer* lastBuffer; + + DeliverFrameCallback(RTSPServer_Internal* _in) + : in(_in) , lastBuffer(nullptr) + { + } + + ~DeliverFrameCallback() + { + if (lastBuffer != nullptr) + { + in->frameQueue->Release(lastBuffer); + lastBuffer = nullptr; + } + } + + static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts) + { + DeliverFrameCallback* _this = (DeliverFrameCallback*)args; + + if (_this->in->frameQueue->Empty()) + { + int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); + if (ret != 0) + { + LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl; + } + } + + ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex); + + if (_this->lastBuffer != nullptr) + { + // this can not happen + _this->in->frameQueue->Release(_this->lastBuffer); + _this->lastBuffer = nullptr; + } + + _this->lastBuffer = _this->in->frameQueue->Dequeue(); + if (_this->lastBuffer == nullptr) + return false; + + buffer = _this->lastBuffer->buffer; + buffSize = _this->lastBuffer->buffSize; + + LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL; + //static size_t f = 0; + //static FILE *pFile = fopen("/data/bb.264", "wb"); + //fwrite(buffer, sizeof(char), buffSize, pFile); + //if (++f > 30){ + // fclose(pFile); + // exit(0); + //} + + gettimeofday(&pts, NULL); + return (_this->lastBuffer != nullptr); + } + + static void releaseFrame(void* args) + { + DeliverFrameCallback* _this = (DeliverFrameCallback*)args; + + if (_this->lastBuffer != nullptr) + { + ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex); + _this->in->frameQueue->Release(_this->lastBuffer); + _this->lastBuffer = nullptr; + } + } +}; + +static void* live_daemon_thd(void* arg) +{ + RTSPServer_Internal* in = (RTSPServer_Internal*)arg; + + in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080); + + in->server->init(); + + MESAI::H264FramedSource::FrameCallbacks cbs; + cbs.args = new DeliverFrameCallback(in);//#todo delete + cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame; + cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame; + in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs); + + in->live_daemon_running = true; + in->server->run(); // does not return + //#todo delete framedSource + in->live_daemon_running = false; +} + +bool PL_RTSPServer::init(void* args) +{ + RTSPServer_Internal* in = (RTSPServer_Internal*)internal; + + if (args) + { + RTSPServerConfig* config = (RTSPServerConfig*)args; + in->config = *config; + } + + PreAllocBufferQueue::Config qcfg; + qcfg.multithreadSafe = false; + qcfg.fullQueueDropFront = true; + qcfg.fullQueueSync = false; + qcfg.count = 32; + qcfg.maxBuffSize = 100000; + in->frameQueue = new PreAllocBufferQueue(qcfg); + + int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); + if(ret != 0) + { + LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl; + return false; + } + + return true; +} + +void PL_RTSPServer::finit() +{ + RTSPServer_Internal* in = (RTSPServer_Internal*)internal; + + pthread_join(in->live_daemon_thid, NULL); +} + +bool PL_RTSPServer::pay(const PipeMaterial& pm) +{ + RTSPServer_Internal* in = (RTSPServer_Internal*)internal; + + if (pm.buffer == nullptr) + return false; + + if (pm.type != PipeMaterial::PMT_FRAME) + { + LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl; + return false; + } + + if (!in->auxLineSet) + { + std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64)); + std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64)); + + if (!spsStr.empty() && !ppsStr.empty()) + { + MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource); + framedSource->spsBase64 = spsStr; + framedSource->ppsBase64 = ppsStr; + + in->auxLineSet = true; + } + } + + MB_Frame* frame = (MB_Frame*)pm.buffer; + if (frame->buffer == nullptr || frame->buffSize == 0) + return false; + + ScopeLocker<pthread_mutex_t>(in->queue_mutex); + //if (in->frameQueue->Full()) + // LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl; + + PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue(); + if (qbuff == nullptr) + { + LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl; + int ret = pthread_mutex_unlock(in->queue_empty_mutex); + if (ret != 0) + { + LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; + } + return false; + } + + memcpy(qbuff->buffer, frame->buffer, frame->buffSize); + qbuff->buffSize = frame->buffSize; + + //static size_t f = 0; + //static FILE *pFile = fopen("/data/aa.264", "wb"); + //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile); + //if (++f > 400){ + // fclose(pFile); + // exit(0); + //} + + int ret = pthread_mutex_unlock(in->queue_empty_mutex); + if (ret != 0) + { + LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; + } + return true; +} + +bool PL_RTSPServer::gain(PipeMaterial& pm) +{ + RTSPServer_Internal* in = (RTSPServer_Internal*)internal; + + pm.type = PipeMaterial::PMT_NONE; + pm.buffer = nullptr; + pm.buffSize = 0; + pm.former = this; + return true; +} diff --git a/RtspFace/PL_RTSPServer2.h b/RtspFace/PL_RTSPServer2.h new file mode 100644 index 0000000..89e2eca --- /dev/null +++ b/RtspFace/PL_RTSPServer2.h @@ -0,0 +1,36 @@ +#ifndef _PL_RTSPSERVER_H_ +#define _PL_RTSPSERVER_H_ + +#include "PipeLine.h" + +struct RTSPServerConfig +{ + bool syncDeliverFrame; + bool payWithAux; + bool sendWithAux; + + RTSPServerConfig() : + syncDeliverFrame(true), payWithAux(true), sendWithAux(false) + { + } +}; + +class PL_RTSPServer : public PipeLineElem +{ +public: + PL_RTSPServer(); + virtual ~PL_RTSPServer(); + + virtual bool init(void* args); + virtual void finit(); + + virtual bool pay(const PipeMaterial& pm); + virtual bool gain(PipeMaterial& pm); + +private: + void* internal; +}; + +PipeLineElem* create_PL_RTSPServer(); + +#endif diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h index 393b2a5..4bf3c43 100644 --- a/RtspFace/PipeLine.h +++ b/RtspFace/PipeLine.h @@ -11,6 +11,10 @@ #define PLGP_RTSP_WIDTH "RTSP_WIDTH" #define PLGP_RTSP_HEIGHT "RTSP_HEIGHT" #define PLGP_RTSP_FPS "RTSP_FPS" +#define PLGP_DEC_SPS_B64 "DEC_SPS_B64" +#define PLGP_DEC_PPS_B64 "DEC_PPS_B64" +#define PLGP_ENC_SPS_B64 "ENC_SPS_B64" +#define PLGP_ENC_PPS_B64 "ENC_PPS_B64" #define ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER @@ -53,8 +57,8 @@ *this = _temp; } - int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, - pm_breaker_func breaker, void* args = nullptr) const; + int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const; + int breake(int _selectMbfUsage, pm_breaker_func breaker, void* args = nullptr) const; //#todo assemble pm/mbf into this pm void assemble(); diff --git a/RtspFace/PreAllocBufferQueue.cpp b/RtspFace/PreAllocBufferQueue.cpp new file mode 100644 index 0000000..f06e9d5 --- /dev/null +++ b/RtspFace/PreAllocBufferQueue.cpp @@ -0,0 +1,82 @@ +#include "PreAllocBufferQueue.h" + +PreAllocBufferQueue::PreAllocBufferQueue(const PreAllocBufferQueue::Config& _cfg) + : cfg(_cfg), mtsMux(_cfg.multithreadSafe ? nullptr : nullptr) +{ +//#todo mtsMux + + for (size_t i = 0; i < cfg.count; i++) + { + Buffer* qbuff = new Buffer(); + qbuff->buffer = new uint8_t[cfg.maxBuffSize]; + + allBuffers.push_back(qbuff); + freeBuffers.push_back(qbuff); + } +} + +PreAllocBufferQueue::~PreAllocBufferQueue() +{ + if (!usedBuffers.empty()) + { + //#todo warning used + } + + freeBuffers.clear(); + while (!usedBuffers.empty()) usedBuffers.pop(); + + for (buffers_vec_t::iterator iter = allBuffers.begin(); iter != allBuffers.end(); ++iter) + { + Buffer* qbuff = *iter; + delete qbuff->buffer; + delete qbuff; + } + + allBuffers.clear(); +} + +PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Dequeue() +{ + if (usedBuffers.empty()) + return nullptr; + + Buffer* qbuff = usedBuffers.front(); + usedBuffers.pop(); + + return qbuff; +} + +void PreAllocBufferQueue::Release(PreAllocBufferQueue::Buffer* buffer) +{ + if (buffer == nullptr) + return; + + buffer->buffSize = 0; + freeBuffers.push_back(buffer); +} + +PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Enqueue() +{ + if (cfg.fullQueueDropFront && Full()) + Release(Dequeue()); + + if (freeBuffers.empty()) + return nullptr; + + Buffer* qbuff = freeBuffers.back(); + freeBuffers.pop_back(); + + usedBuffers.push(qbuff); + + return qbuff; +} + +bool PreAllocBufferQueue::Empty() const +{ + return usedBuffers.empty(); +} + +bool PreAllocBufferQueue::Full() const +{ + return freeBuffers.empty(); +} diff --git a/RtspFace/PreAllocBufferQueue.h b/RtspFace/PreAllocBufferQueue.h new file mode 100644 index 0000000..9a76649 --- /dev/null +++ b/RtspFace/PreAllocBufferQueue.h @@ -0,0 +1,56 @@ +#ifndef _PREALLOCBUFFERQUEUE_H_ +#define _PREALLOCBUFFERQUEUE_H_ + +#include <cstdint> +#include <vector> +#include <queue> + +class PreAllocBufferQueue +{ +public: + struct Config + { + bool multithreadSafe; + bool fullQueueDropFront; + bool fullQueueSync; + size_t count; + size_t maxBuffSize; + + Config() + : multithreadSafe(false), fullQueueDropFront(false), fullQueueSync(false), count(0), maxBuffSize(0) + { } + }; + + struct Buffer + { + uint8_t* buffer; + size_t buffSize; + + Buffer() : buffer(nullptr), buffSize(0) { } + }; + + PreAllocBufferQueue(const Config& _cfg); + + ~PreAllocBufferQueue(); + + Buffer* Dequeue(); + void Release(Buffer* buffer); + + Buffer* Enqueue(); + + bool Empty() const; + bool Full() const; + +private: + const Config cfg; + void* mtsMux; + + typedef std::vector<Buffer*> buffers_vec_t; + buffers_vec_t allBuffers; + buffers_vec_t freeBuffers; + + typedef std::queue<Buffer*> buffers_que_t; + buffers_que_t usedBuffers; +}; + +#endif -- Gitblit v1.8.0