houxiao
2017-07-13 b022b91c0c6fa807424b6c12cc92ac5946838083
RtspFace/PL_Queue.cpp
@@ -1,17 +1,22 @@
#include "PL_Queue.h"
#include "logger.h"
#include "MaterialBuffer.h"
#include <set>
#include <queue>
#include <string.h>
struct QBlock
{
   PipeMaterial lastPM;
   std::vector<MB_Frame> lastMbfs;
   uint8_t* data;
   size_t maxSize;
   size_t size;
   
   QBlock() : data(nullptr), maxSize(0), size(0) { }
   QBlock() : lastPM(), lastMbfs(), data(nullptr), maxSize(0), size(0) { }
   
   QBlock(uint8_t* _data, size_t _maxSize) : data(_data), maxSize(_maxSize), size(0) { }
   QBlock(uint8_t* _data, size_t _maxSize) : lastPM(), lastMbfs(), data(_data), maxSize(_maxSize), size(0) { }
   
   ~QBlock()
   {
@@ -19,80 +24,113 @@
      maxSize = 0;
      size = 0;
   }
   void reset()
   {
      size = 0;
      lastPM.reset();
      lastMbfs.clear();
   }
};
struct QBlockPool
{
   uint8_t* blocksDataPool;
   uint8_t* blocksDataCache;
   
   typedef std::set<QBlock*> qpool_t;
   qpool_t staticPool;
   qpool_t freePool;
   QBlockPool() : blocksDataPool(nullptr)
   typedef std::set<QBlock*> qblock_set_t;
   qblock_set_t staticPool;
   qblock_set_t freePool;
    pthread_mutex_t pool_mutex;
    QBlockPool() : blocksDataCache(nullptr), staticPool(), freePool(), pool_mutex()
   {
   }
   
   QBlockPool(size_t maxBlockCount, size_t maxBlockSize) : 
      blocksDataPool(nullptr)
      blocksDataCache(nullptr)
   {
      bp_init(maxBlockCount, maxBlockSize);
   }
   
