From 4ef430e946e717d72e923c4708a9120f94d55dbd Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期三, 28 十二月 2016 09:35:14 +0800
Subject: [PATCH] test h264 encoder
---
RtspFace/make.sh | 5
RtspFace/PL_RTSPServer.cpp | 78 +++-
RtspFace/PipeLine.h | 26 +
RtspFace/PL_RTSPServer.h | 12
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp | 114 ++++++-
RtspFace/PL_Queue.h | 44 +++
RtspFace/main.cpp | 71 +++-
RtspFace/PL_H264Encoder.cpp | 62 +++-
RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h | 9
RtspFace/PL_Queue.cpp | 360 +++++++++++++++++++++++++
RtspFace/PL_AVFrameYUV420.cpp | 1
RtspFace/PipeLine.cpp | 25 +
12 files changed, 715 insertions(+), 92 deletions(-)
diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
index 0b38140..9ea1bba 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.cpp
@@ -16,6 +16,13 @@
pthread_mutex_init(&outqueue_mutex,NULL);
}
+
+ FFmpegH264Encoder::~FFmpegH264Encoder()
+ {
+ pthread_mutex_init(&inqueue_mutex,NULL);
+ pthread_mutex_init(&outqueue_mutex,NULL);
+
+ }
void FFmpegH264Encoder::setCallbackFunctionFrameIsReady(std::function<void()> func)
{
@@ -45,13 +52,13 @@
pthread_mutex_unlock(&inqueue_mutex);
if(frame != NULL)
{
- WriteFrame(frame);
+ WriteFrameRGB(frame);
}
}
}
}
- void FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
+ bool FFmpegH264Encoder::SetupCodec(const char *filename, int codec_id)
{
int ret;
m_sws_flags = SWS_BICUBIC;
@@ -60,14 +67,17 @@
avcodec_register_all();
av_register_all();
- avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
+ if (strlen(filename) == 0)
+ avformat_alloc_output_context2(&m_oc, NULL, "h264", filename);
+ else
+ avformat_alloc_output_context2(&m_oc, NULL, NULL, filename);
if (!m_oc) {
avformat_alloc_output_context2(&m_oc, NULL, "avi", filename);
}
if (!m_oc) {
- return;
+ return false;
}
m_fmt = m_oc->oformat;
@@ -79,13 +89,13 @@
m_video_codec = avcodec_find_encoder(m_fmt->video_codec);
if (!(m_video_codec)) {
- return;
+ return false;
}
st = avformat_new_stream(m_oc, m_video_codec);
if (!st) {
- return;
+ return false;
}
st->id = m_oc->nb_streams-1;
@@ -112,7 +122,7 @@
ret = avcodec_open2(c, m_video_codec, NULL);
if (ret < 0) {
- return;
+ return false;
}
//ret = avpicture_alloc(&m_dst_picture, c->pix_fmt, c->width, c->height);
@@ -126,7 +136,7 @@
ret = av_image_alloc(m_dst_picture->data, m_dst_picture->linesize, c->width, c->height, (AVPixelFormat)m_dst_picture->format, 32);
if (ret < 0) {
- return;
+ return false;
}
//ret = avpicture_alloc(&m_src_picture, AV_PIX_FMT_BGR24, c->width, c->height);
@@ -135,7 +145,7 @@
ret = av_image_alloc(m_src_picture->data, m_src_picture->linesize, c->width, c->height, AV_PIX_FMT_BGR24, 24);
if (ret < 0) {
- return;
+ return false;
}
bufferSize = ret;
@@ -145,27 +155,28 @@
if (!(m_fmt->flags & AVFMT_NOFILE)) {
ret = avio_open(&m_oc->pb, filename, AVIO_FLAG_WRITE);
if (ret < 0) {
- return;
+ return false;
}
}
ret = avformat_write_header(m_oc, NULL);
if (ret < 0) {
- return;
+ return false;
}
sws_ctx = sws_getContext(c->width, c->height, AV_PIX_FMT_BGR24,
c->width, c->height, AV_PIX_FMT_YUV420P,
SWS_BICUBIC, NULL, NULL, NULL);
if (!sws_ctx) {
- return;
+ return false;
}
+
+ return true;
}
- void FFmpegH264Encoder::WriteFrame(uint8_t * RGBFrame )
- {
-
+ bool FFmpegH264Encoder::WriteFrameRGB(uint8_t * RGBFrame )
+ {
memcpy(m_src_picture->data[0], RGBFrame, bufferSize);
sws_scale(sws_ctx,
@@ -182,7 +193,7 @@
ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
if (ret < 0) {
- return;
+ return false;
}
if (!ret && got_packet && pkt.size)
@@ -215,10 +226,75 @@
m_frame_count++;
m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
- onFrame();
+ if (onFrame != nullptr)
+ onFrame();
+
+ return true;
+ }
+
+ void copyAVFrame(AVFrame* dest, AVFrame* src)
+ {
+ int height = dest->height;
+ int width = dest->width;
+
+ memcpy(dest->data[0], src->data[0], height * width); // Y
+ memcpy(dest->data[1], src->data[1], height * width / 4); // U
+ memcpy(dest->data[2], src->data[2], height * width / 4); // V
+ }
+
+ bool FFmpegH264Encoder::WriteFrameYUV420(AVFrame * YUVFrame)
+ {
+ copyAVFrame(m_dst_picture, YUVFrame);
+
+ AVPacket pkt = { 0 };
+ int got_packet;
+ av_init_packet(&pkt);
+
+ int ret = 0;
+
+ ret = avcodec_encode_video2(m_c, &pkt, m_dst_picture, &got_packet);
+
+ if (ret < 0) {
+ return false;
+ }
+
+ if (!ret && got_packet && pkt.size)
+ {
+ pkt.stream_index = m_video_st->index;
+ FrameStructure * frame = new FrameStructure();
+ frame->dataPointer = new uint8_t[pkt.size];
+ frame->dataSize = pkt.size-4;
+ frame->frameID = m_frame_count;
+
+ memcpy(frame->dataPointer,pkt.data+4,pkt.size-4);
+
+ pthread_mutex_lock(&outqueue_mutex);
+
+ if(outqueue.size()<30)
+ {
+ outqueue.push(frame);
+ }
+ else
+ {
+ delete frame;
+ }
+
+ pthread_mutex_unlock(&outqueue_mutex);
+
+ }
+
+ av_free_packet(&pkt);
+
+ m_frame_count++;
+ m_dst_picture->pts += av_rescale_q(1, m_video_st->codec->time_base, m_video_st->time_base);
+
+ if (onFrame != nullptr)
+ onFrame();
+
+ return true;
}
- void FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
+ bool FFmpegH264Encoder::SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond)
{
m_filename = filename;
m_AVIMOV_WIDTH=Width; //Movie width
@@ -227,7 +303,7 @@
m_AVIMOV_GOB=GOB; //I frames per no of P frames, see note below!
m_AVIMOV_BPS=BitPerSecond; //Bits per second, if this is too low then movie will become garbled
- SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);
+ return SetupCodec(m_filename.c_str(),AV_CODEC_ID_H264);
}
void FFmpegH264Encoder::CloseCodec()
diff --git a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
index 31e345d..d07e113 100644
--- a/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
+++ b/RtspFace/FFmpegRTSPServer/FFmpegH264Encoder.h
@@ -49,18 +49,19 @@
{
public:
FFmpegH264Encoder();
- ~FFmpegH264Encoder();
+ virtual ~FFmpegH264Encoder();
virtual void setCallbackFunctionFrameIsReady(std::function<void()> func);
- void SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
+ bool SetupVideo(std::string filename, int Width, int Height, int FPS, int GOB, int BitPerSecond);
void CloseVideo();
- void SetupCodec(const char *filename, int codec_id);
+ bool SetupCodec(const char *filename, int codec_id);
void CloseCodec();
void SendNewFrame(uint8_t * RGBFrame);
- void WriteFrame(uint8_t * RGBFrame);
+ bool WriteFrameRGB(uint8_t * RGBFrame);
+ bool WriteFrameYUV420(AVFrame * YUVFrame);
virtual char ReleaseFrame();
void run();
diff --git a/RtspFace/PL_AVFrameYUV420.cpp b/RtspFace/PL_AVFrameYUV420.cpp
index 1384218..fc85e4c 100644
--- a/RtspFace/PL_AVFrameYUV420.cpp
+++ b/RtspFace/PL_AVFrameYUV420.cpp
@@ -104,6 +104,7 @@
//in->buffer readly
+ //#test
//static size_t f=0;
//char fname[50];
//sprintf(fname, "%u.yuv420", ++f);
diff --git a/RtspFace/PL_H264Encoder.cpp b/RtspFace/PL_H264Encoder.cpp
index 9fc0a0b..e2fcdf2 100644
--- a/RtspFace/PL_H264Encoder.cpp
+++ b/RtspFace/PL_H264Encoder.cpp
@@ -19,11 +19,13 @@
AVCodecContext* pAVCodecContext;
AVFrame* pAVFrame;//#todo delete
+ AVStream* pAVStream;
+ AVFormatContext* pAVFormatContext;
H264Encoder_Internal() :
buffSize(0), buffSizeMax(sizeof(buffer)),
payError(true), ffmpegInited(false), frameCount(0),
- pAVCodecContext(nullptr), pAVFrame(nullptr)
+ pAVCodecContext(nullptr), pAVFrame(nullptr), pAVStream(nullptr), pAVFormatContext(nullptr)
{
}
@@ -41,6 +43,8 @@
pAVCodecContext = nullptr;
pAVFrame = nullptr;
+ pAVStream = nullptr;
+ pAVFormatContext = nullptr;
}
};
@@ -88,7 +92,7 @@
in->pAVCodecContext = avcodec_alloc_context3(avCodec);
- in->pAVCodecContext->bit_rate = 3*1024*1024*8; // 3MB
+ in->pAVCodecContext->bit_rate = 1*1024*1024*8; // 3MB
in->pAVCodecContext->width = 1920;
in->pAVCodecContext->height = 1080;//#todo from config
in->pAVCodecContext->time_base.num=1;
@@ -96,6 +100,9 @@
in->pAVCodecContext->gop_size = 20;
in->pAVCodecContext->max_b_frames = 0;
in->pAVCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;
+
+ //av_opt_set(c->priv_data, "preset", "superfast", 0);
+ //av_opt_set(c->priv_data, "tune", "zerolatency", 0);
if(avcodec_open2(in->pAVCodecContext, avCodec, NULL) >= 0)
{
@@ -119,17 +126,36 @@
return false;
}
+ //int ret = avformat_alloc_output_context2(&(in->pAVFormatContext), NULL, "avi", "");
+ //if (ret < 0 || in->pAVFormatContext == nullptr)
+ //{
+ // printf("avformat_alloc_output_context2 error\n");
+ // return false;
+ //}
+ //
+ //in->pAVStream = avformat_new_stream(in->pAVFormatContext, avCodec);
+ //if (in->pAVStream == nullptr)
+ //{
+ // printf("avformat_new_stream error\n");
+ // return false;
+ //}
+ //in->pAVStream->id = in->pAVFormatContext->nb_streams-1;
+
return true;
}
void copyAVFrame(AVFrame* dest, AVFrame* src)
{
- int height = dest->height;
- int width = dest->width;
+ dest->data[0] = src->data[0];
+ dest->data[1] = src->data[1];
+ dest->data[2] = src->data[2];
- memcpy(dest->data[0], src->data[0], height * width); // Y
- memcpy(dest->data[1], src->data[1], height * width / 4); // U
- memcpy(dest->data[2], src->data[2], height * width / 4); // V
+ //int height = dest->height;
+ //int width = dest->width;
+ //
+ //memcpy(dest->data[0], src->data[0], height * width); // Y
+ //memcpy(dest->data[1], src->data[1], height * width / 4); // U
+ //memcpy(dest->data[2], src->data[2], height * width / 4); // V
}
bool encodeH264(H264Encoder_Internal* in, AVFrame* pAVFrame, size_t buffSize)
@@ -138,13 +164,16 @@
in->frameCount++;
copyAVFrame(in->pAVFrame, pAVFrame);
- in->pAVFrame->pts = in->frameCount;
-
+
+ //in->pAVFrame->pts = (1.0 / 25) * 90000 * in->frameCount;
+ in->pAVFrame->pts = time(nullptr);
+
AVPacket pAVPacket = {0};
av_init_packet(&pAVPacket);
-
+
// encode the image
int gotPacket = 0;
+
int ret = avcodec_encode_video2(in->pAVCodecContext, &pAVPacket, in->pAVFrame, &gotPacket);
if (ret < 0)
{
@@ -154,9 +183,9 @@
if (gotPacket > 0)
{
- printf("Succeed to encode (1) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
- memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
- in->buffSize += pAVPacket.size;
+ printf("Succeed to encode (1) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
+ memcpy(in->buffer, pAVPacket.data, pAVPacket.size);
+ in->buffSize = pAVPacket.size;
av_free_packet(&pAVPacket);
}
@@ -172,12 +201,13 @@
// }
// if (gotPacket > 0)
// {
- // printf("Succeed to encode (2) frame=%d, size=%d\n", in->pAVFrame->pts, pAVPacket.size);
+ // printf("Succeed to encode (2) frame=%d, size=%d\n", in->frameCount, pAVPacket.size);
// memcpy(in->buffer + in->buffSize, pAVPacket.data, pAVPacket.size);
// in->buffSize += pAVPacket.size;
// av_free_packet(&pAVPacket);
// }
- //}
+ //}
+
//#test
if (in->buffSize > 0)
@@ -186,7 +216,7 @@
fwrite (in->buffer , sizeof(char), in->buffSize, pFile);
fflush(pFile);
}
-
+
in->payError = (in->buffSize == 0);
return !(in->payError);
}
diff --git a/RtspFace/PL_Queue.cpp b/RtspFace/PL_Queue.cpp
new file mode 100644
index 0000000..4641eec
--- /dev/null
+++ b/RtspFace/PL_Queue.cpp
@@ -0,0 +1,360 @@
+#include "PL_Queue.h"
+#include <set>
+#include <queue>
+#include <string.h>
+
+struct QBlock
+{
+ uint8_t* data;
+ size_t maxSize;
+ size_t size;
+
+ QBlock() : data(nullptr), maxSize(0), size(0) { }
+
+ QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
+
+ ~QBlock()
+ {
+ data = nullptr;
+ maxSize = 0;
+ size = 0;
+ }
+};
+
+struct QBlockPool
+{
+ uint8_t* blocksDataPool;
+
+ typedef std::set<QBlock*> qpool_t;
+ qpool_t staticPool;
+ qpool_t freePool;
+
+ QBlockPool() : blocksDataPool(nullptr)
+ {
+ }
+
+ QBlockPool(size_t maxBlockCount, size_t maxBlockSize) :
+ blocksDataPool(nullptr)
+ {
+ bp_init(maxBlockCount, maxBlockSize);
+ }
+
+ void bp_init(size_t maxBlockCount, size_t maxBlockSize)
+ {
+ const size_t totalBytes = maxBlockCount * maxBlockSize;
+ blocksDataPool = new uint8_t[totalBytes];
+ printf("QBlockPool allocate byte: %u\n", totalBytes);
+
+ for (size_t i = 0; i < maxBlockCount; i++)
+ {
+ uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
+ QBlock* qb = new QBlock(qbData, maxBlockSize);
+ staticPool.insert(qb);
+ freePool.insert(qb);
+ }
+ }
+
+ ~QBlockPool()
+ {
+ if (freePool.size() != staticPool.size())
+ {
+ printf("QBlockPool memory leakage\n");
+ }
+
+ for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
+ {
+ delete *iter;
+ }
+
+ delete[] blocksDataPool;
+ blocksDataPool = nullptr;
+ }
+
+ QBlock* bp_alloc()
+ {
+ if (freePool.empty())
+ {
+ printf("QBlockPool bp_alloc empty\n");
+ return nullptr;
+ }
+
+ qpool_t::iterator iter = freePool.begin();
+ QBlock* qb = *iter;
+ freePool.erase(iter);
+ return qb;
+ }
+
+ void bp_free(QBlock* qb)
+ {
+ if (qb == nullptr)
+ return;
+
+ qpool_t::iterator iter = staticPool.find(qb);
+ if (iter == staticPool.end())
+ {
+ printf("QBlockPool bp_free not in pool\n");
+ return;
+ }
+
+ //iter = freePool.find(qb);
+ //if (iter != freePool.end())
+ //{
+ // printf("QBlockPool bp_free block is free");
+ // return;
+ //}
+
+ qb->size = 0;
+ freePool.insert(qb);
+ }
+
+ bool bp_empty() const
+ {
+ return freePool.empty();
+ }
+};
+
+struct PL_Queue_Internal
+{
+ PL_Queue_Config config;
+ QBlockPool pool;
+
+ typedef std::queue<QBlock*> queue_t;
+ queue_t que;
+
+ pthread_mutex_t* queue_pool_mutex;
+ pthread_mutex_t* sync_full_mutex;
+ pthread_mutex_t* sync_empty_mutex;
+
+ PL_Queue_Internal() :
+ config(), pool(), que(),
+ queue_pool_mutex(new pthread_mutex_t),
+ sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
+ {
+ pthread_mutex_init(queue_pool_mutex, NULL);
+ pthread_mutex_init(sync_full_mutex, NULL);
+ pthread_mutex_init(sync_empty_mutex, NULL);
+ }
+
+ ~PL_Queue_Internal()
+ {
+ if (queue_pool_mutex != nullptr)
+ {
+ pthread_mutex_destroy(queue_pool_mutex);
+ delete queue_pool_mutex;
+ queue_pool_mutex = nullptr;
+ }
+
+ if (sync_full_mutex != nullptr)
+ {
+ pthread_mutex_destroy(sync_full_mutex);
+ delete sync_full_mutex;
+ sync_full_mutex = nullptr;
+ }
+
+ if (sync_empty_mutex != nullptr)
+ {
+ pthread_mutex_destroy(sync_empty_mutex);
+ delete sync_empty_mutex;
+ sync_empty_mutex = nullptr;
+ }
+ }
+
+ void reset()
+ {
+ if (queue_pool_mutex != nullptr)
+ {
+ pthread_mutex_destroy(queue_pool_mutex);
+ delete queue_pool_mutex;
+ queue_pool_mutex = nullptr;
+ }
+
+ queue_pool_mutex = new pthread_mutex_t;
+ pthread_mutex_init(queue_pool_mutex, NULL);
+
+ if (sync_full_mutex != nullptr)
+ {
+ pthread_mutex_destroy(sync_full_mutex);
+ delete sync_full_mutex;
+ sync_full_mutex = nullptr;
+ }
+
+ sync_full_mutex = new pthread_mutex_t;
+ pthread_mutex_init(sync_full_mutex, NULL);
+
+ if (sync_empty_mutex != nullptr)
+ {
+ pthread_mutex_destroy(sync_empty_mutex);
+ delete sync_empty_mutex;
+ sync_empty_mutex = nullptr;
+ }
+
+ sync_empty_mutex = new pthread_mutex_t;
+ pthread_mutex_init(sync_empty_mutex, NULL);
+
+ //#todo free que
+ }
+};
+
+PipeLineElem* create_PL_Queue()
+{
+ return new PL_Queue;
+}
+
+PL_Queue::PL_Queue() : internal(new PL_Queue_Internal)
+{
+}
+
+PL_Queue::~PL_Queue()
+{
+ delete (PL_Queue_Internal*)internal;
+ internal= nullptr;
+}
+
+bool PL_Queue::init(void* args)
+{
+ PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+ PL_Queue_Config* config = (PL_Queue_Config*)args;
+
+ in->reset();
+ in->config = *config;
+ in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
+
+ if (config->syncQueueFull)
+ {
+ int ret = pthread_mutex_lock(in->sync_full_mutex);
+ if(ret != 0)
+ {
+ printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
+ return false;
+ }
+ }
+
+ if (config->syncQueueEmpty)
+ {
+ int ret = pthread_mutex_lock(in->sync_empty_mutex);
+ if(ret != 0)
+ {
+ printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void PL_Queue::finit()
+{
+ PL_Queue_Internal* in = (PL_Queue_Internal*)internal;//#todo delete
+}
+
+bool PL_Queue::pay(const PipeMaterial& pm)
+{
+ PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+
+ QBlock* qb = nullptr;
+ if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+ {
+ // there is no available qb
+
+ if (in->config.queueFullDropBlock)
+ {
+ printf("PL_Queue::pay queueFullDropBlock\n");
+
+ pthread_mutex_lock(in->queue_pool_mutex);
+ qb = in->que.front();
+ in->que.pop();
+
+ in->pool.bp_free(qb);
+ qb = nullptr;
+ pthread_mutex_unlock(in->queue_pool_mutex);
+ }
+ else
+ {
+ if (in->config.syncQueueFull)
+ {
+ printf("PL_Queue::pay sync by sync_full_mutex\n");
+ pthread_mutex_lock(in->sync_full_mutex);
+ }
+ }
+ }
+
+ if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
+ {
+ printf("PL_Queue::pay full\n");
+ return false;
+ }
+
+ pthread_mutex_lock(in->queue_pool_mutex);
+ qb = in->pool.bp_alloc();
+ if (qb != nullptr)
+ in->que.push(qb);
+ pthread_mutex_unlock(in->queue_pool_mutex);
+
+ if (qb == nullptr)
+ {
+ printf("PL_Queue::pay bp_alloc nullptr\n");
+ return false;
+ }
+
+ if (in->config.copyData)
+ {
+ memcpy(qb->data, pm.buffer, pm.buffSize);
+ qb->size = pm.buffSize;
+ }
+
+ if (in->config.syncQueueEmpty)
+ pthread_mutex_unlock(in->sync_empty_mutex);
+
+ return true;
+}
+
+void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
+{
+ if (pm->former == nullptr || pm->args == nullptr)
+ return;
+
+ PL_Queue* _this = (PL_Queue*)pm->former;
+ QBlock* qb = (QBlock*)pm->args;
+ PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
+
+ pthread_mutex_lock(in->queue_pool_mutex);
+ in->pool.bp_free(qb);
+ pthread_mutex_unlock(in->queue_pool_mutex);
+}
+
+bool PL_Queue::gain(PipeMaterial& pm)
+{
+ PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
+
+ if (in->que.empty())
+ {
+ if (in->config.syncQueueEmpty)
+ pthread_mutex_lock(in->sync_empty_mutex);
+ }
+
+ if (in->que.empty())
+ {
+ printf("PL_Queue::gain empty\n");
+ pm.buffer = nullptr;
+ pm.buffSize = 0;
+ pm.former = this;
+ return false;
+ }
+
+ QBlock* qb = nullptr;
+ pthread_mutex_lock(in->queue_pool_mutex);
+ qb = in->que.front();
+ in->que.pop();
+ pthread_mutex_unlock(in->queue_pool_mutex);
+
+ if (in->config.syncQueueFull)
+ pthread_mutex_unlock(in->sync_full_mutex);
+
+ pm.type = PMT_BYTES;
+ pm.buffer = qb->data;
+ pm.buffSize = qb->size;
+ pm.former = this;
+ pm.deleter = pm_deleter_qb;
+ pm.args = qb;
+ return true;
+}
diff --git a/RtspFace/PL_Queue.h b/RtspFace/PL_Queue.h
new file mode 100644
index 0000000..72f01e2
--- /dev/null
+++ b/RtspFace/PL_Queue.h
@@ -0,0 +1,44 @@
+#ifndef _PL_QUEUE_H_
+#define _PL_QUEUE_H_
+
+#include "PipeLine.h"
+
+struct PL_Queue_Config
+{
+ size_t maxBlockCount;
+ size_t maxBlockSize;
+ bool cacheEmptyBlock;//#todo not implement
+
+ bool syncQueueFull;
+ bool syncQueueEmpty;
+ bool queueFullDropBlock;
+ bool copyData; //#todo not implement (copy ptr)
+
+ PL_Queue_Config() :
+ maxBlockCount(100), maxBlockSize(300000), cacheEmptyBlock(false),
+ syncQueueFull(true), syncQueueEmpty(true), queueFullDropBlock(false), copyData(true)
+ {
+ }
+};
+
+class PL_Queue : public PipeLineElem
+{
+public:
+ PL_Queue();
+ virtual ~PL_Queue();
+
+ virtual bool init(void* args);
+ virtual void finit();
+
+ virtual bool pay(const PipeMaterial& pm);
+ virtual bool gain(PipeMaterial& pm);
+
+private:
+ static void pm_deleter_qb(PipeMaterial* pm);
+
+ void* internal;
+};
+
+PipeLineElem* create_PL_Queue();
+
+#endif
diff --git a/RtspFace/PL_RTSPServer.cpp b/RtspFace/PL_RTSPServer.cpp
index feb25ac..ad5d1f9 100644
--- a/RtspFace/PL_RTSPServer.cpp
+++ b/RtspFace/PL_RTSPServer.cpp
@@ -12,8 +12,11 @@
struct RTSPServer_Internal
{
- uint8_t* buffer;
+ uint8_t buffer[1920*1080*3];
size_t buffSize;
+ size_t buffSizeMax;
+
+ RTSPServerConfig config;
bool payError;
pthread_t live_daemon_thid;
@@ -24,7 +27,7 @@
MyEncoderStub * encoderStub;
RTSPServer_Internal() :
- buffer(nullptr), buffSize(0),
+ buffSize(0), buffSizeMax(sizeof(buffer)), config(),
payError(true), live_daemon_thid(0), frame_mutex(new pthread_mutex_t), live_daemon_running(false),
server(nullptr), encoderStub(nullptr)
{
@@ -43,8 +46,10 @@
void reset()
{
- buffer = nullptr;
buffSize = 0;
+
+ RTSPServerConfig _config;
+ config =_config;
payError = true;
@@ -84,32 +89,43 @@
virtual char GetFrame(u_int8_t** FrameBuffer, unsigned int *FrameSize)
{
- if (in.buffer != nullptr && in.buffSize > 0)
- {
- *FrameBuffer = in.buffer;
- *FrameSize = in.buffSize;
-
- printf("send frame size=%u\n", in.buffSize);
-
- in.buffer = nullptr;
- in.buffSize = 0;
-
- return 1;
- }
- else
+ if (in.buffer == nullptr || in.buffSize <= 0)
{
ReleaseFrame();
return 0;
}
+
+ uint8_t* pBuffer = in.buffer;
+ size_t newBufferSize = in.buffSize;
+ if (in.config.payWithAux)
+ {
+ if (newBufferSize <= 4)
+ {
+ ReleaseFrame();
+ return 0;
+ }
+ pBuffer += 4;
+ newBufferSize -= 4;
+ }
+
+ *FrameBuffer = pBuffer;
+ *FrameSize = newBufferSize;
+
+ printf("send frame size=%u\n", in.buffSize);
}
virtual char ReleaseFrame()
{
- int ret = pthread_mutex_unlock(in.frame_mutex);
- if(ret != 0)
+ in.buffSize = 0;
+
+ if (in.config.syncDeliverFrame)
{
- printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
- return 0;
+ int ret = pthread_mutex_unlock(in.frame_mutex);
+ if(ret != 0)
+ {
+ printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
+ return 0;
+ }
}
return 1;
@@ -120,11 +136,14 @@
// write frame buffer of RTSPServer_Internal::buffer
onFrame();
- int ret = pthread_mutex_lock(in.frame_mutex);
- if(ret != 0)
+ if (in.config.syncDeliverFrame)
{
- printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
- return;
+ int ret = pthread_mutex_lock(in.frame_mutex);
+ if(ret != 0)
+ {
+ printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret));
+ return;
+ }
}
}
@@ -167,6 +186,12 @@
RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
in->reset();
+ if (args)
+ {
+ RTSPServerConfig* config = (RTSPServerConfig*)args;
+ in->config = *config;
+ }
+
int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
if(ret != 0)
{
@@ -191,7 +216,10 @@
if (pm.buffer == nullptr || pm.buffSize <= 0)
return false;
- in->buffer = pm.buffer;
+ if (in->buffSize > 0)
+ printf("PL_RTSPServer::pay may lost data size=%u\n", in->buffSize);
+
+ memcpy(in->buffer, pm.buffer, pm.buffSize);
in->buffSize = pm.buffSize;
if (in->encoderStub == nullptr)
diff --git a/RtspFace/PL_RTSPServer.h b/RtspFace/PL_RTSPServer.h
index 8b0afec..ea9e154 100644
--- a/RtspFace/PL_RTSPServer.h
+++ b/RtspFace/PL_RTSPServer.h
@@ -3,6 +3,18 @@
#include "PipeLine.h"
+struct RTSPServerConfig
+{
+ bool syncDeliverFrame;
+ bool payWithAux;
+ bool sendWithAux;
+
+ RTSPServerConfig() :
+ syncDeliverFrame(true), payWithAux(true), sendWithAux(false)
+ {
+ }
+};
+
class PL_RTSPServer : public PipeLineElem
{
public:
diff --git a/RtspFace/PipeLine.cpp b/RtspFace/PipeLine.cpp
index 16f5ee1..8b1ffea 100644
--- a/RtspFace/PipeLine.cpp
+++ b/RtspFace/PipeLine.cpp
@@ -1,7 +1,18 @@
#include "PipeLine.h"
-PipeMaterial::PipeMaterial() : buffer(nullptr), buffSize(0), former(nullptr)
+PipeMaterial::PipeMaterial() :
+ type(PMT__FIRST), buffer(nullptr), buffSize(0),
+ former(nullptr), deleter(nullptr), args(nullptr)
{
+}
+
+void PipeMaterial::exec_deleter()
+{
+ if (deleter != nullptr)
+ {
+ deleter(this);
+ deleter = nullptr;
+ }
}
PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems()
@@ -81,12 +92,16 @@
if (elems.size() == 1)
{
elem_begin->gain(*pm);
+ pm->exec_deleter();
return elem_begin;
}
else if (elems.size() == 2)
{
if (elem_begin->gain(*pm))
+ {
elem_last->pay(*pm);
+ pm->exec_deleter();
+ }
else
return elem_begin;
return elem_last;
@@ -103,16 +118,22 @@
while (elem_begin != elem_last)
{
if (lastRet && (lastRet = elem_begin->pay(*pm)) )
+ {
+ pm->exec_deleter();
lastRet = elem_begin->gain(*pm);
+ }
else
- return elem_begin;
+ return elem_begin;//#todo this may memory leakage in pm
++iter;
elem_begin = *iter;
}
if (lastRet)
+ {
elem_last->pay(*pm);
+ pm->exec_deleter();
+ }
return elem_last;
}
diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h
index 898e1f6..2a59df7 100644
--- a/RtspFace/PipeLine.h
+++ b/RtspFace/PipeLine.h
@@ -12,13 +12,32 @@
class PipeLineElem;
class PipeLine;
+enum PipeMaterialBufferType
+{
+ PMT__FIRST,
+ PMT_BYTES,
+ PMT_TEXT,
+ PMT_IMAGE,
+ PMT_PM_LIST,
+ PMT_PTR_AVFRAME,
+ PMT__LAST
+};
+
+struct PipeMaterial;
+typedef void (* pm_deleter_func)(PipeMaterial* pm);
+
struct PipeMaterial
{
+ PipeMaterialBufferType type;
uint8_t* buffer;
size_t buffSize;
PipeLineElem* former;
+ pm_deleter_func deleter;
+ void* args;
PipeMaterial();
+
+ void exec_deleter();
};
class PipeLineElem
@@ -41,9 +60,10 @@
typedef PipeLineElem* (*elem_create_func_t)();
// 0 (there is no elem). do nothing
-// 1 (there is one elem). gain
-// 2 (there is two elems). gain --> pay
-// 3 (there is more than two elems). gain --> pay gain --> pay gain --> ... --> pay
+// 1 (there is one elem). gain --> pm.deleter
+// 2 (there is two elems). gain --> pay --> pm.deleter
+// 3 (there is more than two elems).
+// gain --> [pay --> pm.deleter --> gain -->] [pay --> pm.deleter --> gain -->] ... --> pay --> pm.deleter
class PipeLine
{
public:
diff --git a/RtspFace/main.cpp b/RtspFace/main.cpp
index ea5505d..ef672a1 100644
--- a/RtspFace/main.cpp
+++ b/RtspFace/main.cpp
@@ -5,6 +5,7 @@
#include "PL_H264Encoder.h"
#include "PL_AVFrameYUV420.h"
#include "PL_AVFrameBGRA.h"
+#include "PL_Queue.h"
#include <iostream>
using namespace std;
@@ -18,33 +19,61 @@
pipeLine.register_elem_creator("PL_H264Decoder", create_PL_H264Decoder);
pipeLine.register_elem_creator("PL_AVFrameYUV420", create_PL_AVFrameYUV420);
pipeLine.register_elem_creator("PL_H264Encoder", create_PL_H264Encoder);
+ pipeLine.register_elem_creator("PL_Queue", create_PL_Queue);
- PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
- RTSPConfig rtspConfig;
- rtspConfig.progName = argv[0];
- rtspConfig.rtspURL = argv[1];
- rtspConfig.aux = false; // ffmpeg need aux
- rtspConfig.verbosityLevel = 1;
- rtspConfig.tunnelOverHTTPPortNum = 0;
- rtspConfig.args = nullptr;
- bool ret = rtspClient->init(&rtspConfig);
- if (!ret)
{
- cout << "rtspClient.init error" << endl;
- exit(EXIT_FAILURE);
+ PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
+ RTSPConfig rtspConfig;
+ rtspConfig.progName = argv[0];
+ rtspConfig.rtspURL = argv[1];
+ rtspConfig.aux = true; // ffmpeg need aux, but live555 not
+ rtspConfig.verbosityLevel = 1;
+ rtspConfig.tunnelOverHTTPPortNum = 0;
+ rtspConfig.args = nullptr;
+ bool ret = rtspClient->init(&rtspConfig);
+ if (!ret)
+ {
+ cout << "rtspClient.init error" << endl;
+ exit(EXIT_FAILURE);
+ }
}
- //PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
- //h264Decoder->init(nullptr);
+ //{
+ // PL_Queue_Config config;
+ // PL_Queue* queue1 = (PL_Queue*)pipeLine.push_elem("PL_Queue");
+ // bool ret = queue1->init(&config);
+ // if (!ret)
+ // {
+ // cout << "queue1.init error" << endl;
+ // exit(EXIT_FAILURE);
+ // }
+ //}
- //PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
- //avFrameYUV420->init(nullptr);
+ {
+ PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
+ h264Decoder->init(nullptr);
+ }
+
+ //{
+ // PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
+ // avFrameYUV420->init(nullptr);
+ //}
+
+ {
+ PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
+ h264Encoder->init(nullptr);
+ }
- //PL_H264Encoder* h264Encoder = (PL_H264Encoder*)pipeLine.push_elem("PL_H264Encoder");
- //h264Encoder->init(nullptr);
-
- PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
- rtspServer->init(nullptr);
+ //{
+ // RTSPServerConfig config;
+ // PL_RTSPServer* rtspServer = (PL_RTSPServer*)pipeLine.push_elem("PL_RTSPServer");
+ // bool ret = rtspServer->init(&config);
+ // if (!ret)
+ // {
+ // cout << "rtspServer.init error" << endl;
+ // exit(EXIT_FAILURE);
+ // }
+ //}
while(true)
{
diff --git a/RtspFace/make.sh b/RtspFace/make.sh
index 6a122db..704fa7d 100644
--- a/RtspFace/make.sh
+++ b/RtspFace/make.sh
@@ -36,15 +36,16 @@
g++ -g -c -std=c++11 PL_H264Encoder.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PL_AVFrameYUV420.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PL_AVFrameBGRA.cpp $CFLAGS $CPPFLAGS
+g++ -g -c -std=c++11 PL_Queue.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 PipeLine.cpp $CFLAGS $CPPFLAGS
-g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/FFmpegH264Source.cpp $CFLAGS $CPPFLAGS
+g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveRTSPServer.cpp $CFLAGS $CPPFLAGS
g++ -g -c -std=c++11 $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS
g++ -g -std=c++11 \
main.o PipeLine.o \
- PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o \
+ PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o \
$FFMPEGRTSPSERVER_OBJ PL_RTSPServer.o \
$LDFLAGS -o rtsp_face
--
Gitblit v1.8.0