video analysis2.0拆分,ffmpeg封装go接口库
zhangmeng
2019-09-18 b73029149580370e62dd6c14a270aea902f85cf2
csrc/worker/rec.cpp
File was renamed from csrc/rec.cpp
@@ -1,12 +1,12 @@
#include "rec.hpp"
#include <unistd.h>
#include <sys/time.h>
#include "ffmpeg/format/FormatIn.hpp"
#include "ffmpeg/data/CodedData.hpp"
#include "buz/recorder.hpp"
#include "ffmpeg/log/log.hpp"
#include "common/callback.hpp"
#include "../ffmpeg/format/FormatIn.hpp"
#include "../ffmpeg/data/CodedData.hpp"
#include "../ffmpeg/log/log.hpp"
#include "../common/callback.hpp"
using namespace logif;
using namespace ffwrapper;
@@ -14,48 +14,15 @@
namespace cffmpeg_wrap
{
    rec::rec(ffwrapper::FormatIn *in)
    :recRef_(in)
    rec::rec()
    :recRef_(NULL)
    ,minduration_(250)
    ,maxduration_(750)
    {}
    rec::~rec()
    {
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            map_rec_.clear();
        }
        {
            std::lock_guard<std::mutex> l(mtx_pkt_);
            list_pkt_.clear();
        }
    }
    std::unique_ptr<Recorder> rec::newRec(std::string id, std::string dir, const int mind, const int maxd){
        if(!recRef_){
            logIt("Init wrapper first");
            return nullptr;
        }
        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
        rec->SetCallback([&](std::string &id, int &index, std::string &path){
            setRecInfo(id, index, path);
        });
        int trycnt = 0;
        while(trycnt < 100){
            const int ret = rec->Run(dir.c_str(), mind, maxd);
            if(ret == 0) break;
            usleep(200000);
        }
        if (trycnt < 100){
            return rec;
        }
        return nullptr;
        clear();
    }
    void rec::setRecInfo(std::string &id, int &index, std::string &path){
@@ -71,7 +38,33 @@
        info.fPath = path;
        info.recID = id;
        list_recInfo_.emplace_back(info);
        logIt("LIST REC FILES COUNT : %d", list_recInfo_.size());
    }
    std::unique_ptr<buz::Recorder> rec::startRec(std::string id, std::string dir, const int mind, const int maxd){
        if(!recRef_){
            logIt("Init wrapper first");
            return nullptr;
        }
        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
        rec->SetCallback([&](std::string &id, int &index, std::string &path){
            setRecInfo(id, index, path);
        });
        int trycnt = 0;
        while(trycnt < 100){
            auto ret = rec->Run(dir.c_str(), mind, maxd);
            if(ret == 0) break;
            usleep(200000);
        }
        if (trycnt < 100){
            std::lock_guard<std::mutex> locker(mtx_pkt_);
            rec->PushPackets(list_pkt_);
            return rec;
        }
        return nullptr;
    }
    void rec::GetRecInfo(std::string &recID, int &index, std::string &path){
@@ -79,27 +72,35 @@
        // 获取信息
        {
            std::lock_guard<std::mutex> l(mtx_recInfo_);
            if(list_recInfo_.empty()){
                index = -1;
                path = "";
                return;
            if(!list_recInfo_.empty()){
                auto info = list_recInfo_.front();
                recID = info.recID;
                index = info.frmIdx;
                path = info.fPath;
                list_recInfo_.pop_front();
            }
            auto info = list_recInfo_.front();
            recID = info.recID;
            index = info.frmIdx;
            path = info.fPath;
            list_recInfo_.pop_front();
        }
        // 删除rec实例
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            if (map_rec_.find(recID) != map_rec_.end())
            if (map_rec_.empty()){
                return;
            }
            if (map_rec_.find(recID) != map_rec_.end()){
                map_rec_.erase(recID);
                return;
            }
            for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
                if (iter->second.rec && iter->second.rec->ErrorOcurred()){
                if (iter->second && iter->second->ErrorOcurred()){
                    recID = iter->first;
                    index = -1;
                    path = "";
                    iter == map_rec_.erase(iter);
                    break;
                }else{
                    iter++;
                }
@@ -107,54 +108,73 @@
        }
    }
    void rec::clear(){
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            map_rec_.clear();
        }
        {
            std::lock_guard<std::mutex> l(mtx_pkt_);
            list_pkt_.clear();
        }
    }
    void rec::Load(ffwrapper::FormatIn *in){
        recRef_ = in;
    }
    void rec::Unload(){
        recRef_ = NULL;
        clear();
    }
    const bool rec::Loaded() const{
        return recRef_ != NULL;
    }
    void rec::NewRec(const char* id, const char *output, const int mindur, const int maxdur){
        std::string rid(id);
        std::string dir(output);
        
        minduration_ = mindur * 25;
        maxduration_ = maxdur * 25;
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            if (map_rec_.find(rid) != map_rec_.end()){
                map_rec_.erase(rid);
            }
            map_rec_[rid] = {rid, dir, mindur, maxdur, nullptr};
            map_rec_[rid] = startRec(rid, dir, mindur, maxdur);
        }
        
        minduration_ = mindur * 25;
        maxduration_ = maxdur * 25;
    }
    void rec::FireRecSignal(const char* sid,const int64_t &id){
    void rec::FireRecSignal(const char* sid, const int64_t &id){
        std::lock_guard<std::mutex> l(mtx_rec_);
        auto iter = map_rec_.find(sid);
        if (iter != map_rec_.end()){
            if(iter->second.rec){
                iter->second.rec->FireRecorder(id);
            if(iter->second){
                iter->second->FireRecorder(id);
            }
        }
        // logIt("recorders count: %d", map_rec_.size());
    }
    void rec::SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
        if (!data) return;
        cachePacket(data, id);
        std::lock_guard<std::mutex> l(mtx_rec_);
        for(auto &i : map_rec_){
            if (!i.second.rec){
                i.second.rec = newRec(i.second.rid, i.second.dir, i.second.min, i.second.max);
                if (i.second.rec){
                    //此函数还未退出,不需要这个锁
                    // std::lock_guard<std::mutex> locker(mtx_pkt_);
                    for(auto &k : list_pkt_){
                        i.second.rec->CachePacket({k.data, k.id});
                    }
                    // 新的数据缓存
                    i.second.rec->CachePacket({data, id});
                    logIt("START REC %d FRAMES", list_pkt_.size());
                }
            }else if (i.second.rec){
                i.second.rec->CachePacket({data, id});
            if (i.second){
                i.second->PushPacket({data, id});
            }
        }
        cachePacket(data, id);
    }
    void rec::cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
@@ -173,7 +193,8 @@
    int rec::shrinkCache(){
        //超过最大缓存,丢弃gop
        while (list_pkt_.size() > minduration_) {
        //缓存最小长度的,用于记录
        while (list_pkt_.size() > minduration_/2) {
            list_pkt_.pop_front();
            while(!list_pkt_.empty()){
                auto &cache = list_pkt_.front();