   void bp_init(size_t maxBlockCount, size_t maxBlockSize)
   {
      const size_t totalBytes = maxBlockCount * maxBlockSize;
      blocksDataPool = new uint8_t[totalBytes];
      printf("QBlockPool allocate byte: %u\n", totalBytes);
        pthread_mutex_init(&pool_mutex, NULL);
        pthread_mutex_lock(&pool_mutex);
        const size_t totalBytes = maxBlockCount * maxBlockSize;
      blocksDataCache = new uint8_t[totalBytes];
      LOGP(WARN, "QBlockPool allocate byte: %u", totalBytes);
      
      for (size_t i = 0; i < maxBlockCount; i++)
      {
         uint8_t* qbData = new (blocksDataPool + i * maxBlockSize) uint8_t[maxBlockSize];
         uint8_t* qbData = new (blocksDataCache + i * maxBlockSize) uint8_t[maxBlockSize];
         QBlock* qb = new QBlock(qbData, maxBlockSize);
         staticPool.insert(qb);
         freePool.insert(qb);
      }
   }
        pthread_mutex_unlock(&pool_mutex);
    }
   void bp_reset()
   {
        pthread_mutex_lock(&pool_mutex);
        if (freePool.size() != staticPool.size())
      {
         LOG_WARN << "QBlockPool memory leakage" << LOG_ENDL;
      }
      freePool.clear();
      for(qblock_set_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
         delete *iter;
      staticPool.clear();
      delete[] blocksDataCache;
      blocksDataCache = nullptr;
        pthread_mutex_unlock(&pool_mutex);
    }
   ~QBlockPool()
   {
      if (freePool.size() != staticPool.size())
      {
         printf("QBlockPool memory leakage\n");
      }
      for(qpool_t::iterator iter = staticPool.begin(); iter != staticPool.end(); ++iter)
      {
         delete *iter;
      }
      delete[] blocksDataPool;
      blocksDataPool = nullptr;
   }
      bp_reset();
        pthread_mutex_destroy(&pool_mutex);
    }
   
   QBlock* bp_alloc()
   {
      if (freePool.empty())
        pthread_mutex_lock(&pool_mutex);
        if (freePool.empty())
      {
         printf("QBlockPool bp_alloc empty\n");
            pthread_mutex_unlock(&pool_mutex);
            LOG_WARN << "no free blocks" << LOG_ENDL;
         return nullptr;
      }
      qpool_t::iterator iter = freePool.begin();
        qblock_set_t::iterator iter = freePool.begin();
      QBlock* qb = *iter;
      freePool.erase(iter);
      return qb;
   }
        pthread_mutex_unlock(&pool_mutex);
        return qb;
    }
   
   void bp_free(QBlock* qb)
   {
      if (qb == nullptr)
         return;
      qpool_t::iterator iter = staticPool.find(qb);
        pthread_mutex_lock(&pool_mutex);
        qblock_set_t::iterator iter = staticPool.find(qb);
      if (iter == staticPool.end())
      {
         printf("QBlockPool bp_free not in pool\n");
            pthread_mutex_unlock(&pool_mutex);
            LOG_WARN << "block not belongs to pool" << LOG_ENDL;
         return;
      }
      
@@ -103,9 +141,10 @@
      //   return;
      //}
      
      qb->size = 0;
      qb->reset();
      freePool.insert(qb);
   }
        pthread_mutex_unlock(&pool_mutex);
    }
   
   bool bp_empty() const
   {
@@ -116,32 +155,32 @@
struct PL_Queue_Internal
{
   PL_Queue_Config config;
   QBlockPool pool;
   QBlockPool poolBlocks;
   
   typedef std::queue<QBlock*> queue_t;
   queue_t que;
   typedef std::queue<QBlock*> qblock_queue_t;
   qblock_queue_t queBlocks;
   
   pthread_mutex_t* queue_pool_mutex;
   pthread_mutex_t* queue_mutex;
   pthread_mutex_t* sync_full_mutex;
   pthread_mutex_t* sync_empty_mutex;
   
   PL_Queue_Internal() : 
      config(), pool(), que(),
      queue_pool_mutex(new pthread_mutex_t),
      config(), poolBlocks(), queBlocks(),
      queue_mutex(new pthread_mutex_t),
      sync_full_mutex(new pthread_mutex_t), sync_empty_mutex(new pthread_mutex_t)
   {
      pthread_mutex_init(queue_pool_mutex, NULL);
      pthread_mutex_init(queue_mutex, NULL);
      pthread_mutex_init(sync_full_mutex, NULL);
      pthread_mutex_init(sync_empty_mutex, NULL);
   }
   
   ~PL_Queue_Internal()
   {
      if (queue_pool_mutex != nullptr)
      if (queue_mutex != nullptr)
      {
         pthread_mutex_destroy(queue_pool_mutex);
         delete queue_pool_mutex;
         queue_pool_mutex = nullptr;
         pthread_mutex_destroy(queue_mutex);
         delete queue_mutex;
         queue_mutex = nullptr;
      }
      
      if (sync_full_mutex != nullptr)
@@ -161,15 +200,15 @@
   void reset()
   {
      if (queue_pool_mutex != nullptr)
      if (queue_mutex != nullptr)
      {
         pthread_mutex_destroy(queue_pool_mutex);
         delete queue_pool_mutex;
         queue_pool_mutex = nullptr;
         pthread_mutex_destroy(queue_mutex);
         delete queue_mutex;
         queue_mutex = nullptr;
      }
      
      queue_pool_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_pool_mutex, NULL);
      queue_mutex = new pthread_mutex_t;
      pthread_mutex_init(queue_mutex, NULL);
      
      if (sync_full_mutex != nullptr)
      {
@@ -190,8 +229,8 @@
      
      sync_empty_mutex = new pthread_mutex_t;
      pthread_mutex_init(sync_empty_mutex, NULL);
      //#todo free que
      poolBlocks.bp_reset();
   }
};
@@ -213,28 +252,31 @@
bool PL_Queue::init(void* args)
{
   PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
   PL_Queue_Config* config = (PL_Queue_Config*)args;
   in->reset();
   in->config = *config;
   in->pool.bp_init(config->maxBlockCount, config->maxBlockSize);
    if (args != nullptr)
    {
        PL_Queue_Config* config = (PL_Queue_Config*)args;
        in->config = *config;
    }
   in->poolBlocks.bp_init(in->config.maxBlockCount, in->config.maxBlockSize);
   
   if (config->syncQueueFull)
   if (in->config.syncQueueFull)
   {
      int ret = pthread_mutex_lock(in->sync_full_mutex);
      if(ret != 0)
      {
         printf("pthread_mutex_lock sync_full_mutex: %d\n", ret);
         LOGP(WARN, "pthread_mutex_lock sync_full_mutex: %d", ret);
         return false;
      }
   }
   
   if (config->syncQueueEmpty)
   if (in->config.syncQueueEmpty)
   {
      int ret = pthread_mutex_lock(in->sync_empty_mutex);
      if(ret != 0)
      {
         printf("pthread_mutex_lock sync_empty_mutex: %d\n", ret);
         LOGP(WARN, "pthread_mutex_lock sync_empty_mutex: %d", ret);
         return false;
      }
   }
@@ -252,63 +294,108 @@
   PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
   
   QBlock* qb = nullptr;
   if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
   if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
   {
      // there is no available qb
      
      if (in->config.queueFullDropBlock)
      if (in->config.queueFullDropFrontBlock)
      {
         printf("PL_Queue::pay queueFullDropBlock\n");
         LOG_WARN << "PL_Queue::pay queueFullDropFrontBlock" << LOG_ENDL;
      
         pthread_mutex_lock(in->queue_pool_mutex);
         qb = in->que.front();
         in->que.pop();
         in->pool.bp_free(qb);
         pthread_mutex_lock(in->queue_mutex);
         qb = in->queBlocks.front();
         in->queBlocks.pop();
            pthread_mutex_unlock(in->queue_mutex);
            in->poolBlocks.bp_free(qb);
         qb = nullptr;
         pthread_mutex_unlock(in->queue_pool_mutex);
      }
      else
      {
         if (in->config.syncQueueFull)
         {
            printf("PL_Queue::pay sync by sync_full_mutex\n");
                LOG_WARN << "PL_Queue::pay sync by sync_full_mutex" << LOG_ENDL;
            pthread_mutex_lock(in->sync_full_mutex);
         }
      }
   }
   
   if (in->que.size() >= in->config.maxBlockCount || in->pool.bp_empty())
   if (in->queBlocks.size() >= in->config.maxBlockCount || in->poolBlocks.bp_empty())
   {
      printf("PL_Queue::pay full\n");
        LOG_WARN << "PL_Queue::pay full" << LOG_ENDL;
      return false;
   }
   
   pthread_mutex_lock(in->queue_pool_mutex);
   qb = in->pool.bp_alloc();
   if (qb != nullptr)
      in->que.push(qb);
   pthread_mutex_unlock(in->queue_pool_mutex);
   qb = in->poolBlocks.bp_alloc();
   if (qb == nullptr)
   {
      printf("PL_Queue::pay bp_alloc nullptr\n");
        LOG_WARN << "PL_Queue::pay bp_alloc nullptr" << LOG_ENDL;
      return false;
   }
   if (in->config.copyData)
   {
      memcpy(qb->data, pm.buffer, pm.buffSize);
      qb->size = pm.buffSize;
   }
    bool cacheOk = false;
    if (pm.type == PipeMaterial::PMT_BYTES)
    {
        qb->lastPM = pm;
        if (in->config.copyData)
        {
            memcpy(qb->data, pm.buffer, pm.buffSize);
            qb->size = pm.buffSize;
            qb->lastPM.buffer = qb->data;
        }
        cacheOk = true;
    }
    else if (pm.type == PipeMaterial::PMT_FRAME)
    {
        qb->lastPM = pm;
        MB_Frame* mbf = (MB_Frame*)pm.buffer;
        qb->lastMbfs.push_back(*mbf);
        qb->lastPM.buffer = &(qb->lastMbfs[0]);
        if (in->config.copyData)
        {
            memcpy(qb->data, mbf->buffer, mbf->buffSize);
            qb->size = mbf->buffSize;
            qb->lastMbfs[0].buffer = qb->data;
        }
        cacheOk = true;
    }
    else if (pm.type == PipeMaterial::PMT_FRAME_LIST)
    {
        if (in->config.cacheFrameListFunc != nullptr)
        {
            size_t s = qb->maxSize;
            cacheOk = in->config.cacheFrameListFunc(pm, qb->lastPM, qb->lastMbfs, qb->data, s);
            if (cacheOk)
                qb->size = s;
        }
    }
    if (! cacheOk)
    {
        in->poolBlocks.bp_free(qb);
        LOG_WARN << "pm/mbf type not support" << LOG_ENDL;
        return false;
    }
    pthread_mutex_lock(in->queue_mutex);
    in->queBlocks.push(qb);
    pthread_mutex_unlock(in->queue_mutex);
   if (in->config.syncQueueEmpty)
      pthread_mutex_unlock(in->sync_empty_mutex);
   return true;
}
void PL_Queue::pm_deleter_qb(PipeMaterial* pm)
void PL_Queue::pm_deleter_qb(PipeMaterial* pm, bool lastRet)
{
   if (pm->former == nullptr || pm->args == nullptr)
      return;
@@ -316,25 +403,30 @@
   PL_Queue* _this = (PL_Queue*)pm->former;
   QBlock* qb = (QBlock*)pm->args;
   PL_Queue_Internal* in = (PL_Queue_Internal*)_this->internal;
   pthread_mutex_lock(in->queue_pool_mutex);
   in->pool.bp_free(qb);
   pthread_mutex_unlock(in->queue_pool_mutex);
   pthread_mutex_lock(in->queue_mutex);//#todo lastRet
    in->queBlocks.pop();
    pthread_mutex_unlock(in->queue_mutex);
    in->poolBlocks.bp_free(qb);
    if (in->config.syncQueueFull)
        pthread_mutex_unlock(in->sync_full_mutex);
}
bool PL_Queue::gain(PipeMaterial& pm)
{
   PL_Queue_Internal* in = (PL_Queue_Internal*)internal;
   
   if (in->que.empty())
   if (in->queBlocks.empty())
   {
      if (in->config.syncQueueEmpty)
         pthread_mutex_lock(in->sync_empty_mutex);
   }
   
   if (in->que.empty())
   if (in->queBlocks.empty())
   {
      printf("PL_Queue::gain empty\n");
      LOG_DEBUG << "PL_Queue::gain empty" << LOG_ENDL;
      pm.buffer = nullptr;
      pm.buffSize = 0;
      pm.former = this;
@@ -342,19 +434,15 @@
   }
   QBlock* qb = nullptr;
   pthread_mutex_lock(in->queue_pool_mutex);
   qb = in->que.front();
   in->que.pop();
   pthread_mutex_unlock(in->queue_pool_mutex);
   if (in->config.syncQueueFull)
      pthread_mutex_unlock(in->sync_full_mutex);
   pm.type = PipeMaterial::PMT_BYTES;
   pm.buffer = qb->data;
   pm.buffSize = qb->size;
   pm.former = this;
   pm.deleter = pm_deleter_qb;
   pm.args = qb;
   pthread_mutex_lock(in->queue_mutex);
   qb = in->queBlocks.front();
   //in->queBlocks.pop(); // pop in deleter
   pthread_mutex_unlock(in->queue_mutex);
    pm = qb->lastPM;
    pm.former = this;
    pm.deleter = pm_deleter_qb;
    pm.args = qb;
   return true;
}