houxiao
2017-08-09 d9ffa50c7e8d6b8c3157690aef8e2a70af1d1695
rtsp server (not ok)

git-svn-id: http://192.168.1.226/svn/proxy@992 454eff88-639b-444f-9e54-f578c98de674
4个文件已添加
11个文件已修改
642 ■■■■■ 已修改文件
RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/LiveRTSPServer.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/MaterialBuffer.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/MediaHelper.cpp 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/MediaHelper.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_AndroidMediaCodecDecoder.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_AndroidMediaCodecEncoder.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_RTSPServer2.cpp 305 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_RTSPServer2.h 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PipeLine.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PreAllocBufferQueue.cpp 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PreAllocBufferQueue.h 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/FFmpegH264Source.cpp
@@ -49,13 +49,18 @@
        static unsigned newFrameSize = 0;
        /* get the data frame from the Encoding thread.. */
        if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize)){
            if (newFrameDataStart!=NULL) {
        if (Encoding_Source->GetFrame(&newFrameDataStart, &newFrameSize) != 0)
        {
            if (newFrameDataStart != NULL && newFrameSize > 0)
            {
                /* This should never happen, but check anyway.. */
                if (newFrameSize > fMaxSize) {
                if (newFrameSize > fMaxSize)
                {
                    fFrameSize = fMaxSize;
                    fNumTruncatedBytes = newFrameSize - fMaxSize;
                } else {
                }
                else
                {
                    fFrameSize = newFrameSize;
                }
@@ -67,12 +72,14 @@
                
                Encoding_Source->ReleaseFrame();
            }
            else {
            else
            {
                fFrameSize=0;
                fTo=NULL;
                handleClosure(this);
            }
        }else
        }
        else
        {
            fFrameSize = 0;
        }
RtspFace/FFmpegRTSPServer/LiveRTSPServer.cpp
@@ -6,12 +6,14 @@
//  Copyright (c) 2015 Mina Saad. All rights reserved.
//
#include "../logger.h"
#include "LiveRTSPServer.h"
namespace MESAI
{
    LiveRTSPServer::LiveRTSPServer( IEncoder * a_Encoder, int port, int httpPort )
        : m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
        : env(nullptr), framedSource(nullptr),
        m_Encoder (a_Encoder), portNumber(port), httpTunnelingPort(httpPort)
    {
        quit = 0;
    }
@@ -21,15 +23,19 @@
    }
    void LiveRTSPServer::init()
    {
        TaskScheduler* scheduler = BasicTaskScheduler::createNew();
        env = BasicUsageEnvironment::createNew(*scheduler);
    }
    void LiveRTSPServer::run()
    {
        TaskScheduler    *scheduler;
        UsageEnvironment *env ;
        if (env == nullptr)
            init();
        char RTSP_Address[1024];
        RTSP_Address[0]=0x00;
        scheduler = BasicTaskScheduler::createNew();
        env = BasicUsageEnvironment::createNew(*scheduler);
        
        UserAuthenticationDatabase* authDB = NULL;
        
@@ -43,11 +49,10 @@
        
        if (rtspServer == NULL)
        {
            *env <<"LIVE555: Failed to create RTSP server: %s\n", env->getResultMsg();
            LOG_ERROR <<"LIVE555: Failed to create RTSP server: " << env->getResultMsg() << LOG_ENDL;
        }
        else {
        else
        {
            if(httpTunnelingPort)
            {
                rtspServer->setUpTunnelingOverHTTP(httpTunnelingPort);
@@ -55,15 +60,16 @@
            
            char const* descriptionString = "MESAI Streaming Session";
            
            FFmpegH264Source * source = FFmpegH264Source::createNew(*env,m_Encoder);
            StreamReplicator * inputDevice = StreamReplicator::createNew(*env, source, false);
            if (framedSource == nullptr)
                framedSource = FFmpegH264Source::createNew(*env,m_Encoder);
            StreamReplicator * inputDevice = StreamReplicator::createNew(*env, framedSource, false);
            
            ServerMediaSession* sms = ServerMediaSession::createNew(*env, RTSP_Address, RTSP_Address, descriptionString);
            sms->addSubsession(MESAI::LiveServerMediaSubsession::createNew(*env, inputDevice));
            rtspServer->addServerMediaSession(sms);
            
            char* url = rtspServer->rtspURL(sms);
            *env << "Play this stream using the URL \"" << url << "\"\n";
            LOG_INFO << "Play this stream using the URL " << url << LOG_ENDL;
            delete [] url;
            
            //signal(SIGNIT,sighandler);
@@ -74,6 +80,6 @@
        }
        
        env->reclaim();
        delete scheduler;
        //delete scheduler; // #todo
    }
}
RtspFace/FFmpegRTSPServer/LiveRTSPServer.h
@@ -22,17 +22,21 @@
    class LiveRTSPServer
    {
    public:
        LiveRTSPServer(IEncoder  * a_Encoder, int port, int httpPort );
        ~LiveRTSPServer();
        void init();
        void run();
    public:
        UsageEnvironment* env;
        FramedSource* framedSource;
    private:
        int portNumber;
        int httpTunnelingPort;
        IEncoder * m_Encoder;
        char quit;
    };
}
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.cpp
@@ -7,9 +7,11 @@
//
#include "LiveServerMediaSubsession.h"
#include "H264FramedSource.h"
namespace MESAI
{
    LiveServerMediaSubsession * LiveServerMediaSubsession::createNew(UsageEnvironment& env, StreamReplicator* replicator)
    { 
        return new LiveServerMediaSubsession(env,replicator);
@@ -18,6 +20,7 @@
    FramedSource* LiveServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
    {
        FramedSource* source = m_replicator->createStreamReplica();
    estBitrate = 5000;//#todo
        return H264VideoStreamDiscreteFramer::createNew(envir(), source);
    }
        
@@ -26,4 +29,25 @@
        return H264VideoRTPSink::createNew(envir(), rtpGroupsock,rtpPayloadTypeIfDynamic);
    }
char const* LiveServerMediaSubsession::sdpLines()
{
    if (m_SDPLines.empty())
    {
        m_SDPLines.assign(OnDemandServerMediaSubsession::sdpLines());
        H264FramedSource* framedSource = nullptr;
        {
            FramedSource* _framedSource = m_replicator->inputSource();
            framedSource = dynamic_cast<H264FramedSource*>(_framedSource);
        };
        if (framedSource != nullptr)
        {
            m_SDPLines.append(framedSource->getAuxLine());
        }
    }
    return m_SDPLines.c_str();
}
}
RtspFace/FFmpegRTSPServer/LiveServerMediaSubsession.h
@@ -16,6 +16,7 @@
#include <liveMedia/H264VideoStreamDiscreteFramer.hh>
#include <UsageEnvironment/UsageEnvironment.hh>
#include <groupsock/Groupsock.hh>
#include <string>
namespace MESAI 
{
@@ -27,12 +28,16 @@
    
    protected:
      LiveServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator)
          : OnDemandServerMediaSubsession(env, False), m_replicator(replicator) {};
            : OnDemandServerMediaSubsession(env, False), m_replicator(replicator), m_SDPLines()
    {}
      
      virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate);
      virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,  unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource);    
    virtual char const* sdpLines();
    //virtual char const* getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource);
      StreamReplicator * m_replicator;
    std::string m_SDPLines;
  };
}
RtspFace/MaterialBuffer.h
@@ -48,6 +48,17 @@
        MBFT__LAST
    };
    enum MBFUsage
    {
        MBFU__FIRST,
        MBFU_ORIGIN_IMAGE,
        MBFU_PROCESSED_IMAGE,
        MBFU_INFORMATION,
        MBFU__LAST
    };
    MBFType type;
    void* buffer;
    size_t buffSize;
