From 9d9849175b11b3ba9918ad4f980aa4a1c7c2afb0 Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期四, 22 十二月 2016 14:39:50 +0800 Subject: [PATCH] add pipeline --- RtspFace/make.sh | 11 RtspFace/PipeLine.h | 81 +++ RtspFace/PL_AVFrameYUV420.h | 24 + RtspFace/PL_H264Decoder.cpp | 255 ++++++++++++ RtspFace/RTSPClient.hpp | 271 ++---------- RtspFace/main.cpp | 40 + RtspFace/PL_H264Decoder.h | 24 + RtspFace/PL_RTSPClient.cpp | 242 +++++++++++ RtspFace/PL_RTSPClient.h | 36 + RtspFace/PL_AVFrameYUV420.cpp | 115 +++++ RtspFace/PipeLine.cpp | 131 ++++++ 11 files changed, 1,014 insertions(+), 216 deletions(-) diff --git a/RtspFace/PL_AVFrameYUV420.cpp b/RtspFace/PL_AVFrameYUV420.cpp new file mode 100644 index 0000000..5927ce5 --- /dev/null +++ b/RtspFace/PL_AVFrameYUV420.cpp @@ -0,0 +1,115 @@ +#include "PL_AVFrameYUV420.h" + +extern "C" +{ + #include <libavcodec/avcodec.h> + #include <libavutil/frame.h> + #include <libavformat/avformat.h> +} + +struct AVFrameYUV420_Internal +{ + uint8_t buffer[1920*1080*3]; + size_t buffSize; + size_t buffSizeMax; + bool payError; + + AVFrameYUV420_Internal() : + buffSize(0), buffSizeMax(sizeof(buffer)), + payError(false) + { + } + + ~AVFrameYUV420_Internal() + { + } + + void reset() + { + buffSize = 0; + payError = false; + } +}; + +PipeLineElem* create_PL_AVFrameYUV420() +{ + return new PL_AVFrameYUV420; +} + +PL_AVFrameYUV420::PL_AVFrameYUV420() : internal(new AVFrameYUV420_Internal) +{ +} + +PL_AVFrameYUV420::~PL_AVFrameYUV420() +{ + delete (AVFrameYUV420_Internal*)internal; + internal= nullptr; +} + +bool PL_AVFrameYUV420::init(void* args) +{ + AVFrameYUV420_Internal* in = (AVFrameYUV420_Internal*)internal; + in->reset(); + + return true; +} + +void PL_AVFrameYUV420::finit() +{ + AVFrameYUV420_Internal* in = (AVFrameYUV420_Internal*)internal; + +} + +bool PL_AVFrameYUV420::pay(const PipeMaterial& pm) +{ + AVFrameYUV420_Internal* in = (AVFrameYUV420_Internal*)internal; + + int picSize = in->pAVCodecContext->height * in->pAVCodecContext->width; + in->buffSize = picSize * 1.5; + + int height = in->pAVFrame->height; + int width = in->pAVFrame->width; + + // write yuv420 + int a=0; + for (int i = 0; i < height; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[0] + i * in->pAVFrame->linesize[0], width); + a += width; + } + for (int i=0; i<height/2; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[1] + i * in->pAVFrame->linesize[1], width / 2); + a += width / 2; + } + for (int i=0; i<height/2; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[2] + i * in->pAVFrame->linesize[2], width / 2); + a += width / 2; + } + + //in->buffer readly + + //static size_t f=0; + //char fname[50]; + //sprintf(fname, "%u.yuv420", ++f); + //FILE * pFile = fopen (fname,"wb"); + //fwrite (in->buffer , sizeof(char), in->buffSize, pFile); + //fclose(pFile); + + + return in->payError; +} + +bool PL_AVFrameYUV420::gain(PipeMaterial& pm) +{ + AVFrameYUV420_Internal* in = (AVFrameYUV420_Internal*)internal; + + if (!in->payError) + { + pm.buffer = in->buffer; + pm.buffSize = in->buffSize; + } + pm.former = this; + return in->payError; +} diff --git a/RtspFace/PL_AVFrameYUV420.h b/RtspFace/PL_AVFrameYUV420.h new file mode 100644 index 0000000..ae6c677 --- /dev/null +++ b/RtspFace/PL_AVFrameYUV420.h @@ -0,0 +1,24 @@ +#ifndef _PL_AVFrameYUV420_H_ +#define _PL_AVFrameYUV420_H_ + +#include "PipeLine.h" + +class PL_AVFrameYUV420 : public PipeLineElem +{ +public: + PL_AVFrameYUV420(); + virtual ~PL_AVFrameYUV420(); + + virtual bool init(void* args); + virtual void finit(); + + virtual bool pay(const PipeMaterial& pm); + virtual bool gain(PipeMaterial& pm); + +private: + void* internal; +}; + +PipeLineElem* create_PL_AVFrameYUV420(); + +#endif diff --git a/RtspFace/PL_H264Decoder.cpp b/RtspFace/PL_H264Decoder.cpp new file mode 100644 index 0000000..b27918c --- /dev/null +++ b/RtspFace/PL_H264Decoder.cpp @@ -0,0 +1,255 @@ +#include "PL_H264Decoder.h" + +#include <H264VideoRTPSource.hh> // for SPropRecord +#include <libbase64.h> + +extern "C" +{ + #include <libavcodec/avcodec.h> + #include <libavutil/frame.h> + #include <libavformat/avformat.h> +} + +struct H264Decoder_Internal +{ + uint8_t buffer[1920*1080*3]; + size_t buffSize; + size_t buffSizeMax; + bool fmtp_set_to_context; + bool payError; + + AVCodecContext* pAVCodecContext; + AVFrame* pAVFrame; + + H264Decoder_Internal() : + buffSize(0), buffSizeMax(sizeof(buffer)), fmtp_set_to_context(false), + payError(false), + pAVCodecContext(nullptr), pAVFrame(nullptr) + { + } + + ~H264Decoder_Internal() + { + } + + void reset() + { + buffSize = 0; + fmtp_set_to_context = false; + payError = false; + } +}; + +PipeLineElem* create_PL_H264Decoder() +{ + return new PL_H264Decoder; +} + +PL_H264Decoder::PL_H264Decoder() : internal(new H264Decoder_Internal) +{ +} + +PL_H264Decoder::~PL_H264Decoder() +{ + delete (H264Decoder_Internal*)internal; + internal= nullptr; +} + +bool PL_H264Decoder::init(void* args) +{ + H264Decoder_Internal* in = (H264Decoder_Internal*)internal; + in->reset(); + + return true; +} + +void PL_H264Decoder::finit() +{ + H264Decoder_Internal* in = (H264Decoder_Internal*)internal; + +} + +SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, size_t& numSPropRecords) { + // Make a copy of the input string, so we can replace the commas with '\0's: + char* inStr = strDup(sPropParameterSetsStr); + if (inStr == NULL) { + numSPropRecords = 0; + return NULL; + } + + // Count the number of commas (and thus the number of parameter sets): + numSPropRecords = 1; + char* s; + for (s = inStr; *s != '\0'; ++s) { + if (*s == ',') { + ++numSPropRecords; + *s = '\0'; + } + } + + // Allocate and fill in the result array: + SPropRecord* resultArray = new SPropRecord[numSPropRecords]; + s = inStr; + for (unsigned i = 0; i < numSPropRecords; ++i) { + resultArray[i].sPropBytes = new uint8_t[256]; + + size_t sPropLength = 0; + base64_decode(s, strlen(s), (char*)resultArray[i].sPropBytes, &sPropLength, 0); + resultArray[i].sPropLength = sPropLength; + + s += strlen(s) + 1; + } + + delete[] inStr; + return resultArray; +} + +bool initH264DecoderEnv(H264Decoder_Internal* in, + uint8_t* sps, size_t spsSize, uint8_t* pps, size_t ppsSize) +{ + av_register_all(); + + // find the video encoder + AVCodec* avCodec = avcodec_find_decoder(AV_CODEC_ID_H264); + + if (!avCodec) + { + printf("codec not found!\n"); + return false; + } + + in->pAVCodecContext = avcodec_alloc_context3(avCodec); + + in->pAVCodecContext->time_base.num = 1; + in->pAVCodecContext->frame_number = 1; + in->pAVCodecContext->codec_type = AVMEDIA_TYPE_VIDEO; + in->pAVCodecContext->bit_rate = 0; + in->pAVCodecContext->time_base.den = 25; + in->pAVCodecContext->width = 1920; + in->pAVCodecContext->height = 1080; + + if (in->pAVCodecContext->extradata == NULL) + { + int totalsize = 0; + unsigned char* tmp = NULL; + unsigned char nalu_header[4] = { 0, 0, 0, 1 }; + + totalsize = 8 + spsSize + ppsSize; + + tmp = new unsigned char[totalsize]; + memcpy(tmp, nalu_header, 4); + memcpy(tmp + 4, sps, spsSize); + memcpy(tmp + 4 + spsSize, nalu_header, 4); + memcpy(tmp + 4 + spsSize + 4, pps, ppsSize); + + in->pAVCodecContext->extradata_size = totalsize; + + in->pAVCodecContext->extradata = tmp; + } + + if(avcodec_open2(in->pAVCodecContext, avCodec, NULL) >= 0) + in->pAVFrame = av_frame_alloc();// Allocate video frame + else + return false; + + return true; +} + +bool decodeH264(H264Decoder_Internal* in, uint8_t* buffer, size_t buffSize) +{ + AVPacket packet = {0}; + int frameFinished = buffSize; + + if (av_packet_from_data(&packet, buffer, buffSize) != 0) + { + printf("av_packet_from_data error\n"); + return false; + } + + // decode + avcodec_decode_video2(in->pAVCodecContext, in->pAVFrame, &frameFinished, &packet); + if(frameFinished) + { + // decode ok + + int picSize = in->pAVCodecContext->height * in->pAVCodecContext->width; + in->buffSize = picSize * 1.5; + + int height = in->pAVFrame->height; + int width = in->pAVFrame->width; + + // write yuv420 + int a=0; + for (int i = 0; i < height; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[0] + i * in->pAVFrame->linesize[0], width); + a += width; + } + for (int i=0; i<height/2; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[1] + i * in->pAVFrame->linesize[1], width / 2); + a += width / 2; + } + for (int i=0; i<height/2; i++) + { + memcpy(in->buffer + a, in->pAVFrame->data[2] + i * in->pAVFrame->linesize[2], width / 2); + a += width / 2; + } + + //in->buffer readly + + //static size_t f=0; + //char fname[50]; + //sprintf(fname, "%u.yuv420", ++f); + //FILE * pFile = fopen (fname,"wb"); + //fwrite (in->buffer , sizeof(char), in->buffSize, pFile); + //fclose(pFile); + } + else + printf("incomplete frame\n"); +} + +bool PL_H264Decoder::pay(const PipeMaterial& pm) +{ + H264Decoder_Internal* in = (H264Decoder_Internal*)internal; + + if (!in->fmtp_set_to_context) + { + if (manager == NULL) + return false; + + std::string fmtp(manager->get_global_param(PLGP_RTSP_FMTP)); + if (fmtp.empty()) + return false; + + size_t numSPropRecords = 0; + SPropRecord *p_record = parseSPropParameterSets(fmtp.c_str(), numSPropRecords); + if (numSPropRecords < 2) + return false;//#todo log + + SPropRecord &sps = p_record[0]; + SPropRecord &pps = p_record[1]; + + bool ret = initH264DecoderEnv(in, sps.sPropBytes, sps.sPropLength, pps.sPropBytes, pps.sPropLength); + if (!ret) + return false; // #todo log + else + in->fmtp_set_to_context = true; + } + + in->payError = decodeH264(in, pm.buffer, pm.buffSize); + return in->payError; +} + +bool PL_H264Decoder::gain(PipeMaterial& pm) +{ + H264Decoder_Internal* in = (H264Decoder_Internal*)internal; + + if (!in->payError) + { + pm.buffer = in->buffer; + pm.buffSize = in->buffSize; + } + pm.former = this; + return in->payError; +} diff --git a/RtspFace/PL_H264Decoder.h b/RtspFace/PL_H264Decoder.h new file mode 100644 index 0000000..c4770a9 --- /dev/null +++ b/RtspFace/PL_H264Decoder.h @@ -0,0 +1,24 @@ +#ifndef _PL_H264DECODER_H_ +#define _PL_H264DECODER_H_ + +#include "PipeLine.h" + +class PL_H264Decoder : public PipeLineElem +{ +public: + PL_H264Decoder(); + virtual ~PL_H264Decoder(); + + virtual bool init(void* args); + virtual void finit(); + + virtual bool pay(const PipeMaterial& pm); + virtual bool gain(PipeMaterial& pm); + +private: + void* internal; +}; + +PipeLineElem* create_PL_H264Decoder(); + +#endif diff --git a/RtspFace/PL_RTSPClient.cpp b/RtspFace/PL_RTSPClient.cpp new file mode 100644 index 0000000..30c18fd --- /dev/null +++ b/RtspFace/PL_RTSPClient.cpp @@ -0,0 +1,242 @@ +#include "PL_RTSPClient.h" +#include <pthread.h> + +void rtsp_client_sdp_callback(void* arg, const char* val); +void rtsp_client_fmtp_callback(void* arg, const char* val); +void rtsp_client_frame_callback(void* arg, uint8_t* buffer, size_t buffSize); +void rtsp_client_continue_callback(void* arg); +#include "RTSPClient.hpp" + +struct RTSPClient_Internal +{ + PL_RTSPClient* client; + RTSPConfig rtspConfig; + pthread_t live_daemon_thid; + char eventLoopWatchVariable; + bool live_daemon_running; + pthread_mutex_t* frame_mutex; + pthread_mutex_t* continue_mutex; + + uint8_t* lastBuffer; + size_t lastBuffSize; + + RTSPClient_Internal() : + client(nullptr), rtspConfig(), live_daemon_thid(0), + eventLoopWatchVariable(0), live_daemon_running(false), + frame_mutex(new pthread_mutex_t), continue_mutex(new pthread_mutex_t), + lastBuffer(nullptr), lastBuffSize(0) + { + pthread_mutex_init(frame_mutex, NULL); + pthread_mutex_init(continue_mutex, NULL); + } + + ~RTSPClient_Internal() + { + if (frame_mutex != nullptr) + { + pthread_mutex_destroy(frame_mutex); + delete frame_mutex; + } + + if (continue_mutex != nullptr) + { + pthread_mutex_destroy(continue_mutex); + delete continue_mutex; + } + } + + void reset() + { + client = nullptr; + rtspConfig.progName = ""; + rtspConfig.rtspURL = ""; + live_daemon_thid = 0; + eventLoopWatchVariable = 0; + live_daemon_running = false; + + if (frame_mutex != nullptr) + { + pthread_mutex_destroy(frame_mutex); + delete frame_mutex; + } + + frame_mutex = new pthread_mutex_t; + pthread_mutex_init(frame_mutex, NULL); + + if (continue_mutex != nullptr) + { + pthread_mutex_destroy(continue_mutex); + delete continue_mutex; + } + + continue_mutex = new pthread_mutex_t; + pthread_mutex_init(continue_mutex, NULL); + + lastBuffer = nullptr; + lastBuffSize = 0; + } +}; + +void* live_daemon_thd(void* arg) +{ + RTSPClient_Internal* in = (RTSPClient_Internal*)arg; + + TaskScheduler* scheduler = BasicTaskScheduler::createNew(); + UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler); + + usage(*env, in->rtspConfig.progName.c_str()); + + openURL(*env, in->client, in->rtspConfig.progName.c_str(), in->rtspConfig.rtspURL.c_str()); + + in->live_daemon_running = true; + env->taskScheduler().doEventLoop(&(in->eventLoopWatchVariable)); + in->live_daemon_running = false; +} + +PipeLineElem* create_PL_RTSPClient() +{ + return new PL_RTSPClient; +} + +PL_RTSPClient::PL_RTSPClient() : internal(new RTSPClient_Internal) +{ +} + +PL_RTSPClient::~PL_RTSPClient() +{ + delete (RTSPClient_Internal*)internal; + internal= nullptr; +} + +bool PL_RTSPClient::init(void* args) +{ + if (args == nullptr) + return false; + + const RTSPConfig* config = reinterpret_cast<const RTSPConfig*>(args); + RTSPClient_Internal* in = (RTSPClient_Internal*)internal; + in->reset(); + in->client = this; + in->rtspConfig = *config; + + int ret = pthread_mutex_lock(in->frame_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock frame_mutex: %s/n", strerror(ret)); + return false; + } + + ret = pthread_mutex_lock(in->continue_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock continue_mutex: %s/n", strerror(ret)); + return false; + } + + ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); + if(ret != 0) + { + printf("pthread_create: %s/n", strerror(ret)); + return false; + } + + return true; +} + +void PL_RTSPClient::finit() +{ + RTSPClient_Internal* in = (RTSPClient_Internal*)internal; + + in->eventLoopWatchVariable = 1; + pthread_join(in->live_daemon_thid, NULL); +} + +bool PL_RTSPClient::pay(const PipeMaterial& pm) +{ + RTSPClient_Internal* in = (RTSPClient_Internal*)internal; + return in->live_daemon_running; +} + +bool PL_RTSPClient::gain(PipeMaterial& pm) +{ + RTSPClient_Internal* in = (RTSPClient_Internal*)internal; + + int ret = pthread_mutex_unlock(in->continue_mutex); + if(ret != 0) + { + printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret)); + return false; + } + + ret = pthread_mutex_lock(in->frame_mutex); + if(ret != 0) + { + printf("pthread_mutex_lock: %s/n", strerror(ret)); + return false; + } + + pm.buffer = in->lastBuffer; + pm.buffSize = in->lastBuffSize; + pm.former = this; + + return true; +} + +void rtsp_client_sdp_callback(void* arg, const char* val) +{ + if (arg == nullptr || val == nullptr) + return; + + PL_RTSPClient* client = (PL_RTSPClient*)arg; + + if (client->manager == nullptr) + return; + + client->manager->set_global_param(PLGP_RTSP_SDP, val); +} + +void rtsp_client_fmtp_callback(void* arg, const char* val) +{ + if (arg == nullptr || val == nullptr) + return; + + PL_RTSPClient* client = (PL_RTSPClient*)arg; + + if (client->manager == nullptr) + return; + + client->manager->set_global_param(PLGP_RTSP_FMTP, val); +} + +void rtsp_client_frame_callback(void* arg, uint8_t* buffer, size_t buffSize) +{ + if (arg == nullptr || buffer == nullptr || buffSize == 0) + return; + + PL_RTSPClient* client = (PL_RTSPClient*)arg; + RTSPClient_Internal* in = (RTSPClient_Internal*)(client->internal); + + in->lastBuffer = buffer; + in->lastBuffSize = buffSize; + + int ret = pthread_mutex_unlock(in->frame_mutex); + if(ret != 0) + { + printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret)); + } +} + +void rtsp_client_continue_callback(void* arg) +{ + if (arg == nullptr) + return; + + PL_RTSPClient* client = (PL_RTSPClient*)arg; + RTSPClient_Internal* in = (RTSPClient_Internal*)(client->internal); + + int ret = pthread_mutex_lock(in->continue_mutex); + if(ret != 0) + { + printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret)); + } +} diff --git a/RtspFace/PL_RTSPClient.h b/RtspFace/PL_RTSPClient.h new file mode 100644 index 0000000..b5367fb --- /dev/null +++ b/RtspFace/PL_RTSPClient.h @@ -0,0 +1,36 @@ +#ifndef _PL_RTSPCLIENT_H_ +#define _PL_RTSPCLIENT_H_ + +#include "PipeLine.h" +#include <string> + +struct RTSPConfig +{ + std::string progName; + std::string rtspURL; + + RTSPConfig() : progName(), rtspURL() { } +}; + +class PL_RTSPClient : public PipeLineElem +{ + friend void rtsp_client_frame_callback(void* arg, uint8_t* buffer, size_t buffSize); + friend void rtsp_client_continue_callback(void* arg); + +public: + PL_RTSPClient(); + virtual ~PL_RTSPClient(); + + virtual bool init(void* args); + virtual void finit(); + + virtual bool pay(const PipeMaterial& pm); + virtual bool gain(PipeMaterial& pm); + +private: + void* internal; +}; + +PipeLineElem* create_PL_RTSPClient(); + +#endif diff --git a/RtspFace/PipeLine.cpp b/RtspFace/PipeLine.cpp new file mode 100644 index 0000000..6f53b19 --- /dev/null +++ b/RtspFace/PipeLine.cpp @@ -0,0 +1,131 @@ +#include "PipeLine.h" + +PipeMaterial::PipeMaterial() : buffer(nullptr), buffSize(0), former(nullptr) +{ +} + +PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems() +{ +} + +PipeLine::~PipeLine() +{ + // pipe stop + + for(elem_vec_t::iterator iter = elems.begin(); iter != elems.end(); ++iter) + { + PipeLineElem* elem = *iter; + if (elem != nullptr) + { + elem->finit(); + delete *iter; + } + } + + elems.clear(); +} + +bool PipeLine::register_elem_creator(const std::string& type, elem_create_func_t func) +{ + if (type.empty() || func == nullptr) + return false; + + elem_create_func_map_t::iterator iter = elem_create_func_map.find(type); + if (iter != elem_create_func_map.end()) + return false; + + elem_create_func_map.insert(std::make_pair(type, func)); + return true; +} + +void PipeLine::push_elem(PipeLineElem* elem) +{ + if(elem != nullptr) + { + elem->manager = this; + elems.push_back(elem); + } +} + +PipeLineElem* PipeLine::push_elem(const std::string& type) +{ + elem_create_func_map_t::iterator iter = elem_create_func_map.find(type); + if (iter == elem_create_func_map.end()) + return nullptr; + + elem_create_func_t func = iter->second; + if (func == nullptr) + return nullptr; + + PipeLineElem* elem = func(); + if (elem == nullptr) + return nullptr; + + elem->manager = this; + push_elem(elem); + return elem; +} + +PipeLineElem* PipeLine::pipe(PipeMaterial* pm /*= nullptr*/) +{ + PipeLineElem* elem_begin = *elems.begin(); + PipeLineElem* elem_last = *elems.rbegin(); + + if (elems.empty() || elem_begin == nullptr || elem_last == nullptr) + return nullptr; + + uint8_t pmPlacement[sizeof(PipeMaterial)]; + if (pm == nullptr) + pm = new (pmPlacement) PipeMaterial; + + if (elems.size() == 1) + { + elem_begin->gain(*pm); + return elem_begin; + } + else if (elems.size() == 2) + { + elem_begin->gain(*pm); + elem_last->pay(*pm); + return elem_last; + } + else + { + elem_begin->gain(*pm); + + elem_vec_t::iterator iter = elems.begin(); + while (elem_begin != elem_last) + { + ++iter; + elem_begin = *iter; + elem_begin->pay(*pm); + elem_begin->gain(*pm); + } + + elem_last->pay(*pm); + return elem_last; + } + + return nullptr; +} + +void PipeLine::set_global_param(const std::string& name, const std::string& value) +{ + if (name.empty()) + return; + + global_params_map_t::iterator iter = global_params_map.find(name); + if (iter == global_params_map.end()) + global_params_map.insert(std::make_pair(name, value)); + else + iter->second = value; +} + +std::string PipeLine::get_global_param(const std::string& name) const +{ + global_params_map_t::const_iterator iter = global_params_map.find(name); + if (iter == global_params_map.end()) + return ""; + else + return iter->second; +} diff --git a/RtspFace/PipeLine.h b/RtspFace/PipeLine.h new file mode 100644 index 0000000..898e1f6 --- /dev/null +++ b/RtspFace/PipeLine.h @@ -0,0 +1,81 @@ +#ifndef _PIPELINE_H_ +#define _PIPELINE_H_ + +#include <string> +#include <stdint.h> +#include <map> +#include <vector> + +#define PLGP_RTSP_SDP "RTSP_SDP" +#define PLGP_RTSP_FMTP "RTSP_FMTP" + +class PipeLineElem; +class PipeLine; + +struct PipeMaterial +{ + uint8_t* buffer; + size_t buffSize; + PipeLineElem* former; + + PipeMaterial(); +}; + +class PipeLineElem +{ +public: + PipeLineElem() : manager(nullptr) { } + virtual ~PipeLineElem() { } + + virtual bool init(void* args) = 0; + virtual void finit() = 0; + + // buffer: delete it who create it + virtual bool pay(const PipeMaterial& pm) = 0; + virtual bool gain(PipeMaterial& pm) = 0; + +public: + PipeLine* manager; +}; + +typedef PipeLineElem* (*elem_create_func_t)(); + +// 0 (there is no elem). do nothing +// 1 (there is one elem). gain +// 2 (there is two elems). gain --> pay +// 3 (there is more than two elems). gain --> pay gain --> pay gain --> ... --> pay +class PipeLine +{ +public: + PipeLine(); + + // stop and delete all managed elements + ~PipeLine(); + + bool register_elem_creator(const std::string& type, elem_create_func_t func); + void push_elem(PipeLineElem* elem); + PipeLineElem* push_elem(const std::string& type); + + // do pipe sync. returns the element who returns false, or the last one. + PipeLineElem* pipe(PipeMaterial* pm = nullptr); + + // do pipe async + void pipe_start(); + void pipe_notify(PipeLineElem*); + void pipe_stop(); + + void set_global_param(const std::string& name, const std::string& value); + std::string get_global_param(const std::string& name) const; + +private: + typedef std::map<const std::string, elem_create_func_t> elem_create_func_map_t; + elem_create_func_map_t elem_create_func_map; + + typedef std::vector<PipeLineElem*> elem_vec_t; + elem_vec_t elems; + + typedef std::map<const std::string, std::string> global_params_map_t; + global_params_map_t global_params_map; +}; + +#endif diff --git a/RtspFace/RTSPClient.cpp b/RtspFace/RTSPClient.hpp similarity index 74% rename from RtspFace/RTSPClient.cpp rename to RtspFace/RTSPClient.hpp index a306d89..78088ef 100644 --- a/RtspFace/RTSPClient.cpp +++ b/RtspFace/RTSPClient.hpp @@ -24,10 +24,6 @@ #include "BasicUsageEnvironment.hh" #include <iostream> -#include <libbase64.h> - -bool initH264DecoderEnv(uint8_t* sps, size_t spsSize, uint8_t* pps, size_t ppsSize); -int decodeH264(uint8_t* pBuffer, int dwBufsize, const char *outfile) ; // Forward function definitions: @@ -43,7 +39,7 @@ // called at the end of a stream's expected duration (if the stream has not already signaled its end using a RTCP "BYE") // The main streaming routine (for each "rtsp://" URL): -void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL); +void openURL(UsageEnvironment& env, void* args, char const* progName, char const* rtspURL); // Used to iterate through each stream's 'subsessions', setting up each one: void setupNextSubsession(RTSPClient* rtspClient); @@ -68,7 +64,7 @@ char eventLoopWatchVariable = 0; -int main(int argc, char** argv) { +int test_main(int argc, char** argv) { // Begin by setting up our usage environment: TaskScheduler* scheduler = BasicTaskScheduler::createNew(); UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler); @@ -81,7 +77,7 @@ // There are argc-1 URLs: argv[1] through argv[argc-1]. Open and start streaming each one: for (int i = 1; i <= argc-1; ++i) { - openURL(*env, argv[0], argv[i]); + openURL(*env, NULL, argv[0], argv[i]); } // All subsequent activity takes place within the event loop: @@ -134,6 +130,7 @@ public: StreamClientState scs; + void* args; }; // Define a data sink (a subclass of "MediaSink") to receive the data for each subsession (i.e., each audio or video 'substream'). @@ -141,39 +138,44 @@ // Or it might be a "FileSink", for outputting the received data into a file (as is done by the "openRTSP" application). // In this example code, however, we define a simple 'dummy' sink that receives incoming data, but does nothing with it. -class DummySink: public MediaSink { +class DummySink: public MediaSink +{ public: - static DummySink* createNew(UsageEnvironment& env, - MediaSubsession& subsession, // identifies the kind of data that's being received - char const* streamId = NULL); // identifies the stream itself (optional) + static DummySink* createNew(UsageEnvironment& env, + void* _args, + MediaSubsession& subsession, // identifies the kind of data that's being received + char const* streamId = NULL); // identifies the stream itself (optional) private: - DummySink(UsageEnvironment& env, MediaSubsession& subsession, char const* streamId); - // called only by "createNew()" - virtual ~DummySink(); + DummySink(UsageEnvironment& env, void* _args, MediaSubsession& subsession, char const* streamId); + // called only by "createNew()" + virtual ~DummySink(); - static void afterGettingFrame(void* clientData, unsigned frameSize, - unsigned numTruncatedBytes, + static void afterGettingFrame(void* clientData, unsigned frameSize, + unsigned numTruncatedBytes, struct timeval presentationTime, - unsigned durationInMicroseconds); - void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, + unsigned durationInMicroseconds); + void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds); -private: - // redefined virtual functions: - virtual Boolean continuePlaying(); +public: + void* args; private: - u_int8_t* fReceiveBuffer; - MediaSubsession& fSubsession; - char* fStreamId; + // redefined virtual functions: + virtual Boolean continuePlaying(); + +private: + u_int8_t* fReceiveBuffer; + MediaSubsession& fSubsession; + char* fStreamId; }; #define RTSP_CLIENT_VERBOSITY_LEVEL 1 // by default, print verbose output from each "RTSPClient" static unsigned rtspClientCount = 0; // Counts how many streams (i.e., "RTSPClient"s) are currently in use. -void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL) { +void openURL(UsageEnvironment& env, void* args, char const* progName, char const* rtspURL) { // Begin by creating a "RTSPClient" object. Note that there is a separate "RTSPClient" object for each stream that we wish // to receive (even if more than stream uses the same "rtsp://" URL). RTSPClient* rtspClient = ourRTSPClient::createNew(env, rtspURL, RTSP_CLIENT_VERBOSITY_LEVEL, progName); @@ -181,6 +183,8 @@ env << "Failed to create a RTSP client for URL \"" << rtspURL << "\": " << env.getResultMsg() << "\n"; return; } + + ((ourRTSPClient*)rtspClient)->args = args; ++rtspClientCount; @@ -290,7 +294,9 @@ // (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later, // after we've sent a RTSP "PLAY" command.) - scs.subsession->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url()); + DummySink* mySink; + scs.subsession->sink = mySink = DummySink::createNew(env, ((ourRTSPClient*)rtspClient)->args, + *scs.subsession, rtspClient->url()); // perhaps use your own custom "MediaSink" subclass instead if (scs.subsession->sink == NULL) { env << *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession @@ -447,7 +453,9 @@ ourRTSPClient::ourRTSPClient(UsageEnvironment& env, char const* rtspURL, int verbosityLevel, char const* applicationName, portNumBits tunnelOverHTTPPortNum) - : RTSPClient(env,rtspURL, verbosityLevel, applicationName, tunnelOverHTTPPortNum, -1) { + : RTSPClient(env,rtspURL, verbosityLevel, applicationName, tunnelOverHTTPPortNum, -1), + args(nullptr) +{ } ourRTSPClient::~ourRTSPClient() { @@ -471,70 +479,33 @@ } } -SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr, - // result parameter: - size_t& numSPropRecords) { - // Make a copy of the input string, so we can replace the commas with '\0's: - char* inStr = strDup(sPropParameterSetsStr); - if (inStr == NULL) { - numSPropRecords = 0; - return NULL; - } - - // Count the number of commas (and thus the number of parameter sets): - numSPropRecords = 1; - char* s; - for (s = inStr; *s != '\0'; ++s) { - if (*s == ',') { - ++numSPropRecords; - *s = '\0'; - } - } - - // Allocate and fill in the result array: - SPropRecord* resultArray = new SPropRecord[numSPropRecords]; //****** 鐪嬪埌 杩欓噷浜� 鎶� *******/ - s = inStr; - for (unsigned i = 0; i < numSPropRecords; ++i) { - resultArray[i].sPropBytes = new uint8_t[256]; - - size_t sPropLength = 0; - base64_decode(s, strlen(s), (char*)resultArray[i].sPropBytes, &sPropLength, 0); - resultArray[i].sPropLength = sPropLength; - - s += strlen(s) + 1; - } - - delete[] inStr; - return resultArray; -} - // Implementation of "DummySink": // Even though we're not going to be doing anything with the incoming data, we still need to receive it. // Define the size of the buffer that we'll use: -#define DUMMY_SINK_RECEIVE_BUFFER_SIZE 100000 +#define DUMMY_SINK_RECEIVE_BUFFER_SIZE 1920*1080*3 -DummySink* DummySink::createNew(UsageEnvironment& env, MediaSubsession& subsession, char const* streamId) { - return new DummySink(env, subsession, streamId); +DummySink* DummySink::createNew(UsageEnvironment& env, void* _args, MediaSubsession& subsession, char const* streamId) +{ + return new DummySink(env, _args, subsession, streamId); } -DummySink::DummySink(UsageEnvironment& env, MediaSubsession& subsession, char const* streamId) - : MediaSink(env), - fSubsession(subsession) { - fStreamId = strDup(streamId); - fReceiveBuffer = new u_int8_t[DUMMY_SINK_RECEIVE_BUFFER_SIZE]; - - //parse sdp - //const char* strSDP = fSubsession.savedSDPLines(); - const char* strFmtp = fSubsession.fmtp_spropparametersets(); - //std::cout << strFmtp << std::endl; - - size_t numSPropRecords = 0; - SPropRecord *p_record = parseSPropParameterSets(fSubsession.fmtp_spropparametersets(), numSPropRecords); - SPropRecord &sps = p_record[0]; - SPropRecord &pps = p_record[1]; +DummySink::DummySink(UsageEnvironment& env, void* _args, MediaSubsession& subsession, char const* streamId) + : MediaSink(env), args(_args), fSubsession(subsession) +{ + fStreamId = strDup(streamId); + fReceiveBuffer = new u_int8_t[DUMMY_SINK_RECEIVE_BUFFER_SIZE]; + + // ffmpeg need AUX header + fReceiveBuffer[0]=0x00; fReceiveBuffer[1]=0x00; fReceiveBuffer[2]=0x00; fReceiveBuffer[3]=0x01; + + //parse sdp + const char* strSDP = fSubsession.savedSDPLines(); + rtsp_client_sdp_callback(args, strSDP); - initH264DecoderEnv(sps.sPropBytes, sps.sPropLength, pps.sPropBytes, pps.sPropLength); + const char* strFmtp = fSubsession.fmtp_spropparametersets(); + rtsp_client_fmtp_callback(args, strFmtp); + //std::cout << strFmtp << std::endl; } DummySink::~DummySink() { @@ -547,7 +518,7 @@ DummySink* sink = (DummySink*)clientData; if (frameSize > 0) - decodeH264(sink->fReceiveBuffer, frameSize, NULL); + rtsp_client_frame_callback(sink->args, sink->fReceiveBuffer, frameSize + 4); sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); } @@ -581,137 +552,11 @@ Boolean DummySink::continuePlaying() { if (fSource == NULL) return False; // sanity check (should not happen) + rtsp_client_continue_callback(args); + // Request the next frame of data from our input source. "afterGettingFrame()" will get called later, when it arrives: - fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE, + fSource->getNextFrame(fReceiveBuffer + 4, DUMMY_SINK_RECEIVE_BUFFER_SIZE, afterGettingFrame, this, onSourceClosure, this); return True; -} - - -/********* - -*********/ - -extern "C" -{ - #include <libavcodec/avcodec.h> - #include <libavutil/frame.h> - #include <libavformat/avformat.h> -} - -AVCodecContext* g_pAVCodecContext = NULL; -AVFrame* g_pAVFrame = NULL; - -bool initH264DecoderEnv(uint8_t* sps, size_t spsSize, uint8_t* pps, size_t ppsSize) -{ - av_register_all(); - - // find the video encoder - AVCodec* avCodec = avcodec_find_decoder(AV_CODEC_ID_H264); - - if (!avCodec) - { - printf("codec not found!\n"); - return -1; - } - - g_pAVCodecContext = avcodec_alloc_context3(avCodec); - - //鍒濆鍖栧弬鏁帮紝涓嬮潰鐨勫弬鏁板簲璇ョ敱鍏蜂綋鐨勪笟鍔″喅瀹� - g_pAVCodecContext->time_base.num = 1; - g_pAVCodecContext->frame_number = 1; //姣忓寘涓�涓棰戝抚 - g_pAVCodecContext->codec_type = AVMEDIA_TYPE_VIDEO; - g_pAVCodecContext->bit_rate = 0; - g_pAVCodecContext->time_base.den = 25; - g_pAVCodecContext->width = 1920; - g_pAVCodecContext->height = 1080; - - if (g_pAVCodecContext->extradata == NULL) - { - int totalsize = 0; - unsigned char* tmp = NULL; - unsigned char nalu_header[4] = { 0, 0, 0, 1 }; - - totalsize = 8 + spsSize + ppsSize; - - tmp = new unsigned char[totalsize]; - memcpy(tmp, nalu_header, 4); - memcpy(tmp + 4, sps, spsSize); - memcpy(tmp + 4 + spsSize, nalu_header, 4); - memcpy(tmp + 4 + spsSize + 4, pps, ppsSize); - - g_pAVCodecContext->extradata_size = totalsize; // g_pAVCodecContext 涓烘垜瑙g爜鏃跺�欎娇鐢ㄧ殑涓婁笅鏂� - - g_pAVCodecContext->extradata = tmp; - } - - if(avcodec_open2(g_pAVCodecContext, avCodec, NULL) >= 0) - g_pAVFrame = av_frame_alloc();// Allocate video frame - else - return false; - - return true; -} - -int decodeH264(uint8_t* pBuffer, int dwBufsize, const char *outfile) -{ - AVPacket packet = {0}; - int frameFinished = dwBufsize;//杩欎釜鏄殢渚垮~鍏ユ暟瀛楋紝娌′粈涔堜綔鐢� - - uint8_t newBuff[dwBufsize+4]; - newBuff[0]=0x00; newBuff[1]=0x00; newBuff[2]=0x00; newBuff[3]=0x01; - memcpy(newBuff + 4, pBuffer, dwBufsize); - - //packet.data = pBuffer;//杩欓噷濉叆涓�涓寚鍚戝畬鏁碒264鏁版嵁甯х殑鎸囬拡 - //packet.size = dwBufsize;//杩欎釜濉叆H264鏁版嵁甯х殑澶у皬 - - if (av_packet_from_data(&packet, newBuff, dwBufsize + 4) != 0){ - printf("exchange data failed!\n"); - } - - //涓嬮潰寮�濮嬬湡姝g殑瑙g爜 - avcodec_decode_video2(g_pAVCodecContext, g_pAVFrame, &frameFinished, &packet); - if(frameFinished)//鎴愬姛瑙g爜 - { - int picSize = g_pAVCodecContext->height * g_pAVCodecContext->width; - int newSize = picSize * 1.5; - - //鐢宠鍐呭瓨 - uint8_t *buff = new uint8_t[newSize]; - - int height = g_pAVFrame->height; - int width = g_pAVFrame->width; - - //鍐欏叆鏁版嵁 - int a=0; - for (int i=0; i<height; i++) - { - memcpy(buff+a,g_pAVFrame->data[0] + i * g_pAVFrame->linesize[0], width); - a+=width; - } - for (int i=0; i<height/2; i++) - { - memcpy(buff+a,g_pAVFrame->data[1] + i * g_pAVFrame->linesize[1], width/2); - a+=width/2; - } - for (int i=0; i<height/2; i++) - { - memcpy(buff+a,g_pAVFrame->data[2] + i * g_pAVFrame->linesize[2], width/2); - a+=width/2; - } - - //buff readly - - //static size_t f=0; - //char fname[50]; - //sprintf(fname, "%u.yuv420", ++f); - //FILE * pFile = fopen (fname,"wb"); - //fwrite (buff , sizeof(char), newSize, pFile); - //fclose(pFile); - - delete[] buff; - } - else - printf("incomplete frame\n"); } diff --git a/RtspFace/main.cpp b/RtspFace/main.cpp new file mode 100644 index 0000000..e9c6a48 --- /dev/null +++ b/RtspFace/main.cpp @@ -0,0 +1,40 @@ +#include "PipeLine.h" +#include "PL_RTSPClient.h" +#include "PL_H264Decoder.h" +#include "PL_AVFrameYUV420.h" + +#include <iostream> +using namespace std; + +int main(int argc, char** argv) +{ + PipeLine pipeLine; + + pipeLine.register_elem_creator("PL_RTSPClient", create_PL_RTSPClient); + pipeLine.register_elem_creator("PL_H264Decoder", create_PL_H264Decoder); + pipeLine.register_elem_creator("PL_AVFrameYUV420", create_PL_H264Decoder); + + PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient"); + RTSPConfig rtspConfig; + rtspConfig.progName = argv[0]; + rtspConfig.rtspURL = argv[1]; + bool ret = rtspClient->init(&rtspConfig); + if (!ret) + { + cout << "rtspClient.init error" << endl; + exit(EXIT_FAILURE); + } + + PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder"); + h264Decoder->init(nullptr); + + PL_AVFrameYUV420* AVFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420"); + AVFrameYUV420->init(nullptr); + + while(true) + { + //cout << "begin pipe" << endl; + pipeLine.pipe(); + //cout << "end pipe" << endl; + } +} diff --git a/RtspFace/make.sh b/RtspFace/make.sh index fbe9d62..3568267 100644 --- a/RtspFace/make.sh +++ b/RtspFace/make.sh @@ -10,14 +10,19 @@ LIBBASE64_INC="-I$LIBBASE64_BASE/include" LIBBASE64_LIB="$LIBBASE64_BASE/lib/libbase64.o" -CPPFLAGS+="$LIVEMEDIA_INC $FFMPEG_INC $LIBBASE64_INC" -LDFLAGS+="$LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB" +CPPFLAGS+="-pthread $LIVEMEDIA_INC $FFMPEG_INC $LIBBASE64_INC" +LDFLAGS+="-pthread $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB" CFLAGS+="-D__STDC_CONSTANT_MACROS" rm rtsp_face +rm *.o -g++ -g RTSPClient.cpp $CFLAGS $CPPFLAGS $LDFLAGS -o rtsp_face +g++ -g -c -std=c++11 main.cpp $CFLAGS $CPPFLAGS +g++ -g -c -std=c++11 PL_RTSPClient.cpp $CFLAGS $CPPFLAGS +g++ -g -c -std=c++11 PL_H264Decoder.cpp $CFLAGS $CPPFLAGS +g++ -g -c -std=c++11 PipeLine.cpp $CFLAGS $CPPFLAGS +g++ -g -std=c++11 main.o PL_RTSPClient.o PL_H264Decoder.o PipeLine.o $LDFLAGS -o rtsp_face #export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$FFMPEG_BASE/lib #./rtsp_face rtsp://admin:admin12345@192.168.1.63:554/h264/ch1/main/av_stream -- Gitblit v1.8.0