houxiao
2017-08-17 6faf88ba05f174a80c68f01c0412cae9789dbc8c
RtspFace/PL_RTSPServer2.cpp
@@ -24,6 +24,7 @@
   PreAllocBufferQueue* frameQueue;
   pthread_mutex_t* queue_mutex;
   pthread_mutex_t* queue_empty_mutex;
   pthread_mutex_t* queue_full_mutex;
   bool auxLineSet;
@@ -31,10 +32,9 @@
      config(),
      live_daemon_thid(0), live_daemon_running(false),
      server(nullptr),
      frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
      frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
      auxLineSet(false)
   {
      pthread_mutex_init(queue_mutex, NULL);
   }
   
   ~RTSPServer2_Internal()
@@ -59,7 +59,6 @@
         delete queue_mutex;
         queue_mutex = nullptr;
      }
      queue_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_mutex, NULL);
@@ -69,9 +68,17 @@
         delete queue_empty_mutex;
         queue_empty_mutex = nullptr;
      }
      queue_empty_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_empty_mutex, NULL);
      if (queue_full_mutex != nullptr)
      {
         pthread_mutex_destroy(queue_full_mutex);
         delete queue_full_mutex;
         queue_full_mutex = nullptr;
      }
      queue_full_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_full_mutex, NULL);
      live_daemon_thid = 0;
      live_daemon_running = false;
@@ -120,19 +127,28 @@
   {
      DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
      if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
      {
         int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      if (_this->in->frameQueue->Empty())
      {
         int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
         LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
@@ -144,17 +160,24 @@
         _this->lastBuffer = nullptr;
      }
      //#todo
      //find frameQueue->Seek is pps/sps
      // if not: send bufferred pps , return;
      _this->lastBuffer = _this->in->frameQueue->Dequeue();
      if (_this->lastBuffer == nullptr)
         return false;
      buffer = _this->lastBuffer->buffer;
      buffSize = _this->lastBuffer->buffSize;
      buffer = _this->lastBuffer->buffer + 4; // #todo send nalu
      buffSize = _this->lastBuffer->buffSize - 4;
      //LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
      //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
      //static size_t f = 0;
      //static FILE *pFile = fopen("/data/bb.264", "wb");
      //fwrite(buffer, sizeof(char), buffSize, pFile);
      //fflush(pFile);
      //if (++f > 30){
      //   fclose(pFile);
      //   exit(0);
@@ -212,14 +235,14 @@
   qcfg.multithreadSafe = false;
   qcfg.fullQueueDropFront = true;
   qcfg.fullQueueSync = false;
   qcfg.count = 32;
   qcfg.maxBuffSize = 100000;
   qcfg.count = 20;
   qcfg.maxBuffSize = 524288; // 512KB
   in->frameQueue = new PreAllocBufferQueue(qcfg);
   int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
   if(ret != 0)
   {
      LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
      LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
      return false;
   }
@@ -242,7 +265,7 @@
   
   if (pm.type != PipeMaterial::PMT_FRAME)
   {
      LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << std::endl;
      LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
      return false;
   }
@@ -258,35 +281,62 @@
         framedSource->ppsBase64 = ppsStr;
         in->auxLineSet = true;
         LOG_INFO <<"sps:" << spsStr.size() << ", pps:" << ppsStr.size() << LOG_ENDL;
      }
   }
//#todo
   // find if is pps/sps
   // buffer the frame into RTSPServer2_Internal
   while (in->config.payBlockFullQueue && in->frameQueue->Full())
   {
      int ret = pthread_mutex_lock(in->queue_full_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
      }
      //if (in->frameQueue->Full())
      //{
      //   LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
      //   return false;
      //}
   }
   MB_Frame* frame = (MB_Frame*)pm.buffer;
   if (frame->buffer == nullptr || frame->buffSize == 0)
      return false;
   //LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
   ScopeLocker<pthread_mutex_t>(in->queue_mutex);
   //if (in->frameQueue->Full())
   //   LOG_WARN << "PL_RTSPServer2::pay may lost data" << std::endl;
   //   LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
   PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
   if (qbuff == nullptr)
   {
      LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << std::endl;
      LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
      int ret = pthread_mutex_unlock(in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      return false;
   }
   memcpy(qbuff->buffer, frame->buffer, frame->buffSize);
   qbuff->buffSize = frame->buffSize;
   const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
   size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
   memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
   qbuff->buffSize = copySize;
   //static size_t f = 0;
   //static FILE *pFile = fopen("/data/aa.264", "wb");
   //fwrite(qbuff->buffer, sizeof(char), frame->buffSize, pFile);
   //fwrite(qbuff->buffer, sizeof(char), qbuff->buffSize, pFile);
   //fflush(pFile);
   //if (++f > 400){
   //   fclose(pFile);
   //   exit(0);
@@ -295,8 +345,14 @@
   int ret = pthread_mutex_unlock(in->queue_empty_mutex);
   if (ret != 0)
   {
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
   }
   if (copySize < frame->buffSize)
   {
      LOG_WARN << "copy frame truncated" << LOG_ENDL;
   }
   return true;
}