RtspFace/MediaHelper.cpp
@@ -3,6 +3,21 @@
#include <liveMedia/liveMedia.hh>
#include <liveMedia/Base64.hh>
uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros)
{
    unsigned _resultSize = resultSize;
    Boolean _trimTrailingZeros = trimTrailingZeros;
    unsigned char* ret = base64Decode(in, inSize, _resultSize, _trimTrailingZeros);
    resultSize = _resultSize;
    return ret;
}
char* base64_encode(char const* orig, size_t origLength)
{
    unsigned _origLength = origLength;
    return base64Encode(orig, _origLength);
}
SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords)
{  
  // Make a copy of the input string, so we can replace the commas with '\0's:  
RtspFace/MediaHelper.h
@@ -64,6 +64,20 @@
    }
};
template<typename T>
struct ScopeLocker;
template<>
struct ScopeLocker<pthread_mutex_t>
{
    pthread_mutex_t* mut;
    ScopeLocker(pthread_mutex_t* _mut) : mut(_mut) { if (mut) pthread_mutex_lock(mut); }
    ~ScopeLocker(){ if (mut) pthread_mutex_unlock(mut); }
};
uint8_t* base64_decode(char const* in, size_t inSize, size_t& resultSize, bool trimTrailingZeros = true);
char* base64_encode(char const* orig, size_t origLength);
class SPropRecord;
SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, int& numSPropRecords);
RtspFace/PL_AndroidMediaCodecDecoder.cpp
@@ -56,7 +56,7 @@
        PL_AndroidMediaCodecDecoder_Config _config;
        config = _config;
        
        codec = nullptr;
        codec = nullptr;//#todo destory
    }
};
RtspFace/PL_AndroidMediaCodecEncoder.cpp
@@ -91,6 +91,7 @@
    AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, config->ak_bit_rate);
    AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_FRAME_RATE, config->ak_frame_rate);
    AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_I_FRAME_INTERVAL, config->ak_i_frame_interval);
    //AMediaFormat_setInt32(format, "profile", 0x00000100);
