xingzilong
2017-08-18 9e5babf9db52e64bdae60137be7696e56241fca6
RtspFace/PL_RTSPServer2.cpp
@@ -1,4 +1,4 @@
#include "PL_RTSPServer.h"
#include "PL_RTSPServer2.h"
#include "MaterialBuffer.h"
#include "logger.h"
@@ -12,9 +12,35 @@
#include "PreAllocBufferQueue.h"
#include "MediaHelper.h"
struct RTSPServer_Internal
typedef enum {
    NALU_TYPE_SLICE    = 1,
    NALU_TYPE_DPA      = 2,
    NALU_TYPE_DPB      = 3,
    NALU_TYPE_DPC      = 4,
    NALU_TYPE_IDR      = 5,
    NALU_TYPE_SEI      = 6,
    NALU_TYPE_SPS      = 7,
    NALU_TYPE_PPS      = 8,
    NALU_TYPE_AUD      = 9,
    NALU_TYPE_EOSEQ    = 10,
    NALU_TYPE_EOSTREAM = 11,
    NALU_TYPE_FILL     = 12,
} NaluType;
typedef struct
{
   RTSPServerConfig config;
    int startcodeprefix_len;      //! 4 for parameter sets and first slice in picture, 3 for everything else (suggested)
    unsigned len;                 //! Length of the NAL unit (Excluding the start code, which does not belong to the NALU)
    unsigned max_size;            //! Nal Unit Buffer size
    int forbidden_bit;            //! should be always FALSE
    int nal_reference_idc;        //! NALU_PRIORITY_xxxx
    int nal_unit_type;            //! NALU_TYPE_xxxx
    char *buf;                    //! contains the first byte followed by the EBSP
} NALU_t;
struct RTSPServer2_Internal
{
   RTSPServer2Config config;
   pthread_t live_daemon_thid;
   bool live_daemon_running;
@@ -24,27 +50,35 @@
   PreAllocBufferQueue* frameQueue;
   pthread_mutex_t* queue_mutex;
   pthread_mutex_t* queue_empty_mutex;
   pthread_mutex_t* queue_full_mutex;
   bool auxLineSet;
   RTSPServer_Internal() :
    uint8_t lastSps[50];
    uint8_t lastPps[50];
    uint8_t lastSpsPps[100];
    size_t lastSpsSize;
    size_t lastPpsSize;
   RTSPServer2_Internal() :
      config(),
      live_daemon_thid(0), live_daemon_running(false),
      server(nullptr),
      frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
      auxLineSet(false)
      frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
      auxLineSet(false),
        lastSps(), lastPps(),lastSpsPps(),lastSpsSize(0),lastPpsSize(0)
   {
      pthread_mutex_init(queue_mutex, NULL);
   }
   
   ~RTSPServer_Internal()
   ~RTSPServer2_Internal()
   {
      reset();
   }
   
   void reset()
   {
      RTSPServerConfig _config;
      RTSPServer2Config _config;
      config =_config;
      if (frameQueue != nullptr)
@@ -59,7 +93,6 @@
         delete queue_mutex;
         queue_mutex = nullptr;
      }
      queue_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_mutex, NULL);
@@ -69,9 +102,17 @@
         delete queue_empty_mutex;
         queue_empty_mutex = nullptr;
      }
      queue_empty_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_empty_mutex, NULL);
      if (queue_full_mutex != nullptr)
      {
         pthread_mutex_destroy(queue_full_mutex);
         delete queue_full_mutex;
         queue_full_mutex = nullptr;
      }
      queue_full_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_full_mutex, NULL);
      live_daemon_thid = 0;
      live_daemon_running = false;
