houxiao
2016-12-28 4ef430e946e717d72e923c4708a9120f94d55dbd
test h264 encoder

git-svn-id: http://192.168.1.226/svn/proxy@36 454eff88-639b-444f-9e54-f578c98de674
2个文件已添加
10个文件已修改
807 ■■■■ 已修改文件
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp 114 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_AVFrameYUV420.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_H264Encoder.cpp 62 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_Queue.cpp 360 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_Queue.h 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_RTSPServer.cpp 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PL_RTSPServer.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PipeLine.cpp 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/PipeLine.h 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/main.cpp 71 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/make.sh 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
@@ -16,6 +16,13 @@
        pthread_mutex_init(&outqueue_mutex,NULL);
    }
    FFmpegH264Encoder::~FFmpegH264Encoder()
    {
        pthread_mutex_init(&inqueue_mutex,NULL);
        pthread_mutex_init(&outqueue_mutex,NULL);
    }
    void FFmpegH264Encoder::setCallbackFunctionFrameIsReady(std::function<void()> func)
    {
@@ -45,13 +52,13 @@
                pthread_mutex_unlock(&inqueue_mutex);
                if(frame != NULL)
                {
                    WriteFrame(frame);
                    WriteFrameRGB(frame);
                }
            }
        }
    }
    void FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
    bool FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
    {
        int ret;
        m_sws_flags = SWS_BICUBIC;
@@ -60,14 +67,17 @@
        avcodec_register_all();
        av_register_all();
        
        avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
        if (strlen(filename) == 0)
            avformat_alloc_output_context2(&m_oc, NULL, "h264", filename);
        else
            avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
        
        if (!m_oc) {
            avformat_alloc_output_context2(&m_oc, NULL, "avi", filename);
        }
        if (!m_oc) {
            return;
            return false;
        }
        m_fmt = m_oc->oformat;
@@ -79,13 +89,13 @@
        m_video_codec = avcodec_find_encoder(m_fmt->video_codec);
        if (!(m_video_codec)) {
                return;
                return false;
        }
        st = avformat_new_stream(m_oc, m_video_codec);
        if (!st) {
                return;
                return false;
        }
        st->id = m_oc->nb_streams-1;
@@ -112,7 +122,7 @@
        ret = avcodec_open2(c, m_video_codec, NULL);
        if (ret < 0) {
            return;
            return false;
        }
        //ret = avpicture_alloc(&m_dst_picture, c->pix_fmt, c->width, c->height);
@@ -126,7 +136,7 @@
        ret = av_image_alloc(m_dst_picture->data, m_dst_picture->linesize, c->width, c->height, (AVPixelFormat)m_dst_picture->format, 32);
        if (ret < 0) {
            return;
            return false;
        }
        //ret = avpicture_alloc(&m_src_picture, AV_PIX_FMT_BGR24, c->width, c->height);
@@ -135,7 +145,7 @@
        ret = av_image_alloc(m_src_picture->data, m_src_picture->linesize, c->width, c->height, AV_PIX_FMT_BGR24, 24);
        if (ret < 0) {
            return;
            return false;
        }
        bufferSize = ret;
@@ -145,27 +155,28 @@
        if (!(m_fmt->flags & AVFMT_NOFILE)) {
            ret = avio_open(&m_oc->pb, filename, AVIO_FLAG_WRITE);
            if (ret < 0) {
                return;
                return false;
            }
        }
        
        ret = avformat_write_header(m_oc, NULL);
        
        if (ret < 0) {
            return;
            return false;
        }
        sws_ctx = sws_getContext(c->width, c->height, AV_PIX_FMT_BGR24,
                                 c->width, c->height, AV_PIX_FMT_YUV420P,
                                 SWS_BICUBIC, NULL, NULL, NULL);
        if (!sws_ctx) {
            return;
            return false;
        }
        return true;
    }
    void FFmpegH264Encoder::WriteFrame(uint8_t * RGBFrame )
    {
    bool FFmpegH264Encoder::WriteFrameRGB(uint8_t * RGBFrame )
    {
        memcpy(m_src_picture->data[0], RGBFrame, bufferSize);
        sws_scale(sws_ctx,
@@ -182,7 +193,7 @@
        ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
        
        if (ret < 0) {
            return;
            return false;
        }
        if (!ret && got_packet && pkt.size) 
@@ -215,10 +226,75 @@
        m_frame_count++;
        m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
        
        onFrame();
        if (onFrame != nullptr)
            onFrame();
        return true;
    }
    void copyAVFrame(AVFrame* dest, AVFrame* src)
    {
        int height = dest->height;
        int width = dest->width;
        memcpy(dest->data[0], src->data[0], height * width); // Y
        memcpy(dest->data[1], src->data[1], height * width / 4); // U
        memcpy(dest->data[2], src->data[2], height * width / 4); // V
    }
    bool FFmpegH264Encoder::WriteFrameYUV420(AVFrame * YUVFrame)
    {
        copyAVFrame(m_dst_picture, YUVFrame);
        AVPacket pkt = { 0 };
        int got_packet;
        av_init_packet(&pkt);
        int ret = 0;
        ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
        if (ret < 0) {
            return false;
        }
        if (!ret && got_packet && pkt.size)
        {
            pkt.stream_index = m_video_st->index;
            FrameStructure * frame = new FrameStructure();
            frame->dataPointer = new uint8_t[pkt.size];
            frame->dataSize = pkt.size-4;
            frame->frameID = m_frame_count;
            memcpy(frame->dataPointer,pkt.data+4,pkt.size-4);
            pthread_mutex_lock(&outqueue_mutex);
            if(outqueue.size()<30)
            {
                outqueue.push(frame);
            }
            else
            {
                delete frame;
            }
            pthread_mutex_unlock(&outqueue_mutex);
        }
        av_free_packet(&pkt);
        m_frame_count++;
        m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
        if (onFrame != nullptr)
            onFrame();
        return true;
    }
    void FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
    bool FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
    {
        m_filename = filename;
        m_AVIMOV_WIDTH=Width;    //Movie width
@@ -227,7 +303,7 @@
        m_AVIMOV_GOB=GOB;        //I frames per no of P frames, see note below!
        m_AVIMOV_BPS=BitPerSecond; //Bits per second, if this is too low then movie will become garbled
        
        SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);
        return SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);
    }
    void FFmpegH264Encoder::CloseCodec()
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
@@ -49,18 +49,19 @@
    {
    public:
        FFmpegH264Encoder();
        ~FFmpegH264Encoder();
        virtual ~FFmpegH264Encoder();
        
        virtual void setCallbackFunctionFrameIsReady(std::function<void()> func);
        
        void SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
        bool SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
        void CloseVideo();
        void SetupCodec(const char *filename, int codec_id);
        bool SetupCodec(const char *filename, int codec_id);
        void CloseCodec();
        
        void SendNewFrame(uint8_t * RGBFrame);        
        void WriteFrame(uint8_t * RGBFrame);
        bool WriteFrameRGB(uint8_t * RGBFrame);
        bool WriteFrameYUV420(AVFrame * YUVFrame);
        virtual char ReleaseFrame();
        void run();    
RtspFace/PL_AVFrameYUV420.cpp
@@ -104,6 +104,7 @@
    //in->buffer readly
    //#test
    //static size_t f=0;
    //char fname[50];
    //sprintf(fname, "%u.yuv420", ++f);
RtspFace/PL_H264Encoder.cpp
@@ -19,11 +19,13 @@
    AVCodecContext* pAVCodecContext;
    AVFrame* pAVFrame;//#todo delete
    AVStream* pAVStream;
    AVFormatContext* pAVFormatContext;
    
    H264Encoder_Internal() : 
        buffSize(0), buffSizeMax(sizeof(buffer)), 
        payError(true), ffmpegInited(false), frameCount(0), 
        pAVCodecContext(nullptr), pAVFrame(nullptr)
        pAVCodecContext(nullptr), pAVFrame(nullptr), pAVStream(nullptr), pAVFormatContext(nullptr)
        
    {
    }
@@ -41,6 +43,8 @@
        
        pAVCodecContext = nullptr;
        pAVFrame = nullptr;
        pAVStream = nullptr;
        pAVFormatContext = nullptr;
    }
};
@@ -88,7 +92,7 @@
    in->pAVCodecContext = avcodec_alloc_context3(avCodec);
    in->pAVCodecContext->bit_rate = 3*1024*1024*8; // 3MB
    in->pAVCodecContext->bit_rate = 1*1024*1024*8; // 3MB
    in->pAVCodecContext->width = 1920;
    in->pAVCodecContext->height = 1080;//#todo from config
    in->pAVCodecContext->time_base.num=1;