// see: https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities.html#COLOR_FormatYUV420Flexible
#define AMEDIA_COLOR_FormatYUV420Flexible 0x7f420888
@@ -231,7 +232,7 @@
            pm.buffSize = 0;
            //static size_t f = 0;
            //static FILE *pFile = fopen("/sdcard/aa.264", "wb");
            //static FILE *pFile = fopen("/data/aa.264", "wb");
            //fwrite(in->buffer, sizeof(char), in->buffSize, pFile);
            //if (++f > 400){
            //    fclose(pFile);
@@ -251,6 +252,24 @@
    {
        auto format = AMediaCodec_getOutputFormat(in->codec);
        LOGP(INFO, "format changed to: %s", AMediaFormat_toString(format));
        uint8_t* sps = nullptr;
        size_t spsSize = 0;
        uint8_t* pps = nullptr;
        size_t ppsSize = 0;
        AMediaFormat_getBuffer(format, "csd-0", (void**)&sps, &spsSize); // sps
        AMediaFormat_getBuffer(format, "csd-1", (void**)&pps, &ppsSize); // pps
        if (spsSize != 0)
        {
            std::string spsStr = base64_encode(((const char*)sps) + 4, spsSize - 4);//#todo aux
            std::string ppsStr = base64_encode(((const char*)pps) + 4, ppsSize - 4);
            this->manager->set_param(PLGP_ENC_SPS_B64, spsStr);
            this->manager->set_param(PLGP_ENC_PPS_B64, ppsStr);
        }
        AMediaFormat_delete(format);
    }
    else if (outputBuffIdx == AMEDIACODEC_INFO_TRY_AGAIN_LATER)
RtspFace/PL_RTSPServer2.cpp
New file
@@ -0,0 +1,305 @@
#include "PL_RTSPServer.h"
#include "MaterialBuffer.h"
#include "logger.h"
#include <liveMedia/liveMedia.hh>
#include <BasicUsageEnvironment/BasicUsageEnvironment.hh>
#include "FFmpegRTSPServer/IEncoder.h"
#include "FFmpegRTSPServer/LiveRTSPServer.h"
#include "FFmpegRTSPServer/H264FramedSource.h"
#include "FFmpegRTSPServer/LiveServerMediaSubsession.h"
#include "PreAllocBufferQueue.h"
#include "MediaHelper.h"
struct RTSPServer_Internal
{
    RTSPServerConfig config;
    pthread_t live_daemon_thid;
    bool live_daemon_running;
    MESAI::LiveRTSPServer* server;
    PreAllocBufferQueue* frameQueue;
    pthread_mutex_t* queue_mutex;
    pthread_mutex_t* queue_empty_mutex;
    bool auxLineSet;
    RTSPServer_Internal() :
        config(),
        live_daemon_thid(0), live_daemon_running(false),
        server(nullptr),
        frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
        auxLineSet(false)
    {
        pthread_mutex_init(queue_mutex, NULL);
    }
    ~RTSPServer_Internal()
    {
        reset();
    }
    void reset()
    {
        RTSPServerConfig _config;
        config =_config;
        if (frameQueue != nullptr)
        {
            delete frameQueue;
            frameQueue = nullptr;
        }
        if (queue_mutex != nullptr)
        {
            pthread_mutex_destroy(queue_mutex);
            delete queue_mutex;
            queue_mutex = nullptr;
        }
        queue_mutex = new pthread_mutex_t;
        pthread_mutex_init(queue_mutex, NULL);
        if (queue_empty_mutex != nullptr)
        {
            pthread_mutex_destroy(queue_empty_mutex);
            delete queue_empty_mutex;
            queue_empty_mutex = nullptr;
        }
        queue_empty_mutex = new pthread_mutex_t;
        pthread_mutex_init(queue_empty_mutex, NULL);
        live_daemon_thid = 0;
        live_daemon_running = false;
        server = nullptr; //#todo delete
        auxLineSet = false;
    }
};
// Factory entry point used by the pipeline element registry.
PipeLineElem* create_PL_RTSPServer()
{
    return new PL_RTSPServer();
}
// Allocates the opaque internal state block.
PL_RTSPServer::PL_RTSPServer()
    : internal(new RTSPServer_Internal)
{
}

