#include "PL_RTSPServer2.h"
|
#include "MaterialBuffer.h"
|
#include "logger.h"
|
|
#include <liveMedia/liveMedia.hh>
|
#include <BasicUsageEnvironment/BasicUsageEnvironment.hh>
|
|
#include "FFmpegRTSPServer/IEncoder.h"
|
#include "FFmpegRTSPServer/LiveRTSPServer.h"
|
#include "FFmpegRTSPServer/H264FramedSource.h"
|
#include "FFmpegRTSPServer/LiveServerMediaSubsession.h"
|
#include "PreAllocBufferQueue.h"
|
#include "MediaHelper.h"
|
|
struct RTSPServer2_Internal
|
{
|
RTSPServer2Config config;
|
|
pthread_t live_daemon_thid;
|
bool live_daemon_running;
|
|
MESAI::LiveRTSPServer* server;
|
|
PreAllocBufferQueue* frameQueue;
|
pthread_mutex_t* queue_mutex;
|
pthread_mutex_t* queue_empty_mutex;
|
pthread_mutex_t* queue_full_mutex;
|
|
bool auxLineSet;
|
|
RTSPServer2_Internal() :
|
config(),
|
live_daemon_thid(0), live_daemon_running(false),
|
server(nullptr),
|
frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
|
auxLineSet(false)
|
{
|
}
|
|
~RTSPServer2_Internal()
|
{
|
reset();
|
}
|
|
void reset()
|
{
|
RTSPServer2Config _config;
|
config =_config;
|
|
if (frameQueue != nullptr)
|
{
|
delete frameQueue;
|
frameQueue = nullptr;
|
}
|
|
if (queue_mutex != nullptr)
|
{
|
pthread_mutex_destroy(queue_mutex);
|
delete queue_mutex;
|
queue_mutex = nullptr;
|
}
|
queue_mutex = new pthread_mutex_t;
|
pthread_mutex_init(queue_mutex, NULL);
|
|
if (queue_empty_mutex != nullptr)
|
{
|
pthread_mutex_destroy(queue_empty_mutex);
|
delete queue_empty_mutex;
|
queue_empty_mutex = nullptr;
|
}
|
queue_empty_mutex = new pthread_mutex_t;
|
pthread_mutex_init(queue_empty_mutex, NULL);
|
|
if (queue_full_mutex != nullptr)
|
{
|
pthread_mutex_destroy(queue_full_mutex);
|
delete queue_full_mutex;
|
queue_full_mutex = nullptr;
|
}
|
queue_full_mutex = new pthread_mutex_t;
|
pthread_mutex_init(queue_full_mutex, NULL);
|
|
live_daemon_thid = 0;
|
live_daemon_running = false;
|
|
server = nullptr; //#todo delete
|
|
auxLineSet = false;
|
}
|
};
|
|
PipeLineElem* create_PL_RTSPServer2()
|
{
|
return new PL_RTSPServer2;
|
}
|
|
PL_RTSPServer2::PL_RTSPServer2() : internal(new RTSPServer2_Internal)
|
{
|
}
|
|
PL_RTSPServer2::~PL_RTSPServer2()
|
{
|
delete (RTSPServer2_Internal*)internal;
|
internal = nullptr;
|
}
|
|
struct DeliverFrameCallback
|
{
|
RTSPServer2_Internal* in;
|
PreAllocBufferQueue::Buffer* lastBuffer;
|
|
DeliverFrameCallback(RTSPServer2_Internal* _in)
|
: in(_in) , lastBuffer(nullptr)
|
{
|
}
|
|
~DeliverFrameCallback()
|
{
|
if (lastBuffer != nullptr)
|
{
|
in->frameQueue->Release(lastBuffer);
|
lastBuffer = nullptr;
|
}
|
}
|
|
static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts)
|
{
|
DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
|
|
if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
|
{
|
int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
|
}
|
}
|
|
if (_this->in->frameQueue->Empty())
|
{
|
int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
|
}
|
}
|
|
int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
|
}
|
|
ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
|
|
if (_this->lastBuffer != nullptr)
|
{
|
// this can not happen
|
_this->in->frameQueue->Release(_this->lastBuffer);
|
_this->lastBuffer = nullptr;
|
}
|
|
_this->lastBuffer = _this->in->frameQueue->Dequeue();
|
if (_this->lastBuffer == nullptr)
|
return false;
|
|
buffer = _this->lastBuffer->buffer; // #todo send nalu
|
buffSize = _this->lastBuffer->buffSize;
|
//LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
|
|
//LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
|
//static size_t f = 0;
|
//static FILE *pFile = fopen("/data/bb.264", "wb");
|
//fwrite(buffer, sizeof(char), buffSize, pFile);
|
//fflush(pFile);
|
//if (++f > 30){
|
// fclose(pFile);
|
// exit(0);
|
//}
|
|
gettimeofday(&pts, NULL);
|
return (_this->lastBuffer != nullptr);
|
}
|
|
static void releaseFrame(void* args)
|
{
|
DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
|
|
if (_this->lastBuffer != nullptr)
|
{
|
ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
|
_this->in->frameQueue->Release(_this->lastBuffer);
|
_this->lastBuffer = nullptr;
|
}
|
}
|
};
|
|
static void* live_daemon_thd(void* arg)
|
{
|
RTSPServer2_Internal* in = (RTSPServer2_Internal*)arg;
|
|
in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
|
|
in->server->init();
|
|
MESAI::H264FramedSource::FrameCallbacks cbs;
|
cbs.args = new DeliverFrameCallback(in);//#todo delete
|
cbs.deliverFrameCallback = DeliverFrameCallback::deliverFrame;
|
cbs.releaseFrameCallback = DeliverFrameCallback::releaseFrame;
|
in->server->framedSource = new MESAI::H264FramedSource(*in->server->env, cbs);
|
|
in->live_daemon_running = true;
|
in->server->run(); // does not return
|
//#todo delete framedSource
|
in->live_daemon_running = false;
|
}
|
|
bool PL_RTSPServer2::init(void* args)
|
{
|
RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
|
in->reset();
|
|
if (args)
|
{
|
RTSPServer2Config* config = (RTSPServer2Config*)args;
|
in->config = *config;
|
}
|
|
PreAllocBufferQueue::Config qcfg;
|
qcfg.multithreadSafe = false;
|
qcfg.fullQueueDropFront = true;
|
qcfg.fullQueueSync = false;
|
qcfg.count = 20;
|
qcfg.maxBuffSize = 524288; // 512KB
|
in->frameQueue = new PreAllocBufferQueue(qcfg);
|
|
int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
|
if(ret != 0)
|
{
|
LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
|
return false;
|
}
|
|
return true;
|
}
|
|
void PL_RTSPServer2::finit()
|
{
|
RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
|
|
pthread_join(in->live_daemon_thid, NULL);
|
}
|
|
bool PL_RTSPServer2::pay(const PipeMaterial& pm)
|
{
|
RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
|
|
if (pm.buffer == nullptr)
|
return false;
|
|
if (pm.type != PipeMaterial::PMT_FRAME)
|
{
|
LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
|
return false;
|
}
|
|
if (!in->auxLineSet)
|
{
|
std::string spsStr(this->manager->get_param(PLGP_ENC_SPS_B64));
|
std::string ppsStr(this->manager->get_param(PLGP_ENC_PPS_B64));
|
|
if (!spsStr.empty() && !ppsStr.empty())
|
{
|
MESAI::H264FramedSource* framedSource = dynamic_cast<MESAI::H264FramedSource*>(in->server->framedSource);
|
framedSource->spsBase64 = spsStr;
|
framedSource->ppsBase64 = ppsStr;
|
|
in->auxLineSet = true;
|
|
LOG_INFO <<"sps:" << spsStr.size() << ", pps:" << ppsStr.size() << LOG_ENDL;
|
}
|
}
|
|
while (in->config.payBlockFullQueue && in->frameQueue->Full())
|
{
|
int ret = pthread_mutex_lock(in->queue_full_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
|
}
|
|
//if (in->frameQueue->Full())
|
//{
|
// LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
|
// return false;
|
//}
|
}
|
|
MB_Frame* frame = (MB_Frame*)pm.buffer;
|
if (frame->buffer == nullptr || frame->buffSize == 0)
|
return false;
|
|
//LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
|
|
ScopeLocker<pthread_mutex_t>(in->queue_mutex);
|
//if (in->frameQueue->Full())
|
// LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
|
|
PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
|
if (qbuff == nullptr)
|
{
|
LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
|
int ret = pthread_mutex_unlock(in->queue_empty_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
|
}
|
return false;
|
}
|
|
const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
|
size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
|
|
memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
|
qbuff->buffSize = copySize;
|
|
//static size_t f = 0;
|
//static FILE *pFile = fopen("/data/aa.264", "wb");
|
//fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
|
//if (++f > 400){
|
// fclose(pFile);
|
// exit(0);
|
//}
|
|
int ret = pthread_mutex_unlock(in->queue_empty_mutex);
|
if (ret != 0)
|
{
|
LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
|
}
|
|
if (copySize < frame->buffSize)
|
{
|
LOG_WARN << "copy frame truncated" << LOG_ENDL;
|
}
|
|
return true;
|
}
|
|
bool PL_RTSPServer2::gain(PipeMaterial& pm)
|
{
|
RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
|
|
pm.type = PipeMaterial::PMT_NONE;
|
pm.buffer = nullptr;
|
pm.buffSize = 0;
|
pm.former = this;
|
return true;
|
}
|