houxiao
2017-07-13 b022b91c0c6fa807424b6c12cc92ac5946838083
RtspFace/PipeLine.cpp
@@ -1,4 +1,5 @@
#include "PipeLine.h"
#include "MaterialBuffer.h"
#include "logger.h"
PipeMaterial::PipeMaterial() : 
@@ -7,13 +8,117 @@
{
}
void PipeMaterial::exec_deleter()
void PipeMaterial::exec_deleter(bool lastRet)
{
   if (deleter != nullptr)
   {
      deleter(this);
      deleter(this, lastRet);
      deleter = nullptr;
   }
}
int PipeMaterial::breake(PipeMaterialBufferType selectPmType, int _selectMbfType,
   pm_breaker_func breaker, void* args /*= nullptr*/) const
{
   int called = 0;
   const MB_Frame::MBFType selectMbfType = (MB_Frame::MBFType)_selectMbfType;
   switch(type)
   {
   case PMT_NONE:
      if (selectPmType == PMT_NONE)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_BYTES:
      if (selectPmType == PMT_BYTES)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_PTR:
      if (selectPmType == PMT_PTR)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_FRAME:
      if (selectPmType == PMT_FRAME || selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
      {
         MB_Frame* mbf = (MB_Frame*)buffer;
         if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
         {
            called++;
            breaker(this, args);
         }
      }
   break;
   case PMT_PM_LIST:
   {
      // break pm list into single pm(s)
      PipeMaterial* pm = (PipeMaterial*)buffer;
      for (size_t i = 0; i < buffSize; i++, pm++)
      {
         if (selectPmType == pm->type || selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
            {
                if (pm->type == PipeMaterial::PMT_FRAME)
                {
                    MB_Frame *mbf = (MB_Frame *) pm->buffer;
                    if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
               {
                  called++;
                        if (!breaker(pm, args))
                            break;
               }
                }
                else
                {
               called++;
                    if (!breaker(pm, args))
                        break;
                }
            }
      }
   }
   break;
   case PMT_FRAME_LIST:
   {
      // break mbf list into single pm(s)
      if (selectPmType == PMT_FRAME_LIST || selectPmType == PMT_FRAME ||
         selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
      {
         MB_Frame* mbf = (MB_Frame*)buffer;
         for (size_t i = 0; i < buffSize; i++, mbf++)
         {
            if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
            {
               PipeMaterial tmpPm;
               tmpPm.type = PMT_FRAME;
               tmpPm.buffer = mbf;
               tmpPm.buffSize = 0;
               tmpPm.former = former;
               tmpPm.deleter = nullptr;
               tmpPm.args = args;
               called++;
               if (!breaker(&tmpPm, args))
                  break;
            }
         }
      }
   }
   break;
   default:
   break;
   }
   return called;
}
PipeLine::elem_create_func_map_t PipeLine::global_elem_create_func_map;
@@ -104,7 +209,10 @@
bool PipeLine::check_pipe_complete(PipeLineElem* lastRetElem) const
{
   return lastRetElem == *elems.rbegin();
   if (elems.empty())
      return lastRetElem == nullptr;
   else
      return lastRetElem == *elems.rbegin();
}
PipeLineElem* PipeLine::push_elem(const std::string& type)
@@ -130,35 +238,8 @@
   return elem;
}
class PipeDebugger
{
private:
   PipeLine* pipeLine;
public:
   PipeLineElem* retElem;
   PipeMaterial* pm;
   PipeDebugger(PipeLine* _pipeLine) :
      pipeLine(_pipeLine), retElem(nullptr), pm(nullptr)
   {
      //LOG_DEBUG << "pipe line begin" << std::endl;
   }
   ~PipeDebugger()
   {
      //bool retOK = (*(pipeLine->elems).rbegin() == retElem);
      //if (retOK)
      //   LOG_DEBUG << "pipe line end, ret OK" << std::endl;
      //else
      //   LOG_WARN << "pipe line end, ret ERROR" << std::endl;
   }
};
PipeLineElem* PipeLine::pipe(PipeMaterial* pm /*= nullptr*/)
{
   PipeDebugger debugger(this);
   PipeLineElem* elem_begin = *elems.begin();
   PipeLineElem* elem_last = *elems.rbegin();
@@ -169,31 +250,31 @@
   if (pm == nullptr)
      pm = new (pmPlacement) PipeMaterial;
   
   debugger.pm = pm;
    bool lastRet = true;
   if (elems.size() == 1)
   {
      elem_begin->gain(*pm);
      pm->exec_deleter();
      return debugger.retElem = elem_begin;
      lastRet = elem_begin->gain(*pm);
      pm->exec_deleter(lastRet);
      return elem_begin;
   }
   else if (elems.size() == 2)
   {
      if (elem_begin->gain(*pm))
      {
         elem_last->pay(*pm);
         pm->exec_deleter();
         lastRet = elem_last->pay(*pm);
         pm->exec_deleter(lastRet);
      }
      else
         return debugger.retElem = elem_begin;
      return debugger.retElem = elem_last;
         return elem_begin;
      return elem_last;
   }
   else
   {
      if (!elem_begin->gain(*pm))
         return debugger.retElem = elem_begin;
         return elem_begin;
      
      bool lastRet = true;
      lastRet = true;
      elem_vec_t::iterator iter = elems.begin();
      ++iter;
      elem_begin = *iter;
@@ -201,22 +282,20 @@
      {
         if (lastRet && (lastRet = elem_begin->pay(*pm)) )
         {
            pm->exec_deleter();
            pm->exec_deleter(lastRet);
            lastRet = elem_begin->gain(*pm);
         }
         else
            return debugger.retElem = elem_begin;
            return elem_begin;
         
         ++iter;
         elem_begin = *iter;
      }
   
      if (lastRet)
      {
         elem_last->pay(*pm);
         pm->exec_deleter();
      }
      return debugger.retElem = elem_last;
        pm->exec_deleter(lastRet);
        return elem_last;
   }
   
   return nullptr;
@@ -242,3 +321,27 @@
   else
      return iter->second;
}
#ifdef ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER
#include <typeinfo>
#include <sys/time.h>
PipeLineElemTimingDebugger::PipeLineElemTimingDebugger(const PipeLineElem* _elem) :
   elem(_elem), beginTime(0)
{
    struct timeval _beginTime;
    gettimeofday(&_beginTime, NULL);
    beginTime = 1000000 * _beginTime.tv_sec + _beginTime.tv_usec;
}
PipeLineElemTimingDebugger::~PipeLineElemTimingDebugger()
{
    struct timeval _endTime;
    gettimeofday(&_endTime, NULL);
    const uint64_t endTime = 1000000 * _endTime.tv_sec + _endTime.tv_usec;
    LOGP(WARN, "elem=%s@%llu, dura=%llu(us)", typeid(elem).name(), uint64_t(elem), uint64_t(endTime - beginTime));
}
#else
PipeLineElemTimingDebugger::PipeLineElemTimingDebugger(const PipeLineElem* _elem) { }
PipeLineElemTimingDebugger::~PipeLineElemTimingDebugger() { }
#endif