@@ -79,30 +120,32 @@
      server = nullptr; //#todo delete
      auxLineSet = false;
        lastSpsSize = 0;
        lastPpsSize = 0;
   }
};
PipeLineElem* create_PL_RTSPServer()
PipeLineElem* create_PL_RTSPServer2()
{
   return new PL_RTSPServer;
   return new PL_RTSPServer2;
}
PL_RTSPServer::PL_RTSPServer() : internal(new RTSPServer_Internal)
PL_RTSPServer2::PL_RTSPServer2() : internal(new RTSPServer2_Internal)
{
}
PL_RTSPServer::~PL_RTSPServer()
PL_RTSPServer2::~PL_RTSPServer2()
{
   delete (RTSPServer_Internal*)internal;
   delete (RTSPServer2_Internal*)internal;
   internal = nullptr;
}
struct DeliverFrameCallback
{
   RTSPServer_Internal* in;
   RTSPServer2_Internal* in;
   PreAllocBufferQueue::Buffer* lastBuffer;
   DeliverFrameCallback(RTSPServer_Internal* _in)
   DeliverFrameCallback(RTSPServer2_Internal* _in)
         : in(_in) , lastBuffer(nullptr)
   {
   }
@@ -116,17 +159,44 @@
      }
   }
    static int get_nalu_info(NALU_t* _p_nalu_info,const uint8_t* _pbuffer,const size_t _len)
    {
        if((nullptr==_pbuffer)||(_len<=4))
        {
            return -1;
        }
        _p_nalu_info->forbidden_bit = _pbuffer[4] & 0x80; //1 bit
        _p_nalu_info->nal_reference_idc = _pbuffer[4] & 0x60; // 2 bit
        _p_nalu_info->nal_unit_type = _pbuffer[4] & 0x1f;// 5 bit
        return 0;
    }
   static bool deliverFrame(void* args, uint8_t*& buffer, size_t& buffSize, timeval& pts)
   {
      DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
      if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
      {
         int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      if (_this->in->frameQueue->Empty())
      {
         int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
@@ -138,17 +208,39 @@
         _this->lastBuffer = nullptr;
      }
      //#todo
#if 0
      //find frameQueue->Seek is pps/sps
        PreAllocBufferQueue::Buffer* _p_Buffer = _this->in->frameQueue->Seek();
      // if not: send bufferred pps , return;
        NALU_t _obj_NALU_t;
        get_nalu_info(&_obj_NALU_t,_p_Buffer->buffer,_p_Buffer->buffSize);
        if(NALU_TYPE_PPS!=_obj_NALU_t.nal_unit_type)
        {
            memcpy(_this->in->lastSpsPps,_this->in->lastSps,_this->in->lastSpsSize);
            memcpy(_this->in->lastSpsPps+_this->in->lastSpsSize,_this->in->lastPps,_this->in->lastPpsSize);
            buffer = _this->in->lastSpsPps;
            buffSize = _this->in->lastSpsSize+_this->in->lastPpsSize;
            gettimeofday(&pts, NULL);
            return true;
        }
#endif
      _this->lastBuffer = _this->in->frameQueue->Dequeue();
      if (_this->lastBuffer == nullptr)
         return false;
      buffer = _this->lastBuffer->buffer;
      buffSize = _this->lastBuffer->buffSize;
      buffer = _this->lastBuffer->buffer + 4; // #todo send nalu
      buffSize = _this->lastBuffer->buffSize - 4;
      //LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
      LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
      //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
      //static size_t f = 0;
      //static FILE *pFile = fopen("/data/bb.264", "wb");
      //fwrite(buffer, sizeof(char), buffSize, pFile);
      //fflush(pFile);
      //if (++f > 30){
      //   fclose(pFile);
      //   exit(0);
@@ -173,7 +265,7 @@
static void* live_daemon_thd(void* arg)
{
   RTSPServer_Internal* in = (RTSPServer_Internal*)arg;
   RTSPServer2_Internal* in = (RTSPServer2_Internal*)arg;
   in->server = new MESAI::LiveRTSPServer(nullptr, 8554, 8080);
@@ -191,13 +283,14 @@
   in->live_daemon_running = false;
}
bool PL_RTSPServer::init(void* args)
bool PL_RTSPServer2::init(void* args)
{
   RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
   RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
   in->reset();
   if (args)
   {
      RTSPServerConfig* config = (RTSPServerConfig*)args;
      RTSPServer2Config* config = (RTSPServer2Config*)args;
      in->config = *config;
   }
@@ -205,37 +298,37 @@
   qcfg.multithreadSafe = false;
   qcfg.fullQueueDropFront = true;
   qcfg.fullQueueSync = false;
   qcfg.count = 32;
   qcfg.maxBuffSize = 100000;
   qcfg.count = 20;
   qcfg.maxBuffSize = 524288; // 512KB
   in->frameQueue = new PreAllocBufferQueue(qcfg);
   int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
   if(ret != 0)
   {
      LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
      LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
      return false;
   }
   return true;
}
void PL_RTSPServer::finit()
void PL_RTSPServer2::finit()
{
   RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
   RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
   pthread_join(in->live_daemon_thid, NULL);
}
bool PL_RTSPServer::pay(const PipeMaterial& pm)
bool PL_RTSPServer2::pay(const PipeMaterial& pm)
{
   RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
   RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
   if (pm.buffer == nullptr)
      return false;
   
   if (pm.type != PipeMaterial::PMT_FRAME)
   {
      LOG_ERROR << "PL_RTSPServer::pay only support PMT_FRAME" << std::endl;
      LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
      return false;
   }
@@ -251,35 +344,79 @@
         framedSource->ppsBase64 = ppsStr;
         in->auxLineSet = true;
         LOG_INFO <<"sps:" << spsStr.size() << ", pps:" << ppsStr.size() << LOG_ENDL;
      }
   }
   MB_Frame* frame = (MB_Frame*)pm.buffer;
   if (frame->buffer == nullptr || frame->buffSize == 0)
      return false;
    MB_Frame* frame = (MB_Frame*)pm.buffer;
    if (frame->buffer == nullptr || frame->buffSize == 0)
        return false;
    //LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
//#todo
#if 0
   // find if is pps/
    // buffer the frame into spsRTSPServer2_Internal
    if(frame->type==MB_Frame::MBFT_H264_NALU)
    {
        NALU_t objNALU_t;
        DeliverFrameCallback::get_nalu_info(&objNALU_t,(const uint8_t*)(frame->buffer),frame->buffSize);
        if(NALU_TYPE_PPS!=objNALU_t.nal_unit_type)
        {
            memcpy(in->lastPps,frame->buffer,frame->buffSize);
        }
        else if(NALU_TYPE_SPS!=objNALU_t.nal_unit_type)
        {
            memcpy(in->lastSps,frame->buffer,frame->buffSize);
        }
    }
#endif
   while (in->config.payBlockFullQueue && in->frameQueue->Full())
   {
      int ret = pthread_mutex_lock(in->queue_full_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
      }
      //if (in->frameQueue->Full())
      //{
      //   LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
      //   return false;
      //}
   }
   ScopeLocker<pthread_mutex_t>(in->queue_mutex);
   //if (in->frameQueue->Full())
   //   LOG_WARN << "PL_RTSPServer::pay may lost data" << std::endl;
   //   LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
   PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
   if (qbuff == nullptr)
   {
      LOG_WARN << "PL_RTSPServer::pay may lost data size=" << frame->buffSize << std::endl;
      LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
      int ret = pthread_mutex_unlock(in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      return false;
   }
   memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
   qbuff->buffSize = frame->buffSize;
   const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
   size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
   memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
   qbuff->buffSize = copySize;
   //static size_t f = 0;
   //static FILE *pFile = fopen("/data/aa.264", "wb");
   //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
   //fwrite(qbuff->buffer, sizeof(char), qbuff->buffSize, pFile);
   //fflush(pFile);
   //if (++f > 400){
   //   fclose(pFile);
   //   exit(0);
@@ -288,18 +425,24 @@
   int ret = pthread_mutex_unlock(in->queue_empty_mutex);
   if (ret != 0)
   {
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
   }
   if (copySize < frame->buffSize)
   {
      LOG_WARN << "copy frame truncated" << LOG_ENDL;
   }
   return true;
}
bool PL_RTSPServer::gain(PipeMaterial& pm)
bool PL_RTSPServer2::gain(PipeMaterial& pm)
{
   RTSPServer_Internal* in = (RTSPServer_Internal*)internal;
   RTSPServer2_Internal* in = (RTSPServer2_Internal*)internal;
   pm.type = PipeMaterial::PMT_NONE;
   pm.buffer = nullptr;
   pm.buffSize = 0;
   pm.former = this;
   return true;
   return false;
}