RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
@@ -49,13 +49,18 @@ static unsigned newFrameSize = 0; /* get the data frame from the Encoding thread.. */ if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize)){ if (newFrameDataStart!=NULL) { if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize) != 0) { if (newFrameDataStart != NULL && newFrameSize > 0) { /* This should never happen, but check anyway.. */ if (newFrameSize > fMaxSize) { if (newFrameSize > fMaxSize) { fFrameSize = fMaxSize; fNumTruncatedBytes = newFrameSize - fMaxSize; } else { } else { fFrameSize = newFrameSize; } @@ -67,12 +72,14 @@ Encoding_Source->ReleaseFrame(); } else { else { fFrameSize=0; fTo=NULL; handleClosure(this); } }else } else { fFrameSize = 0; } RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
@@ -6,12 +6,14 @@ // Copyright (c) 2015 Mina Saad. All rights reserved. // #include "../logger.h" #include "LiveRTSPServer.h" namespace MESAI { LiveRTSPServer::LiveRTSPServer( IEncoder * a_Encoder, int port, int httpPort ) : m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort) : env(nullptr), framedSource(nullptr), m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort) { quit = 0; } @@ -21,15 +23,19 @@ } void LiveRTSPServer::init() { TaskScheduler* scheduler = BasicTaskScheduler::createNew(); env = BasicUsageEnvironment::createNew(*scheduler); } void LiveRTSPServer::run() { TaskScheduler *scheduler; UsageEnvironment *env ; if (env == nullptr) init(); char RTSP_Address[1024]; RTSP_Address[0]=0x00; scheduler = BasicTaskScheduler::createNew(); env = BasicUsageEnvironment::createNew(*scheduler); UserAuthenticationDatabase* authDB = NULL; @@ -43,11 +49,10 @@ if (rtspServer == NULL) { *env <<"LIVE555: Failed to create RTSP server: %s\n", env->getResultMsg(); LOG_ERROR <<"LIVE555: Failed to create RTSP server: " << env->getResultMsg() << LOG_ENDL; } else { else { if(httpTunnelingPort) { rtspServer->setUpTunnelingOverHTTP(httpTunnelingPort); @@ -55,15 +60,16 @@ char const* descriptionString = "MESAI Streaming Session"; FFmpegH264Source * source = FFmpegH264Source::createNew(*env,m_Encoder); StreamReplicator * inputDevice = StreamReplicator::createNew(*env, source, false); if (framedSource == nullptr) framedSource = FFmpegH264Source::createNew(*env,m_Encoder); StreamReplicator * inputDevice = StreamReplicator::createNew(*env, framedSource, false); ServerMediaSession* sms = ServerMediaSession::createNew(*env, RTSP_Address, RTSP_Address, descriptionString); sms->addSubsession(MESAI::LiveServerMediaSubsession::createNew(*env, inputDevice)); rtspServer->addServerMediaSession(sms); char* url = rtspServer->rtspURL(sms); *env << "Play this stream using the URL \"" << url << "\"\n"; LOG_INFO << "Play this stream using the URL " << url << LOG_ENDL; 
delete [] url; //signal(SIGNIT,sighandler); @@ -74,6 +80,6 @@ } env->reclaim(); delete scheduler; //delete scheduler; // #todo } } RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
@@ -22,17 +22,21 @@ class LiveRTSPServer { public: LiveRTSPServer(IEncoder * a_Encoder, int port, int httpPort ); ~LiveRTSPServer(); void init(); void run(); public: UsageEnvironment* env; FramedSource* framedSource; private: int portNumber; int httpTunnelingPort; IEncoder * m_Encoder; char quit; }; } RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
@@ -7,9 +7,11 @@ // #include "LiveServerMediaSubsession.h" #include "H264FramedSource.h" namespace MESAI { LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator) { return new LiveServerMediaSubsession(env,replicator); @@ -18,6 +20,7 @@ FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) { FramedSource* source = m_replicator->createStreamReplica(); estBitrate = 5000;//#todo return H264VideoStreamDiscreteFramer::createNew(envir(), source); } @@ -26,4 +29,25 @@ return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic); } char const* LiveServerMediaSubsession::sdpLines() { if (m_SDPLines.empty()) { m_SDPLines.assign(OnDemandServerMediaSubsession::sdpLines()); H264FramedSource* framedSource = nullptr; { FramedSource* _framedSource = m_replicator->inputSource(); framedSource = dynamic_cast<H264FramedSource*>(_framedSource); }; if (framedSource != nullptr) { m_SDPLines.append(framedSource->getAuxLine()); } } return m_SDPLines.c_str(); } } RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
@@ -16,6 +16,7 @@ #include <liveMedia/H264VideoStreamDiscreteFramer.hh> #include <UsageEnvironment/UsageEnvironment.hh> #include <groupsock/Groupsock.hh> #include <string> namespace MESAI { @@ -27,12 +28,16 @@ protected: LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator) : OnDemandServerMediaSubsession(env, False), m_replicator(replicator) {}; : OnDemandServerMediaSubsession(env, False), m_replicator(replicator), m_SDPLines() {} virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate); virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource); virtual char const* sdpLines(); //virtual char const* getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource); StreamReplicator * m_replicator; std::string m_SDPLines; }; } RtspFace/MaterialBuffer.h
@@ -48,6 +48,17 @@ MBFT__LAST }; enum MBFUsage { MBFU__FIRST, MBFU_ORIGIN_IMAGE, MBFU_PROCESSED_IMAGE, MBFU_INFORMATION, MBFU__LAST }; MBFType type; void* buffer; size_t buffSize; RtspFace/MediaHelper.cpp
@@ -3,6 +3,21 @@ #include <liveMedia/liveMedia.hh> #include <liveMedia/Base64.hh> uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros) { unsigned _resultSize = resultSize; Boolean _trimTrailingZeros = trimTrailingZeros; unsigned char* ret = base64Decode(in, inSize, _resultSize, _trimTrailingZeros); resultSize = _resultSize; return ret; } char* base64_encode(char const* orig, size_t origLength) { unsigned _origLength = origLength; return base64Encode(orig, _origLength); } SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords) { // Make a copy of the input string, so we can replace the commas with '\0's: RtspFace/MediaHelper.h
@@ -64,6 +64,20 @@ } }; template<typename T> struct ScopeLocker; template<> struct ScopeLocker<pthread_mutex_t> { pthread_mutex_t* mut; ScopeLocker(pthread_mutex_t* _mut) : mut(_mut) { if (mut) pthread_mutex_lock(mut); } ~ScopeLocker(){ if (mut) pthread_mutex_unlock(mut); } }; uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros = true); char* base64_encode(char const* orig, size_t origLength); class SPropRecord; SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords); RtspFace/PL_AndroidMediaCodecDecoder.cpp
@@ -56,7 +56,7 @@ PL_AndroidMediaCodecDecoder_Config _config; config = _config; codec = nullptr; codec = nullptr;//#todo destory } }; RtspFace/PL_AndroidMediaCodecEncoder.cpp
@@ -91,6 +91,7 @@ AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, config->ak_bit_rate); AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_FRAME_RATE, config->ak_frame_rate); AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_I_FRAME_INTERVAL, config->ak_i_frame_interval); //AMediaFormat_setInt32(format, "profile", 0x00000100); // see: https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities.html#COLOR_FormatYUV420Flexible #define AMEDIA_COLOR_FormatYUV420Flexible 0x7f420888 @@ -231,7 +232,7 @@ pm.buffSize = 0; //static size_t f = 0; //static FILE *pFile = fopen("/sdcard/aa.264", "wb"); //static FILE *pFile = fopen("/data/aa.264", "wb"); //fwrite(in->buffer, sizeof(char), in->buffSize, pFile); //if (++f > 400){ // fclose(pFile); @@ -251,6 +252,24 @@ { auto format = AMediaCodec_getOutputFormat(in->codec); LOGP(INFO, "format changed to: %s", AMediaFormat_toString(format)); uint8_t* sps = nullptr; size_t spsSize = 0; uint8_t* pps = nullptr; size_t ppsSize = 0; AMediaFormat_getBuffer(format, "csd-0", (void**)&sps, &spsSize); // sps AMediaFormat_getBuffer(format, "csd-1", (void**)&pps, &ppsSize); // pps if (spsSize != 0) { std::string spsStr = base64_encode(((const char*)sps) + 4, spsSize - 4);//#todo aux std::string ppsStr = base64_encode(((const char*)pps) + 4, ppsSize - 4); this->manager->set_param(PLGP_ENC_SPS_B64, spsStr); this->manager->set_param(PLGP_ENC_PPS_B64, ppsStr); } AMediaFormat_delete(format); } else if (outputBuffIdx == AMEDIACODEC_INFO_TRY_AGAIN_LATER) RtspFace/PL_RTSPServer2.cpp
New file @@ -0,0 +1,305 @@ #include "PL_RTSPServer.h" #include "MaterialBuffer.h" #include "logger.h" #include <liveMedia/liveMedia.hh> #include <BasicUsageEnvironment/BasicUsageEnvironment.hh> #include "FFmpegRTSPServer/IEncoder.h" #include "FFmpegRTSPServer/LiveRTSPServer.h" #include "FFmpegRTSPServer/H264FramedSource.h" #include "FFmpegRTSPServer/LiveServerMediaSubsession.h" #include "PreAllocBufferQueue.h" #include "MediaHelper.h" struct RTSPServer_Internal { RTSPServerConfig config; pthread_t live_daemon_thid; bool live_daemon_running; MESAI::LiveRTSPServer* server; PreAllocBufferQueue* frameQueue; pthread_mutex_t* queue_mutex; pthread_mutex_t* queue_empty_mutex; bool auxLineSet; RTSPServer_Internal() : config(), live_daemon_thid(0), live_daemon_running(false), server(nullptr), frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config auxLineSet(false) { pthread_mutex_init(queue_mutex, NULL); } ~RTSPServer_Internal() { reset(); } void reset() { RTSPServerConfig _config; config =_config; if (frameQueue != nullptr) { delete frameQueue; frameQueue = nullptr; } if (queue_mutex != nullptr) { pthread_mutex_destroy(queue_mutex); delete queue_mutex; queue_mutex = nullptr; } queue_mutex = new pthread_mutex_t; pthread_mutex_init(queue_mutex, NULL); if (queue_empty_mutex != nullptr) { pthread_mutex_destroy(queue_empty_mutex); delete queue_empty_mutex; queue_empty_mutex = nullptr; } queue_empty_mutex = new pthread_mutex_t; pthread_mutex_init(queue_empty_mutex, NULL); live_daemon_thid = 0; live_daemon_running = false; server = nullptr; //#todo delete auxLineSet = false; } }; PipeLineElem* create_PL_RTSPServer() { return new PL_RTSPServer; } PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal) { } PL_RTSPServer::~PL_RTSPServer() { delete (RTSPServer_Internal*)internal; internal = nullptr; } struct DeliverFrameCallback { RTSPServer_Internal* in; PreAllocBufferQueue::Buffer* lastBuffer; 
DeliverFrameCallback(RTSPServer_Internal* _in) : in(_in) , lastBuffer(nullptr) { } ~DeliverFrameCallback() { if (lastBuffer != nullptr) { in->frameQueue->Release(lastBuffer); lastBuffer = nullptr; } } static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts) { DeliverFrameCallback* _this = (DeliverFrameCallback*)args; if (_this->in->frameQueue->Empty()) { int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl; } } ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex); if (_this->lastBuffer != nullptr) { // this can not happen _this->in->frameQueue->Release(_this->lastBuffer); _this->lastBuffer = nullptr; } _this->lastBuffer = _this->in->frameQueue->Dequeue(); if (_this->lastBuffer == nullptr) return false; buffer = _this->lastBuffer->buffer; buffSize = _this->lastBuffer->buffSize; LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL; //static size_t f = 0; //static FILE *pFile = fopen("/data/bb.264", "wb"); //fwrite(buffer, sizeof(char), buffSize, pFile); //if (++f > 30){ // fclose(pFile); // exit(0); //} gettimeofday(&pts, NULL); return (_this->lastBuffer != nullptr); } static void releaseFrame(void* args) { DeliverFrameCallback* _this = (DeliverFrameCallback*)args; if (_this->lastBuffer != nullptr) { ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex); _this->in->frameQueue->Release(_this->lastBuffer); _this->lastBuffer = nullptr; } } }; static void* live_daemon_thd(void* arg) { RTSPServer_Internal* in = (RTSPServer_Internal*)arg; in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080); in->server->init(); MESAI::H264FramedSource::FrameCallbacks cbs; cbs.args = new DeliverFrameCallback(in);//#todo delete cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame; cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame; in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs); 
in->live_daemon_running = true; in->server->run(); // does not return //#todo delete framedSource in->live_daemon_running = false; } bool PL_RTSPServer::init(void* args) { RTSPServer_Internal* in = (RTSPServer_Internal*)internal; if (args) { RTSPServerConfig* config = (RTSPServerConfig*)args; in->config = *config; } PreAllocBufferQueue::Config qcfg; qcfg.multithreadSafe = false; qcfg.fullQueueDropFront = true; qcfg.fullQueueSync = false; qcfg.count = 32; qcfg.maxBuffSize = 100000; in->frameQueue = new PreAllocBufferQueue(qcfg); int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); if(ret != 0) { LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl; return false; } return true; } void PL_RTSPServer::finit() { RTSPServer_Internal* in = (RTSPServer_Internal*)internal; pthread_join(in->live_daemon_thid, NULL); } bool PL_RTSPServer::pay(const PipeMaterial& pm) { RTSPServer_Internal* in = (RTSPServer_Internal*)internal; if (pm.buffer == nullptr) return false; if (pm.type != PipeMaterial::PMT_FRAME) { LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl; return false; } if (!in->auxLineSet) { std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64)); std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64)); if (!spsStr.empty() && !ppsStr.empty()) { MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource); framedSource->spsBase64 = spsStr; framedSource->ppsBase64 = ppsStr; in->auxLineSet = true; } } MB_Frame* frame = (MB_Frame*)pm.buffer; if (frame->buffer == nullptr || frame->buffSize == 0) return false; ScopeLocker<pthread_mutex_t>(in->queue_mutex); //if (in->frameQueue->Full()) // LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl; PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue(); if (qbuff == nullptr) { LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl; int ret = 
pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; } return false; } memcpy(qbuff->buffer, frame->buffer, frame->buffSize); qbuff->buffSize = frame->buffSize; //static size_t f = 0; //static FILE *pFile = fopen("/data/aa.264", "wb"); //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile); //if (++f > 400){ // fclose(pFile); // exit(0); //} int ret = pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; } return true; } bool PL_RTSPServer::gain(PipeMaterial& pm) { RTSPServer_Internal* in = (RTSPServer_Internal*)internal; pm.type = PipeMaterial::PMT_NONE; pm.buffer = nullptr; pm.buffSize = 0; pm.former = this; return true; } RtspFace/PL_RTSPServer2.h
New file @@ -0,0 +1,36 @@ #ifndef _PL_RTSPSERVER_H_ #define _PL_RTSPSERVER_H_ #include "PipeLine.h" struct RTSPServerConfig { bool syncDeliverFrame; bool payWithAux; bool sendWithAux; RTSPServerConfig() : syncDeliverFrame(true), payWithAux(true), sendWithAux(false) { } }; class PL_RTSPServer : public PipeLineElem { public: PL_RTSPServer(); virtual ~PL_RTSPServer(); virtual bool init(void* args); virtual void finit(); virtual bool pay(const PipeMaterial& pm); virtual bool gain(PipeMaterial& pm); private: void* internal; }; PipeLineElem* create_PL_RTSPServer(); #endif RtspFace/PipeLine.h
@@ -11,6 +11,10 @@ #define PLGP_RTSP_WIDTH "RTSP_WIDTH" #define PLGP_RTSP_HEIGHT "RTSP_HEIGHT" #define PLGP_RTSP_FPS "RTSP_FPS" #define PLGP_DEC_SPS_B64 "DEC_SPS_B64" #define PLGP_DEC_PPS_B64 "DEC_PPS_B64" #define PLGP_ENC_SPS_B64 "ENC_SPS_B64" #define PLGP_ENC_PPS_B64 "ENC_PPS_B64" #define ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER @@ -53,8 +57,8 @@ *this = _temp; } int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const; int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const; int breake(int _selectMbfUsage, pm_breaker_func breaker, void* args = nullptr) const; //#todo assemble pm/mbf into this pm void assemble(); RtspFace/PreAllocBufferQueue.cpp
New file @@ -0,0 +1,82 @@ #include "PreAllocBufferQueue.h" PreAllocBufferQueue::PreAllocBufferQueue(const PreAllocBufferQueue::Config& _cfg) : cfg(_cfg), mtsMux(_cfg.multithreadSafe ? nullptr : nullptr) { //#todo mtsMux for (size_t i = 0; i < cfg.count; i++) { Buffer* qbuff = new Buffer(); qbuff->buffer = new uint8_t[cfg.maxBuffSize]; allBuffers.push_back(qbuff); freeBuffers.push_back(qbuff); } } PreAllocBufferQueue::~PreAllocBufferQueue() { if (!usedBuffers.empty()) { //#todo warning used } freeBuffers.clear(); while (!usedBuffers.empty()) usedBuffers.pop(); for (buffers_vec_t::iterator iter = allBuffers.begin(); iter != allBuffers.end(); ++iter) { Buffer* qbuff = *iter; delete qbuff->buffer; delete qbuff; } allBuffers.clear(); } PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Dequeue() { if (usedBuffers.empty()) return nullptr; Buffer* qbuff = usedBuffers.front(); usedBuffers.pop(); return qbuff; } void PreAllocBufferQueue::Release(PreAllocBufferQueue::Buffer* buffer) { if (buffer == nullptr) return; buffer->buffSize = 0; freeBuffers.push_back(buffer); } PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Enqueue() { if (cfg.fullQueueDropFront && Full()) Release(Dequeue()); if (freeBuffers.empty()) return nullptr; Buffer* qbuff = freeBuffers.back(); freeBuffers.pop_back(); usedBuffers.push(qbuff); return qbuff; } bool PreAllocBufferQueue::Empty() const { return usedBuffers.empty(); } bool PreAllocBufferQueue::Full() const { return freeBuffers.empty(); } RtspFace/PreAllocBufferQueue.h
// New file @@ -0,0 +1,56 @@
#ifndef _PREALLOCBUFFERQUEUE_H_
#define _PREALLOCBUFFERQUEUE_H_