// Frees the internal state (its destructor runs reset()).
PL_RTSPServer::~PL_RTSPServer()
{
    RTSPServer_Internal* in = static_cast<RTSPServer_Internal*>(internal);
    internal = nullptr;
    delete in;
}
struct DeliverFrameCallback
{
    RTSPServer_Internal* in;
    PreAllocBufferQueue::Buffer* lastBuffer;
    DeliverFrameCallback(RTSPServer_Internal* _in)
            : in(_in) , lastBuffer(nullptr)
    {
    }
    ~DeliverFrameCallback()
    {
        if (lastBuffer != nullptr)
        {
            in->frameQueue->Release(lastBuffer);
            lastBuffer = nullptr;
        }
    }
    static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts)
    {
        DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
        if (_this->in->frameQueue->Empty())
        {
            int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
            if (ret != 0)
            {
                LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
            }
        }
        ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
        if (_this->lastBuffer != nullptr)
        {
            // this can not happen
            _this->in->frameQueue->Release(_this->lastBuffer);
            _this->lastBuffer = nullptr;
        }
        _this->lastBuffer = _this->in->frameQueue->Dequeue();
        if (_this->lastBuffer == nullptr)
            return false;
        buffer = _this->lastBuffer->buffer;
        buffSize = _this->lastBuffer->buffSize;
        LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
        //static size_t f = 0;
        //static FILE *pFile = fopen("/data/bb.264", "wb");
        //fwrite(buffer, sizeof(char), buffSize, pFile);
        //if (++f > 30){
        //    fclose(pFile);
        //    exit(0);
        //}
        gettimeofday(&pts, NULL);
        return (_this->lastBuffer != nullptr);
    }
    static void releaseFrame(void* args)
    {
        DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
        if (_this->lastBuffer != nullptr)
        {
            ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
            _this->in->frameQueue->Release(_this->lastBuffer);
            _this->lastBuffer = nullptr;
        }
    }
};
static void* live_daemon_thd(void* arg)
{
    RTSPServer_Internal* in = (RTSPServer_Internal*)arg;
    in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
    in->server->init();
    MESAI::H264FramedSource::FrameCallbacks cbs;
    cbs.args = new DeliverFrameCallback(in);//#todo delete
    cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame;
    cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame;
    in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs);
    in->live_daemon_running = true;
    in->server->run(); // does not return
    //#todo delete framedSource
    in->live_daemon_running = false;
}
bool PL_RTSPServer::init(void* args)
{
    RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
    if (args)
    {
        RTSPServerConfig* config = (RTSPServerConfig*)args;
        in->config = *config;
    }
    PreAllocBufferQueue::Config qcfg;
    qcfg.multithreadSafe = false;
    qcfg.fullQueueDropFront = true;
    qcfg.fullQueueSync = false;
    qcfg.count = 32;
    qcfg.maxBuffSize = 100000;
    in->frameQueue = new PreAllocBufferQueue(qcfg);
    int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
    if(ret != 0)
    {
        LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
        return false;
    }
    return true;
}
void PL_RTSPServer::finit()
{
    RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
    pthread_join(in->live_daemon_thid, NULL);
}
bool PL_RTSPServer::pay(const PipeMaterial& pm)
{
    RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
    if (pm.buffer == nullptr)
        return false;
    if (pm.type != PipeMaterial::PMT_FRAME)
    {
        LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl;
        return false;
    }
    if (!in->auxLineSet)
    {
        std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64));
        std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64));
        if (!spsStr.empty() && !ppsStr.empty())
        {
            MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource);
            framedSource->spsBase64 = spsStr;
            framedSource->ppsBase64 = ppsStr;
            in->auxLineSet = true;
        }
    }
    MB_Frame* frame = (MB_Frame*)pm.buffer;
    if (frame->buffer == nullptr || frame->buffSize == 0)
        return false;
    ScopeLocker<pthread_mutex_t>(in->queue_mutex);
    //if (in->frameQueue->Full())
    //    LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl;
    PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
    if (qbuff == nullptr)
    {
        LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl;
        int ret = pthread_mutex_unlock(in->queue_empty_mutex);
        if (ret != 0)
        {
            LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
        }
        return false;
    }
    memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
    qbuff->buffSize = frame->buffSize;
    //static size_t f = 0;
    //static FILE *pFile = fopen("/data/aa.264", "wb");
    //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
    //if (++f > 400){
    //    fclose(pFile);
    //    exit(0);
    //}
    int ret = pthread_mutex_unlock(in->queue_empty_mutex);
    if (ret != 0)
    {
        LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
    }
    return true;
}
bool PL_RTSPServer::gain(PipeMaterial& pm)
{
    RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
    pm.type = PipeMaterial::PMT_NONE;
    pm.buffer = nullptr;
    pm.buffSize = 0;
    pm.former = this;
    return true;
}
RtspFace/PL_RTSPServer2.h
New file
@@ -0,0 +1,36 @@
#ifndef _PL_RTSPSERVER_H_
#define _PL_RTSPSERVER_H_
#include "PipeLine.h"
// Tunables for PL_RTSPServer. Defaults: synchronous frame delivery, aux
// (sps/pps) line supplied via pay(), not injected on send.
struct RTSPServerConfig
{
    bool syncDeliverFrame;
    bool payWithAux;
    bool sendWithAux;

