#include "PL_RTSPServer2.h" #include "MaterialBuffer.h" #include "logger.h" #include #include #include "FFmpegRTSPServer/IEncoder.h" #include "FFmpegRTSPServer/LiveRTSPServer.h" #include "FFmpegRTSPServer/H264FramedSource.h" #include "FFmpegRTSPServer/LiveServerMediaSubsession.h" #include "PreAllocBufferQueue.h" #include "MediaHelper.h" struct RTSPServer2_Internal { RTSPServer2Config config; pthread_t live_daemon_thid; bool live_daemon_running; MESAI::LiveRTSPServer* server; PreAllocBufferQueue* frameQueue; pthread_mutex_t* queue_mutex; pthread_mutex_t* queue_empty_mutex; pthread_mutex_t* queue_full_mutex; bool auxLineSet; RTSPServer2_Internal() : config(), live_daemon_thid(0), live_daemon_running(false), server(nullptr), frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config auxLineSet(false) { } ~RTSPServer2_Internal() { reset(); } void reset() { RTSPServer2Config _config; config =_config; if (frameQueue != nullptr) { delete frameQueue; frameQueue = nullptr; } if (queue_mutex != nullptr) { pthread_mutex_destroy(queue_mutex); delete queue_mutex; queue_mutex = nullptr; } queue_mutex = new pthread_mutex_t; pthread_mutex_init(queue_mutex, NULL); if (queue_empty_mutex != nullptr) { pthread_mutex_destroy(queue_empty_mutex); delete queue_empty_mutex; queue_empty_mutex = nullptr; } queue_empty_mutex = new pthread_mutex_t; pthread_mutex_init(queue_empty_mutex, NULL); if (queue_full_mutex != nullptr) { pthread_mutex_destroy(queue_full_mutex); delete queue_full_mutex; queue_full_mutex = nullptr; } queue_full_mutex = new pthread_mutex_t; pthread_mutex_init(queue_full_mutex, NULL); live_daemon_thid = 0; live_daemon_running = false; server = nullptr; //#todo delete auxLineSet = false; } }; PipeLineElem* create_PL_RTSPServer2() { return new PL_RTSPServer2; } PL_RTSPServer2::PL_RTSPServer2() : internal(new RTSPServer2_Internal) { } PL_RTSPServer2::~PL_RTSPServer2() { delete (RTSPServer2_Internal*)internal; internal = nullptr; } struct DeliverFrameCallback { RTSPServer2_Internal* in; PreAllocBufferQueue::Buffer* lastBuffer; DeliverFrameCallback(RTSPServer2_Internal* _in) : in(_in) , lastBuffer(nullptr) { } ~DeliverFrameCallback() { if (lastBuffer != nullptr) { in->frameQueue->Release(lastBuffer); lastBuffer = nullptr; } } static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts) { DeliverFrameCallback* _this = (DeliverFrameCallback*)args; if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full()) { int ret = pthread_mutex_unlock(_this->in->queue_full_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL; } } if (_this->in->frameQueue->Empty()) { int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL; } } int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL; } ScopeLocker(_this->in->queue_mutex); if (_this->lastBuffer != nullptr) { // this can not happen _this->in->frameQueue->Release(_this->lastBuffer); _this->lastBuffer = nullptr; } //#todo //find frameQueue->Seek is pps/sps // if not: send bufferred pps , return; _this->lastBuffer = _this->in->frameQueue->Dequeue(); if (_this->lastBuffer == nullptr) return false; buffer = _this->lastBuffer->buffer + 4; // #todo send nalu buffSize = _this->lastBuffer->buffSize - 4; //LOG_WARN << "sizeS=" << buffSize << LOG_ENDL; //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL; //static size_t f = 0; //static FILE *pFile = fopen("/data/bb.264", "wb"); //fwrite(buffer, sizeof(char), buffSize, pFile); //fflush(pFile); //if (++f > 30){ // fclose(pFile); // exit(0); //} gettimeofday(&pts, NULL); return (_this->lastBuffer != nullptr); } static void releaseFrame(void* args) { DeliverFrameCallback* _this = (DeliverFrameCallback*)args; if (_this->lastBuffer != nullptr) { ScopeLocker(_this->in->queue_mutex); _this->in->frameQueue->Release(_this->lastBuffer); _this->lastBuffer = nullptr; } } }; static void* live_daemon_thd(void* arg) { RTSPServer2_Internal* in = (RTSPServer2_Internal*)arg; in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080); in->server->init(); MESAI::H264FramedSource::FrameCallbacks cbs; cbs.args = new DeliverFrameCallback(in);//#todo delete cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame; cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame; in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs); in->live_daemon_running = true; in->server->run(); // does not return //#todo delete framedSource in->live_daemon_running = false; } bool PL_RTSPServer2::init(void* args) { RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal; in->reset(); if (args) { RTSPServer2Config* config = (RTSPServer2Config*)args; in->config = *config; } PreAllocBufferQueue::Config qcfg; qcfg.multithreadSafe = false; qcfg.fullQueueDropFront = true; qcfg.fullQueueSync = false; qcfg.count = 20; qcfg.maxBuffSize = 524288; // 512KB in->frameQueue = new PreAllocBufferQueue(qcfg); int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); if(ret != 0) { LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL; return false; } return true; } void PL_RTSPServer2::finit() { RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal; pthread_join(in->live_daemon_thid, NULL); } bool PL_RTSPServer2::pay(const PipeMaterial& pm) { RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal; if (pm.buffer == nullptr) return false; if (pm.type != PipeMaterial::PMT_FRAME) { LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL; return false; } if (!in->auxLineSet) { std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64)); std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64)); if (!spsStr.empty() && !ppsStr.empty()) { MESAI::H264FramedSource* framedSource = dynamic_cast(in->server->framedSource); framedSource->spsBase64 = spsStr; framedSource->ppsBase64 = ppsStr; in->auxLineSet = true; LOG_INFO <<"sps:" << spsStr.size() << ", pps:" << ppsStr.size() << LOG_ENDL; } } //#todo // find if is pps/sps // buffer the frame into RTSPServer2_Internal while (in->config.payBlockFullQueue && in->frameQueue->Full()) { int ret = pthread_mutex_lock(in->queue_full_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL; } //if (in->frameQueue->Full()) //{ // LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL; // return false; //} } MB_Frame* frame = (MB_Frame*)pm.buffer; if (frame->buffer == nullptr || frame->buffSize == 0) return false; //LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL; ScopeLocker(in->queue_mutex); //if (in->frameQueue->Full()) // LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL; PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue(); if (qbuff == nullptr) { LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL; int ret = pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL; } return false; } const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig()); size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize); memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min qbuff->buffSize = copySize; //static size_t f = 0; //static FILE *pFile = fopen("/data/aa.264", "wb"); //fwrite(qbuff->buffer, sizeof(char), qbuff->buffSize, pFile); //fflush(pFile); //if (++f > 400){ // fclose(pFile); // exit(0); //} int ret = pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL; } if (copySize < frame->buffSize) { LOG_WARN << "copy frame truncated" << LOG_ENDL; } return true; } bool PL_RTSPServer2::gain(PipeMaterial& pm) { RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal; pm.type = PipeMaterial::PMT_NONE; pm.buffer = nullptr; pm.buffSize = 0; pm.former = this; return true; }