#include <cstddef>
#include <cstdint>
#include <vector>
#include <queue>

// FIFO queue over a fixed pool of pre-allocated byte buffers.
// Producer: Enqueue() returns a slot to fill. Consumer: Dequeue() returns the
// oldest filled slot, which is given back to the pool with Release().
class PreAllocBufferQueue
{
public:
	struct Config
	{
		// Requests internal locking; not implemented yet (see #todo mtsMux).
		bool multithreadSafe;
		// When no free slot is left, Enqueue() recycles the oldest queued slot.
		bool fullQueueDropFront;
		// NOTE(review): not read by the visible implementation - confirm intent.
		bool fullQueueSync;
		// Number of buffers in the pool.
		size_t count;
		// Capacity of each buffer in bytes.
		size_t maxBuffSize;

		Config() :
			multithreadSafe(false), fullQueueDropFront(false), fullQueueSync(false),
			count(0), maxBuffSize(0)
		{ }
	};

	struct Buffer
	{
		// Storage owned by the queue.
		uint8_t* buffer;
		// Number of valid bytes in buffer; reset to 0 by Release().
		size_t buffSize;

		Buffer() : buffer(nullptr), buffSize(0) { }
	};

	PreAllocBufferQueue(const Config& _cfg);
	~PreAllocBufferQueue();

	// The queue owns raw buffers; a copy would free them a second time.
	PreAllocBufferQueue(const PreAllocBufferQueue&) = delete;
	PreAllocBufferQueue& operator=(const PreAllocBufferQueue&) = delete;

	// Oldest queued buffer, or nullptr when nothing is queued.
	Buffer* Dequeue();
	// Returns a buffer to the free pool; nullptr is ignored.
	void Release(Buffer* buffer);
	// Free buffer appended to the queue, or nullptr when none is available.
	Buffer* Enqueue();

	// True when no buffer is queued.
	bool Empty() const;
	// True when no free buffer is left.
	bool Full() const;

private:
	const Config cfg;
	void* mtsMux;

	typedef std::vector<Buffer*> buffers_vec_t;
	buffers_vec_t allBuffers;
	buffers_vec_t freeBuffers;

	typedef std::queue<Buffer*> buffers_que_t;
	buffers_que_t usedBuffers;
};

#endif