    RTSPServerConfig()
        : syncDeliverFrame(true),
          payWithAux(true),
          sendWithAux(false)
    {
    }
};
// Pipeline element that publishes H.264 frames over RTSP, backed by
// MESAI::LiveRTSPServer running on its own daemon thread.
class PL_RTSPServer : public PipeLineElem
{
public:
    PL_RTSPServer();
    virtual ~PL_RTSPServer();
    // args: optional RTSPServerConfig* to copy; starts the daemon thread.
    virtual bool init(void* args);
    // Joins the daemon thread.
    virtual void finit();
    // Accepts PMT_FRAME materials and queues them for streaming.
    virtual bool pay(const PipeMaterial& pm);
    // Sink element: always produces an empty material.
    virtual bool gain(PipeMaterial& pm);
private:
    void* internal; // RTSPServer_Internal*, opaque to keep live555 out of this header
};
PipeLineElem* create_PL_RTSPServer();
#endif
RtspFace/PipeLine.h
@@ -11,6 +11,10 @@
#define PLGP_RTSP_WIDTH "RTSP_WIDTH"
#define PLGP_RTSP_HEIGHT "RTSP_HEIGHT"
#define PLGP_RTSP_FPS "RTSP_FPS"
#define PLGP_DEC_SPS_B64 "DEC_SPS_B64"
#define PLGP_DEC_PPS_B64 "DEC_PPS_B64"
#define PLGP_ENC_SPS_B64 "ENC_SPS_B64"
#define PLGP_ENC_PPS_B64 "ENC_PPS_B64"
#define ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER
@@ -53,8 +57,8 @@
        *this = _temp;
    }
    int breake(PipeMaterialBufferType selectPmType, int _selectMbfType,
        pm_breaker_func breaker, void* args = nullptr) const;
    int breake(PipeMaterialBufferType selectPmType, int _selectMbfType, pm_breaker_func breaker, void* args = nullptr) const;
    int breake(int _selectMbfUsage, pm_breaker_func breaker, void* args = nullptr) const;
    //#todo assemble pm/mbf into this pm
    void assemble();
