houxiao
2017-08-16 663104b9be90ed303b87c8acddac8421583a9e39
RtspFace/PL_RTSPServer2.cpp
@@ -24,6 +24,7 @@
   PreAllocBufferQueue* frameQueue;
   pthread_mutex_t* queue_mutex;
   pthread_mutex_t* queue_empty_mutex;
   pthread_mutex_t* queue_full_mutex;
   bool auxLineSet;
@@ -31,10 +32,9 @@
      config(),
      live_daemon_thid(0), live_daemon_running(false),
      server(nullptr),
      frameQueue(nullptr), queue_mutex(new pthread_mutex_t), queue_empty_mutex(new pthread_mutex_t), //#todo from config
      frameQueue(nullptr), queue_mutex(nullptr), queue_empty_mutex(nullptr), queue_full_mutex(nullptr), //#todo from config
      auxLineSet(false)
   {
      pthread_mutex_init(queue_mutex, NULL);
   }
   
   ~RTSPServer2_Internal()
@@ -59,7 +59,6 @@
         delete queue_mutex;
         queue_mutex = nullptr;
      }
      queue_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_mutex, NULL);
@@ -69,9 +68,17 @@
         delete queue_empty_mutex;
         queue_empty_mutex = nullptr;
      }
      queue_empty_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_empty_mutex, NULL);
      if (queue_full_mutex != nullptr)
      {
         pthread_mutex_destroy(queue_full_mutex);
         delete queue_full_mutex;
         queue_full_mutex = nullptr;
      }
      queue_full_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_full_mutex, NULL);
      live_daemon_thid = 0;
      live_daemon_running = false;
@@ -120,19 +127,28 @@
   {
      DeliverFrameCallback* _this = (DeliverFrameCallback*)args;
      if (_this->in->config.payBlockFullQueue && !_this->in->frameQueue->Full())
      {
         int ret = pthread_mutex_unlock(_this->in->queue_full_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_unlock queue_full_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      if (_this->in->frameQueue->Empty())
      {
         int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
         if (ret != 0)
         {
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
            LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
         }
      }
      int ret = pthread_mutex_lock(_this->in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << std::endl;
         LOG_WARN << "pthread_mutex_lock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      ScopeLocker<pthread_mutex_t>(_this->in->queue_mutex);
@@ -150,11 +166,13 @@
      buffer = _this->lastBuffer->buffer;
      buffSize = _this->lastBuffer->buffSize;
      LOG_WARN << "sizeS=" << buffSize << LOG_ENDL;
      //LOG_INFO << "DeliverFrameCallback buffSize=" << buffSize << LOG_ENDL;
      //static size_t f = 0;
      //static FILE *pFile = fopen("/data/bb.264", "wb");
      //fwrite(buffer, sizeof(char), buffSize, pFile);
      //fflush(pFile);
      //if (++f > 30){
      //   fclose(pFile);
      //   exit(0);
@@ -212,14 +230,14 @@
   qcfg.multithreadSafe = false;
   qcfg.fullQueueDropFront = true;
   qcfg.fullQueueSync = false;
   qcfg.count = 32;
   qcfg.maxBuffSize = 200000;
   qcfg.count = 20;
   qcfg.maxBuffSize = 524288; // 512KB
   in->frameQueue = new PreAllocBufferQueue(qcfg);
   int ret = pthread_create(&(in->live_daemon_thid), NULL, live_daemon_thd, in);
   if(ret != 0)
   {
      LOG_ERROR << "pthread_create: " << strerror(ret) << std::endl;
      LOG_ERROR << "pthread_create: " << strerror(ret) << LOG_ENDL;
      return false;
   }
@@ -242,7 +260,7 @@
   
   if (pm.type != PipeMaterial::PMT_FRAME)
   {
      LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << std::endl;
      LOG_ERROR << "PL_RTSPServer2::pay only support PMT_FRAME" << LOG_ENDL;
      return false;
   }
@@ -263,28 +281,48 @@
      }
   }
   while (in->config.payBlockFullQueue && in->frameQueue->Full())
   {
      int ret = pthread_mutex_lock(in->queue_full_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_lock queue_full_mutex, ret=" << ret << LOG_ENDL;
      }
      //if (in->frameQueue->Full())
      //{
      //   LOG_WARN << "frameQueue wakeup while full" << LOG_ENDL;
      //   return false;
      //}
   }
   MB_Frame* frame = (MB_Frame*)pm.buffer;
   if (frame->buffer == nullptr || frame->buffSize == 0)
      return false;
   LOG_WARN << "sizeR=" << frame->buffSize << LOG_ENDL;
   ScopeLocker<pthread_mutex_t>(in->queue_mutex);
   //if (in->frameQueue->Full())
   //   LOG_WARN << "PL_RTSPServer2::pay may lost data" << std::endl;
   //   LOG_WARN << "PL_RTSPServer2::pay may lost data" << LOG_ENDL;
   PreAllocBufferQueue::Buffer* qbuff = in->frameQueue->Enqueue();
   if (qbuff == nullptr)
   {
      LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << std::endl;
      LOG_WARN << "PL_RTSPServer2::pay may lost data size=" << frame->buffSize << LOG_ENDL;
      int ret = pthread_mutex_unlock(in->queue_empty_mutex);
      if (ret != 0)
      {
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
         LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
      }
      return false;
   }
   memcpy(qbuff->buffer, frame->buffer, frame->buffSize);//#todo size min
   qbuff->buffSize = frame->buffSize;
   const PreAllocBufferQueue::Config& qcfg(in->frameQueue->GetConfig());
   size_t copySize = std::min(qcfg.maxBuffSize, frame->buffSize);
   memcpy(qbuff->buffer, frame->buffer, copySize);//#todo size min
   qbuff->buffSize = copySize;
   //static size_t f = 0;
   //static FILE *pFile = fopen("/data/aa.264", "wb");
@@ -297,8 +335,14 @@
   int ret = pthread_mutex_unlock(in->queue_empty_mutex);
   if (ret != 0)
   {
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << std::endl;
      LOG_WARN << "pthread_mutex_unlock queue_empty_mutex, ret=" << ret << LOG_ENDL;
   }
   if (copySize < frame->buffSize)
   {
      LOG_WARN << "copy frame truncated" << LOG_ENDL;
   }
   return true;
}