video analysis2.0拆分,ffmpeg封装go接口库
zhangmeng
2019-08-19 54ea1c13885725584a6a50d520f67e8a75f85b6f
fix rec
4个文件已修改
123 ■■■■ 已修改文件
csrc/buz/recorder.cpp 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/buz/recorder.hpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/wrapper.cpp 91 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/wrapper.hpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/buz/recorder.cpp
@@ -3,6 +3,7 @@
#include <thread>
#include <unistd.h>
#include <chrono>
extern "C"{
#include <libavcodec/avcodec.h>
@@ -32,6 +33,7 @@
        ,file_path_("")
        ,func_rec_info_(nullptr)
        ,thrd_(nullptr)
        ,error_occured_(false)
        {
            // logIt("RECODER ID: %s", id_.c_str());
        }
@@ -41,8 +43,11 @@
            try
            {
                if (thrd_){
                    stop_recorder_.store(true);
                    cv_.notify_one();
                    {
                        std::unique_lock<std::mutex> locker(mutex_pkt_);
                        stop_recorder_.store(true);
                        cv_.notify_one();
                    }
                    thrd_->join();
                    logIt("REC THREAD JOINED, QUIT!!!");
                }
@@ -138,9 +143,15 @@
                std::list<avpacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    cv_.wait(locker,[&]{
                    auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if (!status){
                        end_writer();
                        error_occured_ = true;
                        break;
                    }
                    if(stop_recorder_.load()){
                        end_writer();
                        break;
csrc/buz/recorder.hpp
@@ -40,6 +40,8 @@
                void SetCallback(FUNC_REC_INFO cb){
                    func_rec_info_ = cb;
                }
                const bool ErrorOcurred(){return error_occured_;}
            private:
                void run_thread();
@@ -73,6 +75,8 @@
                int                     file_frame_index_;
                std::string             file_path_;
                FUNC_REC_INFO           func_rec_info_;
                bool                    error_occured_;
        };
    }
}
csrc/wrapper.cpp
@@ -149,7 +149,10 @@
                auto data(std::make_shared<CodedData>());
                if(!in->readPacket(data)){
                    logIt("read packet error");
                    pkt.id = -1; data = nullptr; id = 0;
                    data.reset();
                    data = nullptr;
                    pkt.id = -1;
                    id = 0;
                }else{
                    pkt.id = id++;
                }
@@ -160,7 +163,10 @@
                run_worker(in.get(), pkt);
                if(!data){
                    map_rec_.clear();
                    {
                        std::lock_guard<std::mutex> l(mutex_rec_);
                        map_rec_.clear();
                    }
                    std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
                    list_rec_pkt_.clear();
@@ -205,20 +211,25 @@
            }
        }
        cache_rec_pkt(pkt);
        for(auto &i : map_rec_){
            if (!i.second.rec){
                i.second.rec = i.second.fn_init(in);
                if (i.second.rec){
                    std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
                    for(auto &k : list_rec_pkt_){
                        avpacket p = {k.data, k.id};
                        i.second.rec->CachePacket(p);
        {
            std::lock_guard<std::mutex> l(mutex_rec_);
            for(auto &i : map_rec_){
                if (!i.second.rec){
                    i.second.rec = std::move(init_recorder(in, i.second.rid, i.second.dir, i.second.min, i.second.max));
                    if (i.second.rec){
                        std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
                        for(auto &k : list_rec_pkt_){
                            avpacket p = {k.data, k.id};
                            i.second.rec->CachePacket(p);
                        }
                        logIt("START REC %d FRAMES", list_rec_pkt_.size());
                    }
                    logIt("START REC %d FRAMES", list_rec_pkt_.size());
                }else if (i.second.rec){
                    i.second.rec->CachePacket(pkt);
                }
            }else if (i.second.rec){
                i.second.rec->CachePacket(pkt);
            }
        }
    }
@@ -255,13 +266,13 @@
    }
    //////////////recorder
    std::shared_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
    std::unique_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
        if(!in){
            logIt("Init wrapper first");
            return nullptr;
        }
        auto rec = std::make_shared<Recorder>(in, id.c_str());
        std::unique_ptr<Recorder> rec(new Recorder(in, id.c_str()));
        rec->SetCallback([&](std::string &id, int &index, std::string &path){
            cache_rec_info(id, index, path);
@@ -283,25 +294,50 @@
        std::string rid(id);
        std::string dir(output);
        
        auto fn = [=](FormatIn *in){
            return init_recorder(in, rid, dir, mindur, maxdur);
        };
        std::shared_ptr<Recorder> rec(nullptr);
        FnRec r = FnRec{fn, rec};
        map_rec_[rid] = r;
        std::lock_guard<std::mutex> l(mutex_rec_);
        // auto fn = [=](FormatIn *in){
        //     return init_recorder(in, rid, dir, mindur, maxdur);
        // };
        // FnRec r = FnRec{fn, nullptr};
        if (map_rec_.find(rid) != map_rec_.end()){
            map_rec_.erase(rid);
        }
        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
        //         iter == map_rec_.erase(iter);
        //     }else{
        //         iter++;
        //     }
        // }
        FnRec fr;
        fr.rid = rid;
        fr.dir = dir;
        fr.min = mindur;
        fr.max = maxdur;
        map_rec_[rid] = std::move(fr);
        minduration = mindur * 25;
        maxduration = maxdur * 25;
    }
    int Wrapper::FireRecorder(const char* sid,const int64_t &id){
        std::lock_guard<std::mutex> l(mutex_rec_);
        auto iter = map_rec_.find(sid);
        if (iter != map_rec_.end()){
            if(iter->second.rec){
                iter->second.rec->FireRecorder(id);
            }
        }
        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
        //         iter == map_rec_.erase(iter);
        //     }else{
        //         iter++;
        //     }
        // }
    }
    void Wrapper::cache_rec_info(std::string &id, int &index, std::string &path){
@@ -336,10 +372,21 @@
        if (map_rec_.find(info.rec_id) != map_rec_.end())
            map_rec_.erase(info.rec_id);
        for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
            if (iter->second.rec && iter->second.rec->ErrorOcurred()){
                iter == map_rec_.erase(iter);
            }else{
                iter++;
            }
        }
        // logIt("go get info index: %d, file: %s\n", index, path.c_str());
    }
    std::string Wrapper::GetRecorderID(const std::string &path){
        std::lock_guard<std::mutex> l(mutex_rec_);
        std::string ret("");
        auto iter = list_rec_map_.find(path);
        if (iter != list_rec_map_.end()){
csrc/wrapper.hpp
@@ -44,11 +44,14 @@
        struct avpacket;
    }
    typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
    // typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
    typedef struct _fn_rec{
        FN_REC fn_init;
        std::shared_ptr<buz::Recorder> rec;
        std::string rid;
        std::string dir;
        int min;
        int max;
        std::unique_ptr<buz::Recorder> rec;
    }FnRec;
    class Wrapper{
@@ -60,7 +63,7 @@
            std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
            // ffwrapper::FormatIn* init_reader_gb28181(const char* input);
            void run_worker(ffwrapper::FormatIn *in, buz::avpacket &pkt);
            std::shared_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
            std::unique_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
            void cache_rec_info(std::string &id, int &index, std::string &path);
            void cache_pic(std::shared_ptr<ffwrapper::FrameData> &frame, int64_t &id);