RtspFace/PreAllocBufferQueue.cpp
New file
@@ -0,0 +1,82 @@
#include "PreAllocBufferQueue.h"
// Pre-allocates cfg.count buffers of cfg.maxBuffSize bytes each;
// all buffers start in the free pool.
PreAllocBufferQueue::PreAllocBufferQueue(const PreAllocBufferQueue::Config& _cfg)
    : cfg(_cfg), mtsMux(nullptr) //#todo mtsMux (cfg.multithreadSafe not implemented yet)
{
    for (size_t i = 0; i < cfg.count; ++i)
    {
        Buffer* entry = new Buffer();
        entry->buffer = new uint8_t[cfg.maxBuffSize];
        allBuffers.push_back(entry);
        freeBuffers.push_back(entry);
    }
}
// Frees every pre-allocated buffer. Buffers still lent out (usedBuffers)
// are reclaimed as well — callers must not hold them past this point.
PreAllocBufferQueue::~PreAllocBufferQueue()
{
    if (!usedBuffers.empty())
    {
        //#todo warning used
    }
    freeBuffers.clear();
    while (!usedBuffers.empty()) usedBuffers.pop();
    for (buffers_vec_t::iterator iter = allBuffers.begin(); iter != allBuffers.end(); ++iter)
    {
        Buffer* qbuff = *iter;
        // BUGFIX: buffer was allocated with new uint8_t[...], so it must be
        // released with delete[] — scalar delete on an array is UB.
        delete[] qbuff->buffer;
        delete qbuff;
    }
    allBuffers.clear();
}
// Pops the oldest filled buffer (FIFO), or nullptr when none is queued.
// The caller must hand the buffer back via Release() when done.
PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Dequeue()
{
    if (usedBuffers.empty())
        return nullptr;
    Buffer* front = usedBuffers.front();
    usedBuffers.pop();
    return front;
}
void PreAllocBufferQueue::Release(PreAllocBufferQueue::Buffer* buffer)
{
    if (buffer == nullptr)
        return;
    buffer->buffSize = 0;
    freeBuffers.push_back(buffer);
}
// Claims a free buffer for writing and appends it to the used queue.
// When the queue is full and fullQueueDropFront is set, the oldest queued
// buffer is dropped to make room. Returns nullptr if no slot is available.
PreAllocBufferQueue::Buffer* PreAllocBufferQueue::Enqueue()
{
    if (cfg.fullQueueDropFront && Full())
        Release(Dequeue());

    if (freeBuffers.empty())
        return nullptr;

    Buffer* slot = freeBuffers.back();
    freeBuffers.pop_back();
    usedBuffers.push(slot);
    return slot;
}
// True when no filled buffer is waiting to be dequeued.
bool PreAllocBufferQueue::Empty() const
{
    return usedBuffers.empty();
}
// True when every pre-allocated buffer is currently in use.
bool PreAllocBufferQueue::Full() const
{
    return freeBuffers.empty();
}
RtspFace/PreAllocBufferQueue.h
New file
@@ -0,0 +1,56 @@
#ifndef _PREALLOCBUFFERQUEUE_H_
#define _PREALLOCBUFFERQUEUE_H_
#include <cstdint>
#include <vector>
#include <queue>
// Fixed-capacity FIFO of pre-allocated byte buffers: producers claim a slot
// with Enqueue(), consumers take it with Dequeue() and hand it back with
// Release(). No allocation happens after construction.
// NOTE(review): multithreadSafe/fullQueueSync are stored but not implemented
// in the .cpp (mtsMux is a //#todo) — callers must lock externally.
class PreAllocBufferQueue
{
public:
    struct Config
    {
        bool multithreadSafe;    // not implemented yet (see mtsMux #todo)
        bool fullQueueDropFront; // drop oldest queued buffer when full
        bool fullQueueSync;      // not implemented yet
        size_t count;            // number of buffers to pre-allocate
        size_t maxBuffSize;      // capacity of each buffer in bytes
        Config()
            : multithreadSafe(false), fullQueueDropFront(false), fullQueueSync(false), count(0), maxBuffSize(0)
        { }
    };
    struct Buffer
    {
        uint8_t* buffer;  // pre-allocated storage, owned by the queue
        size_t buffSize;  // bytes currently filled (<= Config::maxBuffSize)
        Buffer() : buffer(nullptr), buffSize(0) { }
    };
    PreAllocBufferQueue(const Config& _cfg);
    ~PreAllocBufferQueue();
    Buffer* Dequeue();
    void Release(Buffer* buffer);
    Buffer* Enqueue();
    bool Empty() const;
    bool Full() const;
private:
    const Config cfg;
    void* mtsMux; // placeholder for a future mutex (multithreadSafe #todo)
    typedef std::vector<Buffer*> buffers_vec_t;
    buffers_vec_t allBuffers;  // every allocated buffer, for cleanup
    buffers_vec_t freeBuffers; // buffers available to Enqueue()
    typedef std::queue<Buffer*> buffers_que_t;
    buffers_que_t usedBuffers; // FIFO of filled buffers awaiting Dequeue()
};
#endif