From 663104b9be90ed303b87c8acddac8421583a9e39 Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期三, 16 八月 2017 12:38:59 +0800 Subject: [PATCH] aaaaa --- RtspFace/PL_RTSPServer2.cpp | 76 ++++++++++++++++++++++++++++++-------- 1 files changed, 60 insertions(+), 16 deletions(-) diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp index bd98fa4..adc8d6c 100644 --- a/RtspFace/PL_RTSPServer2.cpp +++ b/RtspFace/PL_RTSPServer2.cpp @@ -24,6 +24,7 @@ PreAllocBufferQueue* frameQueue; pthread_mutex_t* queue_mutex; pthread_mutex_t* queue_empty_mutex; + pthread_mutex_t* queue_full_mutex; bool auxLineSet; @@ -31,10 +32,9 @@ config(), live_daemon_thid(0), live_daemon_running(false), server(nullptr), - frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config + frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config auxLineSet(false) { - pthread_mutex_init(queue_mutex, NULL); } ~RTSPServer2_Internal() @@ -59,7 +59,6 @@ delete queue_mutex; queue_mutex = nullptr; } - queue_mutex = new pthread_mutex_t; pthread_mutex_init(queue_mutex, NULL); @@ -69,9 +68,17 @@ delete queue_empty_mutex; queue_empty_mutex = nullptr; } - queue_empty_mutex = new pthread_mutex_t; pthread_mutex_init(queue_empty_mutex, NULL); + + if (queue_full_mutex != nullptr) + { + pthread_mutex_destroy(queue_full_mutex); + delete queue_full_mutex; + queue_full_mutex = nullptr; + } + queue_full_mutex = new pthread_mutex_t; + pthread_mutex_init(queue_full_mutex, NULL); live_daemon_thid = 0; live_daemon_running = false; @@ -120,19 +127,28 @@ { DeliverFrameCallback* _this = (DeliverFrameCallback*)args; + if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full()) + { + int ret = pthread_mutex_unlock(_this->in->queue_full_mutex); + if (ret != 0) + { + LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL; + } + } + if (_this->in->frameQueue->Empty()) { int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); if (ret != 0) { - LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl; + LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL; } } int ret = pthread_mutex_lock(_this->in->queue_empty_mutex); if (ret != 0) { - LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl; + LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL; } ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex); @@ -150,11 +166,13 @@ buffer = _this->lastBuffer->buffer; buffSize = _this->lastBuffer->buffSize; + LOG_WARN << "sizeS=" << buffSize << LOG_ENDL; //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL; //static size_t f = 0; //static FILE *pFile = fopen("/data/bb.264", "wb"); //fwrite(buffer, sizeof(char), buffSize, pFile); + //fflush(pFile); //if (++f > 30){ // fclose(pFile); // exit(0); @@ -212,14 +230,14 @@ qcfg.multithreadSafe = false; qcfg.fullQueueDropFront = true; qcfg.fullQueueSync = false; - qcfg.count = 32; - qcfg.maxBuffSize = 200000; + qcfg.count = 20; + qcfg.maxBuffSize = 524288; // 512KB in->frameQueue = new PreAllocBufferQueue(qcfg); int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in); if(ret != 0) { - LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl; + LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL; return false; } @@ -242,7 +260,7 @@ if (pm.type != PipeMaterial::PMT_FRAME) { - LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << std::endl; + LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL; return false; } @@ -263,28 +281,48 @@ } } + while (in->config.payBlockFullQueue && in->frameQueue->Full()) + { + int ret = pthread_mutex_lock(in->queue_full_mutex); + if (ret != 0) + { + LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL; + } + + //if (in->frameQueue->Full()) + //{ + // LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL; + // return false; + //} + } + MB_Frame* frame = (MB_Frame*)pm.buffer; if (frame->buffer == nullptr || frame->buffSize == 0) return false; + LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL; + ScopeLocker<pthread_mutex_t>(in->queue_mutex); //if (in->frameQueue->Full()) - // LOG_WARN << "PL_RTSPServer2::pay may lost data" << std::endl; + // LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL; PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue(); if (qbuff == nullptr) { - LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << std::endl; + LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL; int ret = pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { - LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; + LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL; } return false; } - memcpy(qbuff->buffer, frame->buffer, frame->buffSize);//#todo size min - qbuff->buffSize = frame->buffSize; + const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig()); + size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize); + + memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min + qbuff->buffSize = copySize; //static size_t f = 0; //static FILE *pFile = fopen("/data/aa.264", "wb"); @@ -297,8 +335,14 @@ int ret = pthread_mutex_unlock(in->queue_empty_mutex); if (ret != 0) { - LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl; + LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL; } + + if (copySize < frame->buffSize) + { + LOG_WARN << "copy frame truncated" << LOG_ENDL; + } + return true; } -- Gitblit v1.8.0