@@ -96,6 +100,9 @@
    in->pAVCodecContext->gop_size = 20;
    in->pAVCodecContext->max_b_frames = 0;
    in->pAVCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;
    //av_opt_set(c->priv_data, "preset", "superfast", 0);
    //av_opt_set(c->priv_data, "tune", "zerolatency", 0);
    if(avcodec_open2(in->pAVCodecContext, avCodec, NULL) >= 0)
    {
@@ -119,17 +126,36 @@
        return false;
    }
    
    //int ret = avformat_alloc_output_context2(&(in->pAVFormatContext), NULL, "avi", "");
    //if (ret < 0 || in->pAVFormatContext == nullptr)
    //{
    //    printf("avformat_alloc_output_context2 error\n");
    //    return false;
    //}
    //
    //in->pAVStream = avformat_new_stream(in->pAVFormatContext, avCodec);
    //if (in->pAVStream == nullptr)
    //{
    //    printf("avformat_new_stream error\n");
    //    return false;
    //}
    //in->pAVStream->id = in->pAVFormatContext->nb_streams-1;
    return true;
}
void copyAVFrame(AVFrame* dest, AVFrame* src)
{
    int height = dest->height;
    int width = dest->width;
    dest->data[0] = src->data[0];
    dest->data[1] = src->data[1];
    dest->data[2] = src->data[2];
    
    memcpy(dest->data[0], src->data[0], height * width); // Y
    memcpy(dest->data[1], src->data[1], height * width / 4); // U
    memcpy(dest->data[2], src->data[2], height * width / 4); // V
    //int height = dest->height;
    //int width = dest->width;
    //
    //memcpy(dest->data[0], src->data[0], height * width); // Y
    //memcpy(dest->data[1], src->data[1], height * width / 4); // U
    //memcpy(dest->data[2], src->data[2], height * width / 4); // V
}
bool encodeH264(H264Encoder_Internal* in, AVFrame* pAVFrame, size_t buffSize)  
@@ -138,13 +164,16 @@
    in->frameCount++;
    copyAVFrame(in->pAVFrame, pAVFrame);
    in->pAVFrame->pts = in->frameCount;
    //in->pAVFrame->pts = (1.0 / 25) * 90000 * in->frameCount;
    in->pAVFrame->pts = time(nullptr);
    AVPacket pAVPacket = {0};
    av_init_packet(&pAVPacket);
    // encode the image
    int gotPacket = 0;
    int ret = avcodec_encode_video2(in->pAVCodecContext, &pAVPacket, in->pAVFrame, &gotPacket);  
    if (ret < 0)
    {
@@ -154,9 +183,9 @@
    
    if (gotPacket > 0)
    {
        printf("Succeed to encode (1) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
        memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
        in->buffSize += pAVPacket.size;
        printf("Succeed to encode (1) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
        memcpy(in->buffer, pAVPacket.data, pAVPacket.size);
        in->buffSize = pAVPacket.size;
        av_free_packet(&pAVPacket);
    }
    
@@ -172,12 +201,13 @@
    //    }  
    //    if (gotPacket > 0)
    //    {  
    //        printf("Succeed to encode (2) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
    //        printf("Succeed to encode (2) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
    //        memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
    //        in->buffSize += pAVPacket.size; 
    //        av_free_packet(&pAVPacket);  
    //    }  
    //}
    //}
    
    //#test
    if (in->buffSize > 0)
@@ -186,7 +216,7 @@
        fwrite (in->buffer , sizeof(char), in->buffSize, pFile);
        fflush(pFile);
    }
    in->payError = (in->buffSize == 0);
    return !(in->payError);
}
RtspFace/PL_Queue.cpp
New file
@@ -0,0 +1,360 @@
#include "PL_Queue.h"
#include <set>
#include <queue>
#include <string.h>
// Descriptor for one fixed-capacity slot of byte storage.
// QBlock never allocates or frees `data`; it only records the pointer.
struct QBlock
{
    uint8_t* data;   // start of the slot's bytes (not owned)
    size_t maxSize;  // capacity given at construction
    size_t size;     // bytes currently in use; zero after construction

    // Empty descriptor: no storage, zero capacity.
    QBlock()
    {
        data = nullptr;
        maxSize = 0;
        size = 0;
    }

    // Descriptor over `_data` with room for `_maxSize` bytes, initially unused.
    QBlock(uint8_t* _data, size_t _maxSize)
    {
        data = _data;
        maxSize = _maxSize;
        size = 0;
    }

    // Only clears the fields; the storage behind `data` is left alone.
    ~QBlock()
    {
        size = 0;
        maxSize = 0;
        data = nullptr;
    }
};
struct QBlockPool
{
    uint8_t* blocksDataPool;
    typedef std::set<QBlock*> qpool_t;
    qpool_t staticPool;
    qpool_t freePool;
    QBlockPool() : blocksDataPool(nullptr)
    {
    }
    QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
        blocksDataPool(nullptr)
    {
        bp_init(maxBlockCount, maxBlockSize);
    }
    void bp_init(size_t maxBlockCount, size_t maxBlockSize)
    {
        const size_t totalBytes = maxBlockCount * maxBlockSize;
        blocksDataPool = new uint8_t[totalBytes];
        printf("QBlockPool allocate byte: %u\n", totalBytes);
        for (size_t i = 0; i < maxBlockCount; i++)
        {
            uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
            QBlock* qb = new QBlock(qbData, maxBlockSize);
            staticPool.insert(qb);
            freePool.insert(qb);
        }
    }
    ~QBlockPool()
    {
        if (freePool.size() != staticPool.size())
        {
            printf("QBlockPool memory leakage\n");
        }
        for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
        {
            delete *iter;
        }
        delete[] blocksDataPool;
        blocksDataPool = nullptr;
    }
    QBlock* bp_alloc()
    {
        if (freePool.empty())
        {
            printf("QBlockPool bp_alloc empty\n");
            return nullptr;
        }
        qpool_t::iterator iter = freePool.begin();
        QBlock* qb = *iter;
        freePool.erase(iter);
        return qb;
    }
    void bp_free(QBlock* qb)
    {
        if (qb == nullptr)
            return;
        qpool_t::iterator iter = staticPool.find(qb);
        if (iter == staticPool.end())
        {
            printf("QBlockPool bp_free not in pool\n");
            return;
        }
        //iter = freePool.find(qb);
        //if (iter != freePool.end())
        //{
        //    printf("QBlockPool bp_free block is free");
        //    return;
        //}
        qb->size = 0;
        freePool.insert(qb);
    }
    bool bp_empty() const
    {
        return freePool.empty();
    }
};
struct PL_Queue_Internal
{
    PL_Queue_Config config;
    QBlockPool pool;
    typedef std::queue<QBlock*> queue_t;
    queue_t que;
    pthread_mutex_t* queue_pool_mutex;
    pthread_mutex_t* sync_full_mutex;
    pthread_mutex_t* sync_empty_mutex;
    PL_Queue_Internal() :
        config(), pool(), que(),
        queue_pool_mutex(new pthread_mutex_t),
        sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
    {
        pthread_mutex_init(queue_pool_mutex, NULL);
        pthread_mutex_init(sync_full_mutex, NULL);
        pthread_mutex_init(sync_empty_mutex, NULL);
    }
    ~PL_Queue_Internal()
    {
        if (queue_pool_mutex != nullptr)
        {
            pthread_mutex_destroy(queue_pool_mutex);
            delete queue_pool_mutex;
            queue_pool_mutex = nullptr;
        }
        if (sync_full_mutex != nullptr)
        {
            pthread_mutex_destroy(sync_full_mutex);
            delete sync_full_mutex;
            sync_full_mutex = nullptr;
        }
        if (sync_empty_mutex != nullptr)
        {
            pthread_mutex_destroy(sync_empty_mutex);
            delete sync_empty_mutex;
            sync_empty_mutex = nullptr;
        }
    }
    void reset()
    {
        if (queue_pool_mutex != nullptr)
        {
            pthread_mutex_destroy(queue_pool_mutex);
            delete queue_pool_mutex;
            queue_pool_mutex = nullptr;
        }
        queue_pool_mutex = new pthread_mutex_t;
        pthread_mutex_init(queue_pool_mutex, NULL);
        if (sync_full_mutex != nullptr)
        {
            pthread_mutex_destroy(sync_full_mutex);
            delete sync_full_mutex;
            sync_full_mutex = nullptr;
        }
        sync_full_mutex = new pthread_mutex_t;
        pthread_mutex_init(sync_full_mutex, NULL);
        if (sync_empty_mutex != nullptr)
        {
            pthread_mutex_destroy(sync_empty_mutex);
            delete sync_empty_mutex;
            sync_empty_mutex = nullptr;
        }
        sync_empty_mutex = new pthread_mutex_t;
        pthread_mutex_init(sync_empty_mutex, NULL);
        //#todo free que
    }
};
// Factory function: returns a newly heap-allocated PL_Queue as a PipeLineElem.
PipeLineElem* create_PL_Queue()
{
    PL_Queue* queueElem = new PL_Queue;
    return queueElem;
}
// Creates the private PL_Queue_Internal state and stores it in the opaque `internal` pointer.
PL_Queue::PL_Queue()
    : internal(nullptr)
{
    internal = new PL_Queue_Internal;
}
// Deletes the private state created by the constructor and clears the pointer.
PL_Queue::~PL_Queue()
{
    PL_Queue_Internal* state = (PL_Queue_Internal*)internal;
    delete state;
    internal = nullptr;
}
bool PL_Queue::init(void* args)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
    PL_Queue_Config* config = (PL_Queue_Config*)args;
    in->reset();
    in->config = *config;
    in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
    if (config->syncQueueFull)
    {
        int ret = pthread_mutex_lock(in->sync_full_mutex);
        if(ret != 0)
        {
            printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
            return false;
        }
    }
    if (config->syncQueueEmpty)
    {
        int ret = pthread_mutex_lock(in->sync_empty_mutex);
        if(ret != 0)
        {
            printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
            return false;
        }
    }
    return true;
}
void PL_Queue::finit()
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete
}
bool PL_Queue::pay(const PipeMaterial& pm)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
    QBlock* qb = nullptr;
    if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
    {
        // there is no available qb
        if (in->config.queueFullDropBlock)
        {
            printf("PL_Queue::pay queueFullDropBlock\n");
            pthread_mutex_lock(in->queue_pool_mutex);
            qb = in->que.front();
            in->que.pop();
            in->pool.bp_free(qb);
            qb = nullptr;
            pthread_mutex_unlock(in->queue_pool_mutex);
        }
        else
        {
            if (in->config.syncQueueFull)
            {
                printf("PL_Queue::pay sync by sync_full_mutex\n");
                pthread_mutex_lock(in->sync_full_mutex);
            }
        }
    }
    if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
    {
        printf("PL_Queue::pay full\n");
        return false;
    }
    pthread_mutex_lock(in->queue_pool_mutex);
    qb = in->pool.bp_alloc();
    if (qb != nullptr)
        in->que.push(qb);
    pthread_mutex_unlock(in->queue_pool_mutex);
    if (qb == nullptr)
    {
        printf("PL_Queue::pay bp_alloc nullptr\n");
        return false;
    }
    if (in->config.copyData)
    {
        memcpy(qb->data, pm.buffer, pm.buffSize);
        qb->size = pm.buffSize;
    }
    if (in->config.syncQueueEmpty)
        pthread_mutex_unlock(in->sync_empty_mutex);
    return true;
}
void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
{
    if (pm->former == nullptr || pm->args == nullptr)
        return;
    PL_Queue* _this = (PL_Queue*)pm->former;
    QBlock* qb = (QBlock*)pm->args;
    PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
    pthread_mutex_lock(in->queue_pool_mutex);
    in->pool.bp_free(qb);
    pthread_mutex_unlock(in->queue_pool_mutex);
}
bool PL_Queue::gain(PipeMaterial& pm)
{
    PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
    if (in->que.empty())
    {
        if (in->config.syncQueueEmpty)
            pthread_mutex_lock(in->sync_empty_mutex);
    }
    if (in->que.empty())
    {
        printf("PL_Queue::gain empty\n");
        pm.buffer = nullptr;
        pm.buffSize = 0;
        pm.former = this;
        return false;
    }
    QBlock* qb = nullptr;
    pthread_mutex_lock(in->queue_pool_mutex);
    qb = in->que.front();
    in->que.pop();
    pthread_mutex_unlock(in->queue_pool_mutex);
    if (in->config.syncQueueFull)
        pthread_mutex_unlock(in->sync_full_mutex);
    pm.type = PMT_BYTES;
    pm.buffer = qb->data;
    pm.buffSize = qb->size;
    pm.former = this;
    pm.deleter = pm_deleter_qb;
    pm.args = qb;
    return true;
}
RtspFace/PL_Queue.h
New file
@@ -0,0 +1,44 @@
#ifndef _PL_QUEUE_H_
#define _PL_QUEUE_H_
#include "PipeLine.h"
// Settings passed to PL_Queue::init.
struct PL_Queue_Config
{
    size_t maxBlockCount;    // number of blocks in the pool / queue capacity
    size_t maxBlockSize;     // byte capacity of each block
    bool cacheEmptyBlock;//#todo not implement
    bool syncQueueFull;      // lock sync_full_mutex when the queue is full
    bool syncQueueEmpty;     // lock sync_empty_mutex when the queue is empty
    bool queueFullDropBlock; // discard the oldest block when the queue is full
    bool copyData; //#todo not implement (copy ptr)

