From 5cf652629fb40796cd2e0ab17c3617ed52365473 Mon Sep 17 00:00:00 2001
From: pans <pans@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期五, 18 八月 2017 10:32:05 +0800
Subject: [PATCH] capnp client fix bug
---
RtspFace/PL_RTSPServer2.cpp | 147 +++++++++++++++++++++++++++++++++++--------------
1 files changed, 105 insertions(+), 42 deletions(-)
diff --git a/RtspFace/PL_RTSPServer2.cpp b/RtspFace/PL_RTSPServer2.cpp
index bda31d5..0506315 100644
--- a/RtspFace/PL_RTSPServer2.cpp
+++ b/RtspFace/PL_RTSPServer2.cpp
@@ -1,4 +1,4 @@
-#include "PL_RTSPServer.h"
+#include "PL_RTSPServer2.h"
#include "MaterialBuffer.h"
#include "logger.h"
@@ -12,9 +12,9 @@
#include "PreAllocBufferQueue.h"
#include "MediaHelper.h"
-struct RTSPServer_Internal
+struct RTSPServer2_Internal
{
- RTSPServerConfig config;
+ RTSPServer2Config config;
pthread_t live_daemon_thid;
bool live_daemon_running;
@@ -24,27 +24,27 @@
PreAllocBufferQueue* frameQueue;
pthread_mutex_t* queue_mutex;
pthread_mutex_t* queue_empty_mutex;
+ pthread_mutex_t* queue_full_mutex;
bool auxLineSet;
- RTSPServer_Internal() :
+ RTSPServer2_Internal() :
config(),
live_daemon_thid(0), live_daemon_running(false),
server(nullptr),
- frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
+ frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
auxLineSet(false)
{
- pthread_mutex_init(queue_mutex, NULL);
}
- ~RTSPServer_Internal()
+ ~RTSPServer2_Internal()
{
reset();
}
void reset()
{
- RTSPServerConfig _config;
+ RTSPServer2Config _config;
config =_config;
if (frameQueue != nullptr)
@@ -59,7 +59,6 @@
delete queue_mutex;
queue_mutex = nullptr;
}
-
queue_mutex = new pthread_mutex_t;
pthread_mutex_init(queue_mutex, NULL);
@@ -69,9 +68,17 @@
delete queue_empty_mutex;
queue_empty_mutex = nullptr;
}
-
queue_empty_mutex = new pthread_mutex_t;
pthread_mutex_init(queue_empty_mutex, NULL);
+
+ if (queue_full_mutex != nullptr)
+ {
+ pthread_mutex_destroy(queue_full_mutex);
+ delete queue_full_mutex;
+ queue_full_mutex = nullptr;
+ }
+ queue_full_mutex = new pthread_mutex_t;
+ pthread_mutex_init(queue_full_mutex, NULL);
live_daemon_thid = 0;
live_daemon_running = false;
@@ -82,27 +89,27 @@
}
};
-PipeLineElem* create_PL_RTSPServer()
+PipeLineElem* create_PL_RTSPServer2()
{
- return new PL_RTSPServer;
+ return new PL_RTSPServer2;
}
-PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal)
+PL_RTSPServer2::PL_RTSPServer2() : internal(new RTSPServer2_Internal)
{
}
-PL_RTSPServer::~PL_RTSPServer()
+PL_RTSPServer2::~PL_RTSPServer2()
{
- delete (RTSPServer_Internal*)internal;
+ delete (RTSPServer2_Internal*)internal;
internal = nullptr;
}
struct DeliverFrameCallback
{
- RTSPServer_Internal* in;
+ RTSPServer2_Internal* in;
PreAllocBufferQueue::Buffer* lastBuffer;
- DeliverFrameCallback(RTSPServer_Internal* _in)
+ DeliverFrameCallback(RTSPServer2_Internal* _in)
: in(_in) , lastBuffer(nullptr)
{
}
@@ -120,13 +127,28 @@
{
DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
+ if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
+ {
+ int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
+ }
+ }
+
if (_this->in->frameQueue->Empty())
{
int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
if (ret != 0)
{
- LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
+ LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
}
+ }
+
+ int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
}
ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
@@ -138,17 +160,24 @@
_this->lastBuffer = nullptr;
}
+ //#todo
+ //find frameQueue->Seek is pps/sps
+ // if not: send bufferred pps , return;
+
+
_this->lastBuffer = _this->in->frameQueue->Dequeue();
if (_this->lastBuffer == nullptr)
return false;
- buffer = _this->lastBuffer->buffer;
- buffSize = _this->lastBuffer->buffSize;
+ buffer = _this->lastBuffer->buffer + 4; // #todo send nalu
+ buffSize = _this->lastBuffer->buffSize - 4;
+ //LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
- LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
+ //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
//static size_t f = 0;
//static FILE *pFile = fopen("/data/bb.264", "wb");
//fwrite(buffer, sizeof(char), buffSize, pFile);
+ //fflush(pFile);
//if (++f > 30){
// fclose(pFile);
// exit(0);
@@ -173,7 +202,7 @@
static void* live_daemon_thd(void* arg)
{
- RTSPServer_Internal* in = (RTSPServer_Internal*)arg;
+ RTSPServer2_Internal* in = (RTSPServer2_Internal*)arg;
in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
@@ -191,13 +220,14 @@
in->live_daemon_running = false;
}
-bool PL_RTSPServer::init(void* args)
+bool PL_RTSPServer2::init(void* args)
{
- RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+ RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
+ in->reset();
if (args)
{
- RTSPServerConfig* config = (RTSPServerConfig*)args;
+ RTSPServer2Config* config = (RTSPServer2Config*)args;
in->config = *config;
}
@@ -205,37 +235,37 @@
qcfg.multithreadSafe = false;
qcfg.fullQueueDropFront = true;
qcfg.fullQueueSync = false;
- qcfg.count = 32;
- qcfg.maxBuffSize = 100000;
+ qcfg.count = 20;
+ qcfg.maxBuffSize = 524288; // 512KB
in->frameQueue = new PreAllocBufferQueue(qcfg);
int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
if(ret != 0)
{
- LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
+ LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
return false;
}
return true;
}
-void PL_RTSPServer::finit()
+void PL_RTSPServer2::finit()
{
- RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+ RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
pthread_join(in->live_daemon_thid, NULL);
}
-bool PL_RTSPServer::pay(const PipeMaterial& pm)
+bool PL_RTSPServer2::pay(const PipeMaterial& pm)
{
- RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+ RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
if (pm.buffer == nullptr)
return false;
if (pm.type != PipeMaterial::PMT_FRAME)
{
- LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl;
+ LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
return false;
}
@@ -251,35 +281,62 @@
framedSource->ppsBase64 = ppsStr;
in->auxLineSet = true;
+
+ LOG_INFO <<"sps:" << spsStr.size() << ", pps:" << ppsStr.size() << LOG_ENDL;
}
+ }
+
+//#todo
+ // find if is pps/sps
+ // buffer the frame into RTSPServer2_Internal
+
+ while (in->config.payBlockFullQueue && in->frameQueue->Full())
+ {
+ int ret = pthread_mutex_lock(in->queue_full_mutex);
+ if (ret != 0)
+ {
+ LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
+ }
+
+ //if (in->frameQueue->Full())
+ //{
+ // LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
+ // return false;
+ //}
}
MB_Frame* frame = (MB_Frame*)pm.buffer;
if (frame->buffer == nullptr || frame->buffSize == 0)
return false;
+ //LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
+
ScopeLocker<pthread_mutex_t>(in->queue_mutex);
//if (in->frameQueue->Full())
- // LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl;
+ // LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
if (qbuff == nullptr)
{
- LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl;
+ LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
int ret = pthread_mutex_unlock(in->queue_empty_mutex);
if (ret != 0)
{
- LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+ LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
}
return false;
}
- memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
- qbuff->buffSize = frame->buffSize;
+ const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
+ size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
+
+ memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
+ qbuff->buffSize = copySize;
//static size_t f = 0;
//static FILE *pFile = fopen("/data/aa.264", "wb");
- //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
+ //fwrite(qbuff->buffer, sizeof(char), qbuff->buffSize, pFile);
+ //fflush(pFile);
//if (++f > 400){
// fclose(pFile);
// exit(0);
@@ -288,14 +345,20 @@
int ret = pthread_mutex_unlock(in->queue_empty_mutex);
if (ret != 0)
{
- LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
+ LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
}
+
+ if (copySize < frame->buffSize)
+ {
+ LOG_WARN << "copy frame truncated" << LOG_ENDL;
+ }
+
return true;
}
-bool PL_RTSPServer::gain(PipeMaterial& pm)
+bool PL_RTSPServer2::gain(PipeMaterial& pm)
{
- RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
+ RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
pm.type = PipeMaterial::PMT_NONE;
pm.buffer = nullptr;
--
Gitblit v1.8.0