From 4ef430e946e717d72e923c4708a9120f94d55dbd Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期三, 28 十二月 2016 09:35:14 +0800 Subject: [PATCH] test h264 encoder --- RtspFace/make.sh | 5 RtspFace/PL_RTSPServer.cpp | 78 +++- RtspFace/PipeLine.h | 26 + RtspFace/PL_RTSPServer.h | 12 RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp | 114 ++++++- RtspFace/PL_Queue.h | 44 +++ RtspFace/main.cpp | 71 +++- RtspFace/PL_H264Encoder.cpp | 62 +++- RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h | 9 RtspFace/PL_Queue.cpp | 360 +++++++++++++++++++++++++ RtspFace/PL_AVFrameYUV420.cpp | 1 RtspFace/PipeLine.cpp | 25 + 12 files changed, 715 insertions(+), 92 deletions(-) diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp index 0b38140..9ea1bba 100644 --- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp +++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp @@ -16,6 +16,13 @@ pthread_mutex_init(&outqueue_mutex,NULL); } + + FFmpegH264Encoder::~FFmpegH264Encoder() + { + pthread_mutex_init(&inqueue_mutex,NULL); + pthread_mutex_init(&outqueue_mutex,NULL); + + } void FFmpegH264Encoder::setCallbackFunctionFrameIsReady(std::function<void()> func) { @@ -45,13 +52,13 @@ pthread_mutex_unlock(&inqueue_mutex); if(frame != NULL) { - WriteFrame(frame); + WriteFrameRGB(frame); } } } } - void FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id) + bool FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id) { int ret; m_sws_flags = SWS_BICUBIC; @@ -60,14 +67,17 @@ avcodec_register_all(); av_register_all(); - avformat_alloc_output_context2(&m_oc, NULL, NULL, filename); + if (strlen(filename) == 0) + avformat_alloc_output_context2(&m_oc, NULL, "h264", filename); + else + avformat_alloc_output_context2(&m_oc, NULL, NULL, filename); if (!m_oc) { avformat_alloc_output_context2(&m_oc, NULL, "avi", filename); } if (!m_oc) { - return; + return false; } m_fmt = m_oc->oformat; @@ -79,13 +89,13 @@ m_video_codec = avcodec_find_encoder(m_fmt->video_codec); if (!(m_video_codec)) { - return; + return false; } st = avformat_new_stream(m_oc, m_video_codec); if (!st) { - return; + return false; } st->id = m_oc->nb_streams-1; @@ -112,7 +122,7 @@ ret = avcodec_open2(c, m_video_codec, NULL); if (ret < 0) { - return; + return false; } //ret = avpicture_alloc(&m_dst_picture, c->pix_fmt, c->width, c->height); @@ -126,7 +136,7 @@ ret = av_image_alloc(m_dst_picture->data, m_dst_picture->linesize, c->width, c->height, (AVPixelFormat)m_dst_picture->format, 32); if (ret < 0) { - return; + return false; } //ret = avpicture_alloc(&m_src_picture, AV_PIX_FMT_BGR24, c->width, c->height); @@ -135,7 +145,7 @@ ret = av_image_alloc(m_src_picture->data, m_src_picture->linesize, c->width, c->height, AV_PIX_FMT_BGR24, 24); if (ret < 0) { - return; + return false; } bufferSize = ret; @@ -145,27 +155,28 @@ if (!(m_fmt->flags & AVFMT_NOFILE)) { ret = avio_open(&m_oc->pb, filename, AVIO_FLAG_WRITE); if (ret < 0) { - return; + return false; } } ret = avformat_write_header(m_oc, NULL); if (ret < 0) { - return; + return false; } sws_ctx = sws_getContext(c->width, c->height, AV_PIX_FMT_BGR24, c->width, c->height, AV_PIX_FMT_YUV420P, SWS_BICUBIC, NULL, NULL, NULL); if (!sws_ctx) { - return; + return false; } + + return true; } - void FFmpegH264Encoder::WriteFrame(uint8_t * RGBFrame ) - { - + bool FFmpegH264Encoder::WriteFrameRGB(uint8_t * RGBFrame ) + { memcpy(m_src_picture->data[0], RGBFrame, bufferSize); sws_scale(sws_ctx, @@ -182,7 +193,7 @@ ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet); if (ret < 0) { - return; + return false; } if (!ret && got_packet && pkt.size) @@ -215,10 +226,75 @@ m_frame_count++; m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base); - onFrame(); + if (onFrame != nullptr) + onFrame(); + + return true; + } + + void copyAVFrame(AVFrame* dest, AVFrame* src) + { + int height = dest->height; + int width = dest->width; + + memcpy(dest->data[0], src->data[0], height * width); // Y + memcpy(dest->data[1], src->data[1], height * width / 4); // U + memcpy(dest->data[2], src->data[2], height * width / 4); // V + } + + bool FFmpegH264Encoder::WriteFrameYUV420(AVFrame * YUVFrame) + { + copyAVFrame(m_dst_picture, YUVFrame); + + AVPacket pkt = { 0 }; + int got_packet; + av_init_packet(&pkt); + + int ret = 0; + + ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet); + + if (ret < 0) { + return false; + } + + if (!ret && got_packet && pkt.size) + { + pkt.stream_index = m_video_st->index; + FrameStructure * frame = new FrameStructure(); + frame->dataPointer = new uint8_t[pkt.size]; + frame->dataSize = pkt.size-4; + frame->frameID = m_frame_count; + + memcpy(frame->dataPointer,pkt.data+4,pkt.size-4); + + pthread_mutex_lock(&outqueue_mutex); + + if(outqueue.size()<30) + { + outqueue.push(frame); + } + else + { + delete frame; + } + + pthread_mutex_unlock(&outqueue_mutex); + + } + + av_free_packet(&pkt); + + m_frame_count++; + m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base); + + if (onFrame != nullptr) + onFrame(); + + return true; } - void FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond) + bool FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond) { m_filename = filename; m_AVIMOV_WIDTH=Width; //Movie width @@ -227,7 +303,7 @@ m_AVIMOV_GOB=GOB; //I frames per no of P frames, see note below! m_AVIMOV_BPS=BitPerSecond; //Bits per second, if this is too low then movie will become garbled - SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264); + return SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264); } void FFmpegH264Encoder::CloseCodec() diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h index 31e345d..d07e113 100644 --- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h +++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h @@ -49,18 +49,19 @@ { public: FFmpegH264Encoder(); - ~FFmpegH264Encoder(); + virtual ~FFmpegH264Encoder(); virtual void setCallbackFunctionFrameIsReady(std::function<void()> func); - void SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond); + bool SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond); void CloseVideo(); - void SetupCodec(const char *filename, int codec_id); + bool SetupCodec(const char *filename, int codec_id); void CloseCodec(); void SendNewFrame(uint8_t * RGBFrame); - void WriteFrame(uint8_t * RGBFrame); + bool WriteFrameRGB(uint8_t * RGBFrame); + bool WriteFrameYUV420(AVFrame * YUVFrame); virtual char ReleaseFrame(); void run(); diff --git a/RtspFace/PL_AVFrameYUV420.cpp b/RtspFace/PL_AVFrameYUV420.cpp index 1384218..fc85e4c 100644 --- a/RtspFace/PL_AVFrameYUV420.cpp +++ b/RtspFace/PL_AVFrameYUV420.cpp @@ -104,6 +104,7 @@ //in->buffer readly + //#test //static size_t f=0; //char fname[50]; //sprintf(fname, "%u.yuv420", ++f); diff --git a/RtspFace/PL_H264Encoder.cpp b/RtspFace/PL_H264Encoder.cpp index 9fc0a0b..e2fcdf2 100644 --- a/RtspFace/PL_H264Encoder.cpp +++ b/RtspFace/PL_H264Encoder.cpp @@ -19,11 +19,13 @@ AVCodecContext* pAVCodecContext; AVFrame* pAVFrame;//#todo delete + AVStream* pAVStream; + AVFormatContext* pAVFormatContext; H264Encoder_Internal() : buffSize(0), buffSizeMax(sizeof(buffer)), payError(true), ffmpegInited(false), frameCount(0), - pAVCodecContext(nullptr), pAVFrame(nullptr) + pAVCodecContext(nullptr), pAVFrame(nullptr), pAVStream(nullptr), pAVFormatContext(nullptr) { } @@ -41,6 +43,8 @@ pAVCodecContext = nullptr; pAVFrame = nullptr; + pAVStream = nullptr; + pAVFormatContext = nullptr; } }; @@ -88,7 +92,7 @@ in->pAVCodecContext = avcodec_alloc_context3(avCodec); - in->pAVCodecContext->bit_rate = 3*1024*1024*8; // 3MB + in->pAVCodecContext->bit_rate = 1*1024*1024*8; // 3MB in->pAVCodecContext->width = 1920; in->pAVCodecContext->height = 1080;//#todo from config in->pAVCodecContext->time_base.num=1; @@ -96,6 +100,9 @@ in->pAVCodecContext->gop_size = 20; in->pAVCodecContext->max_b_frames = 0; in->pAVCodecContext->pix_fmt = AV_PIX_FMT_YUV420P; + + //av_opt_set(c->priv_data, "preset", "superfast", 0); + //av_opt_set(c->priv_data, "tune", "zerolatency", 0); if(avcodec_open2(in->pAVCodecContext, avCodec, NULL) >= 0) { @@ -119,17 +126,36 @@ return false; } + //int ret = avformat_alloc_output_context2(&(in->pAVFormatContext), NULL, "avi", ""); + //if (ret < 0 || in->pAVFormatContext == nullptr) + //{ + // printf("avformat_alloc_output_context2 error\n"); + // return false; + //} + // + //in->pAVStream = avformat_new_stream(in->pAVFormatContext, avCodec); + //if (in->pAVStream == nullptr) + //{ + // printf("avformat_new_stream error\n"); + // return false; + //} + //in->pAVStream->id = in->pAVFormatContext->nb_streams-1; + return true; } void copyAVFrame(AVFrame* dest, AVFrame* src) { - int height = dest->height; - int width = dest->width; + dest->data[0] = src->data[0]; + dest->data[1] = src->data[1]; + dest->data[2] = src->data[2]; - memcpy(dest->data[0], src->data[0], height * width); // Y - memcpy(dest->data[1], src->data[1], height * width / 4); // U - memcpy(dest->data[2], src->data[2], height * width / 4); // V + //int height = dest->height; + //int width = dest->width; + // + //memcpy(dest->data[0], src->data[0], height * width); // Y + //memcpy(dest->data[1], src->data[1], height * width / 4); // U + //memcpy(dest->data[2], src->data[2], height * width / 4); // V } bool encodeH264(H264Encoder_Internal* in, AVFrame* pAVFrame, size_t buffSize) @@ -138,13 +164,16 @@ in->frameCount++; copyAVFrame(in->pAVFrame, pAVFrame); - in->pAVFrame->pts = in->frameCount; - + + //in->pAVFrame->pts = (1.0 / 25) * 90000 * in->frameCount; + in->pAVFrame->pts = time(nullptr); + AVPacket pAVPacket = {0}; av_init_packet(&pAVPacket); - + // encode the image int gotPacket = 0; + int ret = avcodec_encode_video2(in->pAVCodecContext, &pAVPacket, in->pAVFrame, &gotPacket); if (ret < 0) { @@ -154,9 +183,9 @@ if (gotPacket > 0) { - printf("Succeed to encode (1) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size); - memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size); - in->buffSize += pAVPacket.size; + printf("Succeed to encode (1) frame=%d, size=%d\n", in->frameCount, pAVPacket.size); + memcpy(in->buffer, pAVPacket.data, pAVPacket.size); + in->buffSize = pAVPacket.size; av_free_packet(&pAVPacket); } @@ -172,12 +201,13 @@ // } // if (gotPacket > 0) // { - // printf("Succeed to encode (2) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size); + // printf("Succeed to encode (2) frame=%d, size=%d\n", in->frameCount, pAVPacket.size); // memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size); // in->buffSize += pAVPacket.size; // av_free_packet(&pAVPacket); // } - //} + //} + //#test if (in->buffSize > 0) @@ -186,7 +216,7 @@ fwrite (in->buffer , sizeof(char), in->buffSize, pFile); fflush(pFile); } - + in->payError = (in->buffSize == 0); return !(in->payError); } diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp new file mode 100644 index 0000000..4641eec --- /dev/null +++ b/RtspFace/PL_Queue.cpp @@ -0,0 +1,360 @@ +#include "PL_Queue.h" +#include <set> +#include <queue> +#include <string.h> + +struct QBlock +{ + uint8_t* data; + size_t maxSize; + size_t size; + + QBlock() : data(nullptr), maxSize(0), size(0) { } + + QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { } + + ~QBlock() + { + data = nullptr; + maxSize = 0; + size = 0; + } +}; + +struct QBlockPool +{ + uint8_t* blocksDataPool; + + typedef std::set<QBlock*> qpool_t; + qpool_t staticPool; + qpool_t freePool; + + QBlockPool() : blocksDataPool(nullptr) + { + } + + QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : + blocksDataPool(nullptr) + { + bp_init(maxBlockCount, maxBlockSize); + } + + void bp_init(size_t maxBlockCount, size_t maxBlockSize) + { + const size_t totalBytes = maxBlockCount * maxBlockSize; + blocksDataPool = new uint8_t[totalBytes]; + printf("QBlockPool allocate byte: %u\n", totalBytes); + + for (size_t i = 0; i < maxBlockCount; i++) + { + uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize]; + QBlock* qb = new QBlock(qbData, maxBlockSize); + staticPool.insert(qb); + freePool.insert(qb); + } + } + + ~QBlockPool() + { + if (freePool.size() != staticPool.size()) + { + printf("QBlockPool memory leakage\n"); + } + + for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter) + { + delete *iter; + } + + delete[] blocksDataPool; + blocksDataPool = nullptr; + } + + QBlock* bp_alloc() + { + if (freePool.empty()) + { + printf("QBlockPool bp_alloc empty\n"); + return nullptr; + } + + qpool_t::iterator iter = freePool.begin(); + QBlock* qb = *iter; + freePool.erase(iter); + return qb; + } + + void bp_free(QBlock* qb) + { + if (qb == nullptr) + return; + + qpool_t::iterator iter = staticPool.find(qb); + if (iter == staticPool.end()) + { + printf("QBlockPool bp_free not in pool\n"); + return; + } + + //iter = freePool.find(qb); + //if (iter != freePool.end()) + //{ + // printf("QBlockPool bp_free block is free"); + // return; + //} + + qb->size = 0; + freePool.insert(qb); + } + + bool bp_empty() const + { + return freePool.empty(); + } +}; + +struct PL_Queue_Internal +{ + PL_Queue_Config config; + QBlockPool pool; + + typedef std::queue<QBlock*> queue_t; + queue_t que; + + pthread_mutex_t* queue_pool_mutex; + pthread_mutex_t* sync_full_mutex; + pthread_mutex_t* sync_empty_mutex; + + PL_Queue_Internal() : + config(), pool(), que(), + queue_pool_mutex(new pthread_mutex_t), + sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t) + { + pthread_mutex_init(queue_pool_mutex, NULL); + pthread_mutex_init(sync_full_mutex, NULL); + pthread_mutex_init(sync_empty_mutex, NULL); + } + + ~PL_Queue_Internal() + { + if (queue_pool_mutex != nullptr) + { + pthread_mutex_destroy(queue_pool_mutex); + delete queue_pool_mutex; + queue_pool_mutex = nullptr; + } + + if (sync_full_mutex != nullptr) + { + pthread_mutex_destroy(sync_full_mutex); + delete sync_full_mutex; + sync_full_mutex = nullptr; + } + + if (sync_empty_mutex != nullptr) + { + pthread_mutex_destroy(sync_empty_mutex); + delete sync_empty_mutex; + sync_empty_mutex = nullptr; + } + } + + void reset() + { + if (queue_pool_mutex != nullptr) + { + pthread_mutex_destroy(queue_pool_mutex); + delete queue_pool_mutex; + queue_pool_mutex = nullptr; + } + + queue_pool_mutex = new pthread_mutex_t; + pthread_mutex_init(queue_pool_mutex, NULL); + + if (sync_full_mutex != nullptr) + { + pthread_mutex_destroy(sync_full_mutex); + delete sync_full_mutex; + sync_full_mutex = nullptr; + } + + sync_full_mutex = new pthread_mutex_t; + pthread_mutex_init(sync_full_mutex, NULL); + + if (sync_empty_mutex != nullptr) + { + pthread_mutex_destroy(sync_empty_mutex); + delete sync_empty_mutex; + sync_empty_mutex = nullptr; + } + + sync_empty_mutex = new pthread_mutex_t; + pthread_mutex_init(sync_empty_mutex, NULL); + + //#todo free que + } +}; + +PipeLineElem* create_PL_Queue() +{ + return new PL_Queue; +} + +PL_Queue::PL_Queue() : internal(new PL_Queue_Internal) +{ +} + +PL_Queue::~PL_Queue() +{ + delete (PL_Queue_Internal*)internal; + internal= nullptr; +} + +bool PL_Queue::init(void* args) +{ + PL_Queue_Internal* in = (PL_Queue_Internal*)internal; + PL_Queue_Config* config = (PL_Queue_Config*)args; + + in->reset(); + in->config = *config; + in->pool.bp_init(config->maxBlockCount, config->maxBlockSize); + + if (config->syncQueueFull) + { + int ret = pthread_mutex_lock(in->sync_full_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock sync_full_mutex: %d\n", ret); + return false; + } + } + + if (config->syncQueueEmpty) + { + int ret = pthread_mutex_lock(in->sync_empty_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret); + return false; + } + } + + return true; +} + +void PL_Queue::finit() +{ + PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete +} + +bool PL_Queue::pay(const PipeMaterial& pm) +{ + PL_Queue_Internal* in = (PL_Queue_Internal*)internal; + + QBlock* qb = nullptr; + if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) + { + // there is no available qb + + if (in->config.queueFullDropBlock) + { + printf("PL_Queue::pay queueFullDropBlock\n"); + + pthread_mutex_lock(in->queue_pool_mutex); + qb = in->que.front(); + in->que.pop(); + + in->pool.bp_free(qb); + qb = nullptr; + pthread_mutex_unlock(in->queue_pool_mutex); + } + else + { + if (in->config.syncQueueFull) + { + printf("PL_Queue::pay sync by sync_full_mutex\n"); + pthread_mutex_lock(in->sync_full_mutex); + } + } + } + + if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty()) + { + printf("PL_Queue::pay full\n"); + return false; + } + + pthread_mutex_lock(in->queue_pool_mutex); + qb = in->pool.bp_alloc(); + if (qb != nullptr) + in->que.push(qb); + pthread_mutex_unlock(in->queue_pool_mutex); + + if (qb == nullptr) + { + printf("PL_Queue::pay bp_alloc nullptr\n"); + return false; + } + + if (in->config.copyData) + { + memcpy(qb->data, pm.buffer, pm.buffSize); + qb->size = pm.buffSize; + } + + if (in->config.syncQueueEmpty) + pthread_mutex_unlock(in->sync_empty_mutex); + + return true; +} + +void PL_Queue::pm_deleter_qb(PipeMaterial* pm) +{ + if (pm->former == nullptr || pm->args == nullptr) + return; + + PL_Queue* _this = (PL_Queue*)pm->former; + QBlock* qb = (QBlock*)pm->args; + PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal; + + pthread_mutex_lock(in->queue_pool_mutex); + in->pool.bp_free(qb); + pthread_mutex_unlock(in->queue_pool_mutex); +} + +bool PL_Queue::gain(PipeMaterial& pm) +{ + PL_Queue_Internal* in = (PL_Queue_Internal*)internal; + + if (in->que.empty()) + { + if (in->config.syncQueueEmpty) + pthread_mutex_lock(in->sync_empty_mutex); + } + + if (in->que.empty()) + { + printf("PL_Queue::gain empty\n"); + pm.buffer = nullptr; + pm.buffSize = 0; + pm.former = this; + return false; + } + + QBlock* qb = nullptr; + pthread_mutex_lock(in->queue_pool_mutex); + qb = in->que.front(); + in->que.pop(); + pthread_mutex_unlock(in->queue_pool_mutex); + + if (in->config.syncQueueFull) + pthread_mutex_unlock(in->sync_full_mutex); + + pm.type = PMT_BYTES; + pm.buffer = qb->data; + pm.buffSize = qb->size; + pm.former = this; + pm.deleter = pm_deleter_qb; + pm.args = qb; + return true; +} diff --git a/RtspFace/PL_Queue.h b/RtspFace/PL_Queue.h new file mode 100644 index 0000000..72f01e2 --- /dev/null +++ b/RtspFace/PL_Queue.h @@ -0,0 +1,44 @@ +#ifndef _PL_QUEUE_H_ +#define _PL_QUEUE_H_ + +#include "PipeLine.h" + +struct PL_Queue_Config +{ + size_t maxBlockCount; + size_t maxBlockSize; + bool cacheEmptyBlock;//#todo not implement + + bool syncQueueFull; + bool syncQueueEmpty; + bool queueFullDropBlock; + bool copyData; //#todo not implement (copy ptr) + + PL_Queue_Config() : + maxBlockCount(100), maxBlockSize(300000), cacheEmptyBlock(false), + syncQueueFull(true), syncQueueEmpty(true), queueFullDropBlock(false), copyData(true) + { + } +}; + +class PL_Queue : public PipeLineElem +{ +public: + PL_Queue(); + virtual ~PL_Queue(); + + virtual bool init(void* args); + virtual void finit(); + + virtual bool pay(const PipeMaterial& pm); + virtual bool gain(PipeMaterial& pm); + +private: + static void pm_deleter_qb(PipeMaterial* pm); + + void* internal; +}; + +PipeLineElem* create_PL_Queue(); + +#endif diff --git a/RtspFace/PL_RTSPServer.cpp b/RtspFace/PL_RTSPServer.cpp index feb25ac..ad5d1f9 100644 --- a/RtspFace/PL_RTSPServer.cpp +++ b/RtspFace/PL_RTSPServer.cpp @@ -12,8 +12,11 @@ struct RTSPServer_Internal { - uint8_t* buffer; + uint8_t buffer[1920*1080*3]; size_t buffSize; + size_t buffSizeMax; + + RTSPServerConfig config; bool payError; pthread_t live_daemon_thid; @@ -24,7 +27,7 @@ MyEncoderStub * encoderStub; RTSPServer_Internal() : - buffer(nullptr), buffSize(0), + buffSize(0), buffSizeMax(sizeof(buffer)), config(), payError(true), live_daemon_thid(0), frame_mutex(new pthread_mutex_t), live_daemon_running(false), server(nullptr), encoderStub(nullptr) { @@ -43,8 +46,10 @@ void reset() { - buffer = nullptr; buffSize = 0; + + RTSPServerConfig _config; + config =_config; payError = true; @@ -84,32 +89,43 @@ virtual char GetFrame(u_int8_t** FrameBuffer, unsigned int *FrameSize) { - if (in.buffer != nullptr && in.buffSize > 0) - { - *FrameBuffer = in.buffer; - *FrameSize = in.buffSize; - - printf("send frame size=%u\n", in.buffSize); - - in.buffer = nullptr; - in.buffSize = 0; - - return 1; - } - else + if (in.buffer == nullptr || in.buffSize <= 0) { ReleaseFrame(); return 0; } + + uint8_t* pBuffer = in.buffer; + size_t newBufferSize = in.buffSize; + if (in.config.payWithAux) + { + if (newBufferSize <= 4) + { + ReleaseFrame(); + return 0; + } + pBuffer += 4; + newBufferSize -= 4; + } + + *FrameBuffer = pBuffer; + *FrameSize = newBufferSize; + + printf("send frame size=%u\n", in.buffSize); } virtual char ReleaseFrame() { - int ret = pthread_mutex_unlock(in.frame_mutex); - if(ret != 0) + in.buffSize = 0; + + if (in.config.syncDeliverFrame) { - printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret)); - return 0; + int ret = pthread_mutex_unlock(in.frame_mutex); + if(ret != 0) + { + printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret)); + return 0; + } } return 1; @@ -120,11 +136,14 @@ // write frame buffer of RTSPServer_Internal::buffer onFrame(); - int ret = pthread_mutex_lock(in.frame_mutex); - if(ret != 0) + if (in.config.syncDeliverFrame) { - printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret)); - return; + int ret = pthread_mutex_lock(in.frame_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret)); + return; + } } } @@ -167,6 +186,12 @@ RTSPServer_Internal* in = (RTSPServer_Internal*)internal; in->reset(); + if (args) + { + RTSPServerConfig* config = (RTSPServerConfig*)args; + in->config = *config; + } + int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); if(ret != 0) { @@ -191,7 +216,10 @@ if (pm.buffer == nullptr || pm.buffSize <= 0) return false; - in->buffer = pm.buffer; + if (in->buffSize > 0) + printf("PL_RTSPServer::pay may lost data size=%u\n", in->buffSize); + + memcpy(in->buffer, pm.buffer, pm.buffSize); in->buffSize = pm.buffSize; if (in->encoderStub == nullptr) diff --git a/RtspFace/PL_RTSPServer.h b/RtspFace/PL_RTSPServer.h index 8b0afec..ea9e154 100644 --- a/RtspFace/PL_RTSPServer.h +++ b/RtspFace/PL_RTSPServer.h @@ -3,6 +3,18 @@ #include "PipeLine.h" +struct RTSPServerConfig +{ + bool syncDeliverFrame; + bool payWithAux; + bool sendWithAux; + + RTSPServerConfig() : + syncDeliverFrame(true), payWithAux(true), sendWithAux(false) + { + } +}; + class PL_RTSPServer : public PipeLineElem { public: diff --git a/RtspFace/PipeLine.cpp b/RtspFace/PipeLine.cpp index 16f5ee1..8b1ffea 100644 --- a/RtspFace/PipeLine.cpp +++ b/RtspFace/PipeLine.cpp @@ -1,7 +1,18 @@ #include "PipeLine.h" -PipeMaterial::PipeMaterial() : buffer(nullptr), buffSize(0), former(nullptr) +PipeMaterial::PipeMaterial() : + type(PMT__FIRST), buffer(nullptr), buffSize(0), + former(nullptr), deleter(nullptr), args(nullptr) { +} + +void PipeMaterial::exec_deleter() +{ + if (deleter != nullptr) + { + deleter(this); + deleter = nullptr; + } } PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems() @@ -81,12 +92,16 @@ if (elems.size() == 1) { elem_begin->gain(*pm); + pm->exec_deleter(); return elem_begin; } else if (elems.size() == 2) { if (elem_begin->gain(*pm)) + { elem_last->pay(*pm); + pm->exec_deleter(); + } else return elem_begin; return elem_last; @@ -103,16 +118,22 @@ while (elem_begin != elem_last) { if (lastRet && (lastRet = elem_begin->pay(*pm)) ) + { + pm->exec_deleter(); lastRet = elem_begin->gain(*pm); + } else - return elem_begin; + return elem_begin;//#todo this may memory leakage in pm ++iter; elem_begin = *iter; } if (lastRet) + { elem_last->pay(*pm); + pm->exec_deleter(); + } return elem_last; } diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h index 898e1f6..2a59df7 100644 --- a/RtspFace/PipeLine.h +++ b/RtspFace/PipeLine.h @@ -12,13 +12,32 @@ class PipeLineElem; class PipeLine; +enum PipeMaterialBufferType +{ + PMT__FIRST, + PMT_BYTES, + PMT_TEXT, + PMT_IMAGE, + PMT_PM_LIST, + PMT_PTR_AVFRAME, + PMT__LAST +}; + +struct PipeMaterial; +typedef void (* pm_deleter_func)(PipeMaterial* pm); + struct PipeMaterial { + PipeMaterialBufferType type; uint8_t* buffer; size_t buffSize; PipeLineElem* former; + pm_deleter_func deleter; + void* args; PipeMaterial(); + + void exec_deleter(); }; class PipeLineElem @@ -41,9 +60,10 @@ typedef PipeLineElem* (*elem_create_func_t)(); // 0 (there is no elem). do nothing -// 1 (there is one elem). gain -// 2 (there is two elems). gain --> pay -// 3 (there is more than two elems). gain --> pay gain --> pay gain --> ... --> pay +// 1 (there is one elem). gain --> pm.deleter +// 2 (there is two elems). gain --> pay --> pm.deleter +// 3 (there is more than two elems). +// gain --> [pay --> pm.deleter --> gain -->] [pay --> pm.deleter --> gain -->] ... --> pay --> pm.deleter class PipeLine { public: diff --git a/RtspFace/main.cpp b/RtspFace/main.cpp index ea5505d..ef672a1 100644 --- a/RtspFace/main.cpp +++ b/RtspFace/main.cpp @@ -5,6 +5,7 @@ #include "PL_H264Encoder.h" #include "PL_AVFrameYUV420.h" #include "PL_AVFrameBGRA.h" +#include "PL_Queue.h" #include <iostream> using namespace std; @@ -18,33 +19,61 @@ pipeLine.register_elem_creator("PL_H264Decoder", create_PL_H264Decoder); pipeLine.register_elem_creator("PL_AVFrameYUV420", create_PL_AVFrameYUV420); pipeLine.register_elem_creator("PL_H264Encoder", create_PL_H264Encoder); + pipeLine.register_elem_creator("PL_Queue", create_PL_Queue); - PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient"); - RTSPConfig rtspConfig; - rtspConfig.progName = argv[0]; - rtspConfig.rtspURL = argv[1]; - rtspConfig.aux = false; // ffmpeg need aux - rtspConfig.verbosityLevel = 1; - rtspConfig.tunnelOverHTTPPortNum = 0; - rtspConfig.args = nullptr; - bool ret = rtspClient->init(&rtspConfig); - if (!ret) { - cout << "rtspClient.init error" << endl; - exit(EXIT_FAILURE); + PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient"); + RTSPConfig rtspConfig; + rtspConfig.progName = argv[0]; + rtspConfig.rtspURL = argv[1]; + rtspConfig.aux = true; // ffmpeg need aux, but live555 not + rtspConfig.verbosityLevel = 1; + rtspConfig.tunnelOverHTTPPortNum = 0; + rtspConfig.args = nullptr; + bool ret = rtspClient->init(&rtspConfig); + if (!ret) + { + cout << "rtspClient.init error" << endl; + exit(EXIT_FAILURE); + } } - //PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder"); - //h264Decoder->init(nullptr); + //{ + // PL_Queue_Config config; + // PL_Queue* queue1 = (PL_Queue*)pipeLine.push_elem("PL_Queue"); + // bool ret = queue1->init(&config); + // if (!ret) + // { + // cout << "queue1.init error" << endl; + // exit(EXIT_FAILURE); + // } + //} - //PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420"); - //avFrameYUV420->init(nullptr); + { + PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder"); + h264Decoder->init(nullptr); + } + + //{ + // PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420"); + // avFrameYUV420->init(nullptr); + //} + + { + PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder"); + h264Encoder->init(nullptr); + } - //PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder"); - //h264Encoder->init(nullptr); - - PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer"); - rtspServer->init(nullptr); + //{ + // RTSPServerConfig config; + // PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer"); + // bool ret = rtspServer->init(&config); + // if (!ret) + // { + // cout << "rtspServer.init error" << endl; + // exit(EXIT_FAILURE); + // } + //} while(true) { diff --git a/RtspFace/make.sh b/RtspFace/make.sh index 6a122db..704fa7d 100644 --- a/RtspFace/make.sh +++ b/RtspFace/make.sh @@ -36,15 +36,16 @@ g++ -g -c -std=c++11 PL_H264Encoder.cpp $CFLAGS $CPPFLAGS g++ -g -c -std=c++11 PL_AVFrameYUV420.cpp $CFLAGS $CPPFLAGS g++ -g -c -std=c++11 PL_AVFrameBGRA.cpp $CFLAGS $CPPFLAGS +g++ -g -c -std=c++11 PL_Queue.cpp $CFLAGS $CPPFLAGS g++ -g -c -std=c++11 PipeLine.cpp $CFLAGS $CPPFLAGS -g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/FFmpegH264Source.cpp $CFLAGS $CPPFLAGS +g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS g++ -g -std=c++11 \ main.o PipeLine.o \ - PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o \ + PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o \ $FFMPEGRTSPSERVER_OBJ PL_RTSPServer.o \ $LDFLAGS -o rtsp_face -- Gitblit v1.8.0