xuxiuxi
2017-07-27 f5948ed9f21e65288f8772a3a9df0ab68bf4f727
RtspFace/PipeLine.cpp
@@ -1,4 +1,5 @@
#include "PipeLine.h"
#include "MaterialBuffer.h"
#include "logger.h"
PipeMaterial::PipeMaterial() : 
@@ -7,16 +8,122 @@
{
}
void PipeMaterial::exec_deleter()
void PipeMaterial::exec_deleter(bool lastRet)
{
   if (deleter != nullptr)
   {
      deleter(this);
      deleter(this, lastRet);
      deleter = nullptr;
   }
}
PipeLine::PipeLine() : global_params_map(), elem_create_func_map(), elems()
int PipeMaterial::breake(PipeMaterialBufferType selectPmType, int _selectMbfType,
   pm_breaker_func breaker, void* args /*= nullptr*/) const
{
   int called = 0;
   const MB_Frame::MBFType selectMbfType = (MB_Frame::MBFType)_selectMbfType;
   switch(type)
   {
   case PMT_NONE:
      if (selectPmType == PMT_NONE)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_BYTES:
      if (selectPmType == PMT_BYTES)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_PTR:
      if (selectPmType == PMT_PTR)
      {
         called++;
         breaker(this, args);
      }
   break;
   case PMT_FRAME:
      if (selectPmType == PMT_FRAME || selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
      {
         MB_Frame* mbf = (MB_Frame*)buffer;
         if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
         {
            called++;
            breaker(this, args);
         }
      }
   break;
   case PMT_PM_LIST:
   {
      // break pm list into single pm(s)
      PipeMaterial* pm = (PipeMaterial*)buffer;
      for (size_t i = 0; i < buffSize; i++, pm++)
      {
         if (selectPmType == pm->type || selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
         {
            if (pm->type == PipeMaterial::PMT_FRAME)
            {
               MB_Frame *mbf = (MB_Frame *) pm->buffer;
               if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
               {
                  called++;
                  if (!breaker(pm, args))
                     break;
               }
            }
            else
            {
               called++;
               if (!breaker(pm, args))
                  break;
            }
         }
      }
   }
   break;
   case PMT_FRAME_LIST:
   {
      // break mbf list into single pm(s)
      if (selectPmType == PMT_FRAME_LIST || selectPmType == PMT_FRAME ||
         selectPmType == PipeMaterial::PMT__FIRST || selectPmType == PipeMaterial::PMT__LAST)
      {
         MB_Frame* mbf = (MB_Frame*)buffer;
         for (size_t i = 0; i < buffSize; i++, mbf++)
         {
            if (selectMbfType == mbf->type || selectMbfType == MB_Frame::MBFT__FIRST || selectMbfType == MB_Frame::MBFT__LAST)
            {
               PipeMaterial tmpPm;
               tmpPm.type = PMT_FRAME;
               tmpPm.buffer = mbf;
               tmpPm.buffSize = 0;
               tmpPm.former = former;
               tmpPm.deleter = nullptr;
               tmpPm.args = args;
               called++;
               if (!breaker(&tmpPm, args))
                  break;
            }
         }
      }
   }
   break;
   default:
   break;
   }
   return called;
}
PipeLine::elem_create_func_map_t PipeLine::global_elem_create_func_map;
PipeLine::PipeLine() : params_map(), elem_create_func_map(), elems()
{
}
@@ -27,7 +134,7 @@
   for(elem_vec_t::iterator iter = elems.begin(); iter != elems.end(); ++iter)
   {
      PipeLineElem* elem = *iter;
      if (elem != nullptr)
      if (elem != nullptr && elem->manager == this)
      {
         elem->finit();
         delete *iter;
@@ -50,6 +157,20 @@
   return true;
}
//static
bool PipeLine::register_global_elem_creator(const std::string& type, elem_create_func_t func)
{
   if (type.empty() || func == nullptr)
      return false;
   elem_create_func_map_t::iterator iter = global_elem_create_func_map.find(type);
   if (iter != global_elem_create_func_map.end())
      return false;
   global_elem_create_func_map.insert(std::make_pair(type, func));
   return true;
}
void PipeLine::push_elem(PipeLineElem* elem)
{
   if(elem != nullptr)
@@ -59,11 +180,74 @@
   }
}
void PipeLine::push_front_elem(PipeLineElem* elem)
{
   if(elem != nullptr)
   {
      elem->manager = this;
      elems.insert(elems.begin(), elem);
   }
}
PipeLineElem* PipeLine::at(int idx)
{
   return elems[idx];
}
bool PipeLine::remove_elem(PipeLineElem* elem)
{
   if(elem != nullptr)
   {
      for(elem_vec_t::iterator iter = elems.begin(); iter != elems.end(); ++iter)
      {
         if (*iter == elem)
         {
            iter = elems.erase(iter);
            return true;
         }
      }
   }
   return false;
}
void PipeLine::finit(elem_destory_func_t elem_destory_func)
{
   while (!elems.empty())
   {
      PipeLineElem* elem = elems.back();
      if (elem->manager == this)
      {
         elem->finit();
         if (elem_destory_func != nullptr)
            elem_destory_func(elem);
      }
      elems.pop_back();
   }
   elem_create_func_map.clear();
   params_map.clear();
}
bool PipeLine::check_pipe_complete(PipeLineElem* lastRetElem) const
{
   if (elems.empty())
      return lastRetElem == nullptr;
   else
      return lastRetElem == *elems.rbegin();
}
PipeLineElem* PipeLine::push_elem(const std::string& type)
{
   elem_create_func_map_t::iterator iter = elem_create_func_map.find(type);
   if (iter == elem_create_func_map.end())
      return nullptr;
   {
      iter = global_elem_create_func_map.find(type);
      if (iter == global_elem_create_func_map.end())
         return nullptr;
   }
   
   elem_create_func_t func = iter->second;
   if (func == nullptr)
@@ -78,35 +262,8 @@
   return elem;
}
class PipeDebugger
{
private:
   PipeLine* pipeLine;
public:
   PipeLineElem* retElem;
   PipeMaterial* pm;
   PipeDebugger(PipeLine* _pipeLine) :
      pipeLine(_pipeLine), retElem(nullptr), pm(nullptr)
   {
      LOG(DEBUG) << "pipe line begin";
   }
   ~PipeDebugger()
   {
      bool retOK = (*(pipeLine->elems).rbegin() == retElem);
      if (retOK)
         LOG(DEBUG) << "pipe line end, ret OK";
      else
         LOG(WARN) << "pipe line end, ret ERROR";
   }
};
PipeLineElem* PipeLine::pipe(PipeMaterial* pm /*= nullptr*/)
{
   PipeDebugger debugger(this);
   PipeLineElem* elem_begin = *elems.begin();
   PipeLineElem* elem_last = *elems.rbegin();
@@ -117,31 +274,31 @@
   if (pm == nullptr)
      pm = new (pmPlacement) PipeMaterial;
   
   debugger.pm = pm;
   bool lastRet = true;
   if (elems.size() == 1)
   {
      elem_begin->gain(*pm);
      pm->exec_deleter();
      return debugger.retElem = elem_begin;
      lastRet = elem_begin->gain(*pm);
      pm->exec_deleter(lastRet);
      return elem_begin;
   }
   else if (elems.size() == 2)
   {
      if (elem_begin->gain(*pm))
      {
         elem_last->pay(*pm);
         pm->exec_deleter();
         lastRet = elem_last->pay(*pm);
         pm->exec_deleter(lastRet);
      }
      else
         return debugger.retElem = elem_begin;
      return debugger.retElem = elem_last;
         return elem_begin;
      return elem_last;
   }
   else
   {
      if (!elem_begin->gain(*pm))
         return debugger.retElem = elem_begin;
         return elem_begin;
      
      bool lastRet = true;
      lastRet = true;
      elem_vec_t::iterator iter = elems.begin();
      ++iter;
      elem_begin = *iter;
@@ -149,44 +306,66 @@
      {
         if (lastRet && (lastRet = elem_begin->pay(*pm)) )
         {
            pm->exec_deleter();
            pm->exec_deleter(lastRet);
            lastRet = elem_begin->gain(*pm);
         }
         else
            return debugger.retElem = elem_begin;
            return elem_begin;
         
         ++iter;
         elem_begin = *iter;
      }
   
      if (lastRet)
      {
         elem_last->pay(*pm);
         pm->exec_deleter();
      }
      return debugger.retElem = elem_last;
      pm->exec_deleter(lastRet);
      return elem_last;
   }
   
   return nullptr;
}
void PipeLine::set_global_param(const std::string& name, const std::string& value)
void PipeLine::set_param(const std::string& name, const std::string& value)
{
   if (name.empty())
      return;
   global_params_map_t::iterator iter = global_params_map.find(name);
   if (iter == global_params_map.end())
      global_params_map.insert(std::make_pair(name, value));
   params_map_t::iterator iter = params_map.find(name);
   if (iter == params_map.end())
      params_map.insert(std::make_pair(name, value));
   else
      iter->second = value;
}
std::string PipeLine::get_global_param(const std::string& name) const
std::string PipeLine::get_param(const std::string& name) const
{
   global_params_map_t::const_iterator iter = global_params_map.find(name);
   if (iter == global_params_map.end())
   params_map_t::const_iterator iter = params_map.find(name);
   if (iter == params_map.end())
      return "";
   else
      return iter->second;
}
#ifdef ENABLE_PIPELINE_ELEM_TIMING_DEBUGGER
#include <typeinfo>
#include <sys/time.h>
PipeLineElemTimingDebugger::PipeLineElemTimingDebugger(const PipeLineElem* _elem) :
   elem(_elem), beginTime(0)
{
   struct timeval _beginTime;
   gettimeofday(&_beginTime, NULL);
   beginTime = 1000000 * _beginTime.tv_sec + _beginTime.tv_usec;
}
PipeLineElemTimingDebugger::~PipeLineElemTimingDebugger()
{
   struct timeval _endTime;
   gettimeofday(&_endTime, NULL);
   const uint64_t endTime = 1000000 * _endTime.tv_sec + _endTime.tv_usec;
   LOGP(WARN, "elem=%s@%llu, dura=%llu(us)", typeid(elem).name(), uint64_t(elem), uint64_t(endTime - beginTime));
}
#else
PipeLineElemTimingDebugger::PipeLineElemTimingDebugger(const PipeLineElem* _elem) { }
PipeLineElemTimingDebugger::~PipeLineElemTimingDebugger() { }
#endif