    // Defaults: 100 blocks of 300000 bytes, both sync modes on, copying on,
    // no dropping, no empty-block caching.
    PL_Queue_Config()
    {
        maxBlockCount = 100;
        maxBlockSize = 300000;
        cacheEmptyBlock = false;
        syncQueueFull = true;
        syncQueueEmpty = true;
        queueFullDropBlock = false;
        copyData = true;
    }
};
// Pipeline element that buffers materials between a producer (pay) and a
// consumer (gain). All state lives behind the opaque `internal` pointer.
class PL_Queue : public PipeLineElem
{
public:
    PL_Queue();
    virtual ~PL_Queue();

    // args is interpreted as a PL_Queue_Config*.
    virtual bool init(void* args);
    virtual void finit();

    // Stores one material in the queue; returns false when it could not be queued.
    virtual bool pay(const PipeMaterial& pm);
    // Retrieves the oldest queued material; returns false when none is available.
    virtual bool gain(PipeMaterial& pm);

private:
    // Deleter attached to materials handed out by gain().
    static void pm_deleter_qb(PipeMaterial* pm);

    // Opaque pointer to the implementation state (a PL_Queue_Internal).
    void* internal;
};
PipeLineElem* create_PL_Queue();
#endif
RtspFace/PL_RTSPServer.cpp
@@ -12,8 +12,11 @@
struct RTSPServer_Internal
{
    uint8_t* buffer;
    uint8_t buffer[1920*1080*3];
    size_t buffSize;
    size_t buffSizeMax;
    RTSPServerConfig config;
    bool payError;
    pthread_t live_daemon_thid;
@@ -24,7 +27,7 @@
    MyEncoderStub * encoderStub;
    RTSPServer_Internal() : 
        buffer(nullptr), buffSize(0),
        buffSize(0), buffSizeMax(sizeof(buffer)), config(),
        payError(true), live_daemon_thid(0), frame_mutex(new pthread_mutex_t), live_daemon_running(false), 
        server(nullptr), encoderStub(nullptr)
    {
@@ -43,8 +46,10 @@
    
    void reset()
    {
        buffer = nullptr;
        buffSize = 0;
        RTSPServerConfig _config;
        config =_config;
        payError = true;
@@ -84,32 +89,43 @@
    
    virtual char GetFrame(u_int8_t** FrameBuffer, unsigned int *FrameSize)
    {
        if (in.buffer != nullptr && in.buffSize > 0)
        {
            *FrameBuffer = in.buffer;
            *FrameSize = in.buffSize;
            printf("send frame size=%u\n", in.buffSize);
            in.buffer = nullptr;
            in.buffSize = 0;
            return 1;
        }
        else
        if (in.buffer == nullptr || in.buffSize <= 0)
        {
            ReleaseFrame();
            return 0;
        }
        uint8_t* pBuffer = in.buffer;
        size_t newBufferSize = in.buffSize;
        if (in.config.payWithAux)
        {
            if (newBufferSize <= 4)
            {
                ReleaseFrame();
                return 0;
            }
            pBuffer += 4;
            newBufferSize -= 4;
        }
        *FrameBuffer = pBuffer;
        *FrameSize = newBufferSize;
        printf("send frame size=%u\n", in.buffSize);
    }
    
    virtual char ReleaseFrame()
    {
        int ret = pthread_mutex_unlock(in.frame_mutex);
        if(ret != 0)
        in.buffSize = 0;
        if (in.config.syncDeliverFrame)
        {
            printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
            return 0;
            int ret = pthread_mutex_unlock(in.frame_mutex);
            if(ret != 0)
            {
                printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
                return 0;
            }
        }
        
        return 1;
@@ -120,11 +136,14 @@
        // write frame buffer of RTSPServer_Internal::buffer
        onFrame();
        int ret = pthread_mutex_lock(in.frame_mutex);
        if(ret != 0)
        if (in.config.syncDeliverFrame)
        {
            printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
            return;
            int ret = pthread_mutex_lock(in.frame_mutex);
            if(ret != 0)
            {
                printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
                return;
            }
        }
    }
@@ -167,6 +186,12 @@
    RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
    in->reset();
    if (args)
    {
        RTSPServerConfig* config = (RTSPServerConfig*)args;
        in->config = *config;
    }
    int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
    if(ret != 0)
    {
@@ -191,7 +216,10 @@
    if (pm.buffer == nullptr || pm.buffSize <= 0)
        return false;
    
    in->buffer = pm.buffer;
    if (in->buffSize > 0)
        printf("PL_RTSPServer::pay may lost data size=%u\n", in->buffSize);
    memcpy(in->buffer, pm.buffer, pm.buffSize);
    in->buffSize = pm.buffSize;
    
    if (in->encoderStub == nullptr)
RtspFace/PL_RTSPServer.h
@@ -3,6 +3,18 @@
#include "PipeLine.h"
// Settings passed to PL_RTSPServer::init.
struct RTSPServerConfig
{
    bool syncDeliverFrame;
    bool payWithAux;
    bool sendWithAux;

    // Defaults: synchronous delivery on, payWithAux on, sendWithAux off.
    RTSPServerConfig()
    {
        syncDeliverFrame = true;
        payWithAux = true;
        sendWithAux = false;
    }
};
class PL_RTSPServer : public PipeLineElem
{
public:
RtspFace/PipeLine.cpp
@@ -1,7 +1,18 @@
#include "PipeLine.h"
PipeMaterial::PipeMaterial() : buffer(nullptr), buffSize(0), former(nullptr)
PipeMaterial::PipeMaterial() :
    type(PMT__FIRST), buffer(nullptr), buffSize(0),
    former(nullptr), deleter(nullptr), args(nullptr)
{
}
void PipeMaterial::exec_deleter()
{
    if (deleter != nullptr)
    {
        deleter(this);
        deleter = nullptr;
    }
}
PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems()
@@ -81,12 +92,16 @@
    if (elems.size() == 1)
    {
        elem_begin->gain(*pm);
        pm->exec_deleter();
        return elem_begin;
    }
    else if (elems.size() == 2)
    {
        if (elem_begin->gain(*pm))
        {
            elem_last->pay(*pm);
            pm->exec_deleter();
        }
        else
            return elem_begin;
        return elem_last;
@@ -103,16 +118,22 @@
        while (elem_begin != elem_last)
        {
            if (lastRet && (lastRet = elem_begin->pay(*pm)) )
            {
                pm->exec_deleter();
                lastRet = elem_begin->gain(*pm);
            }
            else
                return elem_begin;
                return elem_begin;//#todo this may memory leakage in pm
            
            ++iter;
            elem_begin = *iter;
        }
    
        if (lastRet)
        {
            elem_last->pay(*pm);
            pm->exec_deleter();
        }
        return elem_last;
    }
    
RtspFace/PipeLine.h
@@ -12,13 +12,32 @@
class PipeLineElem;
class PipeLine;
// Tag describing what a PipeMaterial's buffer holds.
// PMT__FIRST and PMT__LAST bracket the list of kinds.
enum PipeMaterialBufferType
{
    PMT__FIRST = 0,
    PMT_BYTES = 1,
    PMT_TEXT = 2,
    PMT_IMAGE = 3,
    PMT_PM_LIST = 4,
    PMT_PTR_AVFRAME = 5,
    PMT__LAST = 6
};
struct PipeMaterial;
typedef void (* pm_deleter_func)(PipeMaterial* pm);
// Unit of data passed between pipeline elements.
struct PipeMaterial
{
    PipeMaterialBufferType type; // kind of content referenced by buffer
    uint8_t* buffer;             // start of the data
    size_t buffSize;             // size associated with buffer
    PipeLineElem* former;        // element that produced this material
    pm_deleter_func deleter;     // optional cleanup callback, run by exec_deleter()
    void* args;                  // opaque pointer available to the deleter
    
    PipeMaterial();
    
    // Calls deleter(this) when a deleter is set, then clears it.
    void exec_deleter();
};
class PipeLineElem
@@ -41,9 +60,10 @@
typedef PipeLineElem* (*elem_create_func_t)();
// 0 (there is no elem). do nothing
// 1 (there is one elem). gain
// 2 (there is two elems). gain --> pay
// 3 (there is more than two elems). gain --> pay gain --> pay gain --> ... --> pay
// 1 (there is one elem). gain --> pm.deleter
// 2 (there is two elems). gain --> pay --> pm.deleter
// 3 (there is more than two elems).
//    gain --> [pay --> pm.deleter --> gain -->] [pay --> pm.deleter --> gain -->] ... --> pay --> pm.deleter
class PipeLine
{
public:
RtspFace/main.cpp
@@ -5,6 +5,7 @@
#include "PL_H264Encoder.h"
#include "PL_AVFrameYUV420.h"
#include "PL_AVFrameBGRA.h"
#include "PL_Queue.h"
#include <iostream>
using namespace std;
@@ -18,33 +19,61 @@
    pipeLine.register_elem_creator("PL_H264Decoder", create_PL_H264Decoder);
    pipeLine.register_elem_creator("PL_AVFrameYUV420", create_PL_AVFrameYUV420);
    pipeLine.register_elem_creator("PL_H264Encoder", create_PL_H264Encoder);
    pipeLine.register_elem_creator("PL_Queue", create_PL_Queue);
    
    PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
    RTSPConfig rtspConfig;
    rtspConfig.progName = argv[0];
    rtspConfig.rtspURL = argv[1];
    rtspConfig.aux = false; // ffmpeg need aux
    rtspConfig.verbosityLevel = 1;
    rtspConfig.tunnelOverHTTPPortNum = 0;
    rtspConfig.args = nullptr;
    bool ret = rtspClient->init(&rtspConfig);
    if (!ret)
    {
        cout << "rtspClient.init error" << endl;
        exit(EXIT_FAILURE);
        PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
        RTSPConfig rtspConfig;
        rtspConfig.progName = argv[0];
        rtspConfig.rtspURL = argv[1];
        rtspConfig.aux = true; // ffmpeg need aux, but live555 not
        rtspConfig.verbosityLevel = 1;
        rtspConfig.tunnelOverHTTPPortNum = 0;
        rtspConfig.args = nullptr;
        bool ret = rtspClient->init(&rtspConfig);
        if (!ret)
        {
            cout << "rtspClient.init error" << endl;
            exit(EXIT_FAILURE);
        }
    }
    
    //PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
    //h264Decoder->init(nullptr);
    //{
    //    PL_Queue_Config config;
    //    PL_Queue* queue1 = (PL_Queue*)pipeLine.push_elem("PL_Queue");
    //    bool ret = queue1->init(&config);
    //    if (!ret)
    //    {
    //        cout << "queue1.init error" << endl;
    //        exit(EXIT_FAILURE);
    //    }
    //}
    
    //PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
    //avFrameYUV420->init(nullptr);
    {
        PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
        h264Decoder->init(nullptr);
    }
    //{
    //    PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
    //    avFrameYUV420->init(nullptr);
    //}
    {
        PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
        h264Encoder->init(nullptr);
    }
    
    //PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
    //h264Encoder->init(nullptr);
    PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
    rtspServer->init(nullptr);
    //{
    //    RTSPServerConfig config;
    //    PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
    //    bool ret = rtspServer->init(&config);
    //    if (!ret)
    //    {
    //        cout << "rtspServer.init error" << endl;
    //        exit(EXIT_FAILURE);
    //    }
    //}
    
    while(true)
    {
RtspFace/make.sh
@@ -36,15 +36,16 @@
g++ -g -c -std=c++11 PL_H264Encoder.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PL_AVFrameYUV420.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PL_AVFrameBGRA.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PL_Queue.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PipeLine.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/FFmpegH264Source.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS
g++ -g -std=c++11 \
  main.o PipeLine.o \
  PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o \
  PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o \
  $FFMPEGRTSPSERVER_OBJ PL_RTSPServer.o \
  $LDFLAGS -o rtsp_face