video analysis2.0拆分,ffmpeg封装go接口库
zhangmeng
2019-09-18 b73029149580370e62dd6c14a270aea902f85cf2
fix rec bug
1个文件已删除
1个文件已添加
5 文件已重命名
7个文件已修改
682 ■■■■ 已修改文件
csrc/buz/recorder.cpp 221 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/buz/recorder.hpp 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/cffmpeg.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/ffmpeg/format/FormatIn.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/ffmpeg/format/FormatOut.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/stream.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/decoder.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/decoder.hpp 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/rec.cpp 163 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/rec.hpp 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/stream.cpp 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/worker/stream.hpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/wrapper.cpp 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/wrapper.hpp 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
csrc/buz/recorder.cpp
@@ -25,11 +25,11 @@
        ,maxduration(30 * 25)
        ,minduration(10 * 25)
        ,end_frame(minduration)
        ,cur_frame(-1)
        ,cur_frame(0)
        ,stop_recorder_(false)
        ,id_(id)
        ,id_frame_(0)
        ,file_frame_index_(-1)
        ,id_frame_(-1)
        ,id_frame_in_file_(-1)
        ,file_path_("")
        ,func_rec_info_(nullptr)
        ,thrd_(nullptr)
@@ -43,13 +43,13 @@
            try
            {
                if (thrd_){
                    {
                        std::unique_lock<std::mutex> locker(mutex_pkt_);
                    if (!stop_recorder_.load()){
                        stop_recorder_.store(true);
                        cv_.notify_one();
                    }
                    thrd_->join();
                    logIt("REC THREAD JOINED, QUIT!!!");
                    // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str());
                }
            }
            catch(const std::exception& e)
@@ -71,121 +71,95 @@
            out_ = new FormatOut(in_->getStream(), "mp4");
            file_path_ = dir_ + "/" + sole::uuid4().base62() + ".mp4";
            auto ret = out_->JustWriter(in_->getStream(), file_path_.c_str());
            if (ret){
            return 0;
        }
        void Recorder::start_writer(){
            if (cur_frame == 0) {
                sole::uuid u4 = sole::uuid4();
                file_path_ = dir_ + "/" + u4.base62() + ".mp4";
                out_->JustWriter(in_->getStream(), file_path_.c_str());
                logIt("START RECORD %s", file_path_.c_str());
            }
            return -1;
        }
        int Recorder::write_correctly(const avpacket &pkt){
        int Recorder::write_correctly(const CPacket &pkt){
            //reader failed, break stream
            if(pkt.id == -1 && !pkt.data){
                end_writer();
                return -1;
            }
            if (cur_frame == end_frame){
                return 1;
            }
            // writer error, reinit writer
            int64_t cur = cur_frame++;
            AVPacket &op = pkt.data->getAVPacket();
            AVPacket np(op);
            av_copy_packet(&np, &op);
            if(!out_->writeFrame(np, cur)){
            auto ret = out_->writeFrame(np, cur);
                av_packet_unref(&np);
                end_writer();
                return -1;
            }
            av_packet_unref(&np);
            if (!ret) return -1;
            if(pkt.id == id_frame_){
                file_frame_index_ = cur_frame-1;
                id_frame_in_file_ = cur_frame-1;
            }
            // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_);
            return 0;
        }
        void Recorder::end_writer(){
            if(cur_frame == -1) return;
            out_->endWriter();
            logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
                 file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
            //reinit cur_frame clear list pkt
            {
                std::lock_guard<std::mutex> locker(mutex_pkt_);
                cur_frame = -1;
                end_frame = minduration;
                list_pkt_.clear();
            }
            //callback to frame index and path
            if(func_rec_info_){
                func_rec_info_(id_, file_frame_index_, file_path_);
            }
        }
        void Recorder::run_thread(){
            bool reinit_writer = false;
            while(!stop_recorder_.load()){
                if (reinit_writer) {
                    while(!stop_recorder_.load()){
                        if(init_writer() == 0)
                            break;
                        usleep(300000);
                    }
                    if(stop_recorder_.load()) break;
                }
                std::list<avpacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if (!status){
                        end_writer();
                        error_occured_ = true;
                        break;
                    }
                    if(stop_recorder_.load()){
                        end_writer();
                        break;
                    }
                    if(cur_frame == -1){
                        continue;
                    }
                    list_pkt_.swap(pkts);
                }
                if (cur_frame == 0) {
                    start_writer();
                }
                for(auto &i : pkts){
                    if (cur_frame < end_frame){
                        if(write_correctly(i) != 0){
                            stop_recorder_.store(true);
                            break;
                        }
                    }else{
                        end_writer();
                        stop_recorder_.store(true);
                        break;
                    }
                }
            }
            if (out_){
                delete out_;
                out_ = NULL;
            }
            // stop_recorder_.store(false);
            {
                std::lock_guard<std::mutex> l(mutex_pkt_);
                list_pkt_.clear();
            }
            // logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
            //      id_frame_in_file_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
            //callback to frame index and path
            if(func_rec_info_){
                func_rec_info_(id_,id_frame_in_file_, file_path_);
            }
        }
        // Recorder worker loop: drains queued packets and writes them to the
        // output until a stop is requested, no packet arrives in time, or a
        // write returns non-zero. On exit it always sets the stop flag and
        // calls end_writer().
        void Recorder::run_thread(){
            while(!stop_recorder_.load()){
                // Local batch; filled by swapping with the shared queue so the
                // packets are written after the mutex has been released.
                std::list<CPacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    // Wait up to 3 seconds for queued packets or a stop request.
                    // wait_for returns false only when the timeout expired with
                    // the predicate still false.
                    auto status = cv_.wait_for(locker, std::chrono::seconds(3), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if (!status || stop_recorder_.load()){
                        // A timeout is recorded as an error; a plain stop
                        // request is not.
                        error_occured_ = !status;
                        break;
                    }
                    list_pkt_.swap(pkts);
                }
                int ret = 0;
                // Write the batch in order; the first non-zero result aborts it.
                for(auto &i : pkts){
                    ret = write_correctly(i);
                    if (ret != 0){
                        break;
                    }
                }
                // A non-zero write result also ends the worker loop.
                if (ret != 0){
                    break;
                }
            }
            // Mark the recorder as stopped before finishing the writer.
            stop_recorder_.store(true);
            end_writer();
        }
        int Recorder::Run(const char* output, const int mind, const int maxd){
@@ -204,7 +178,7 @@
                end_frame = minduration;
            }
            logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
            // logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
            thrd_.reset(new std::thread([&]{
                run_thread();
@@ -215,38 +189,37 @@
        }
        int Recorder::FireRecorder(const int64_t &id){
            if(cur_frame == -1){
            if (stop_recorder_.load()) return -1;
            if(id_frame_ == -1){
                id_frame_ = id;
                logIt("FIRST FIRE RECORD ID: %lld", id);
                {
                    std::lock_guard<std::mutex> locker(mutex_pkt_);
                    cur_frame = 0;
                    if (list_pkt_.size() > end_frame){
                        end_frame = list_pkt_.size() + minduration/2;
                        if (end_frame > maxduration)
                            end_frame = maxduration;
                    }
                }
            }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){
                // logIt("FIRST FIRE RECORD ID: %lld, cur_frame: %d, end_frame: %d", id, cur_frame, end_frame);
            }else if(cur_frame > minduration/2 && end_frame < maxduration){
                end_frame = end_frame + minduration / 2;
                if(end_frame > maxduration){
                    end_frame = maxduration;
                }
                // logIt("PROLONG REC, cur_frame: %d, end_frame: %d", cur_frame, end_frame);
            }
            // logIt("FIRE REC FRAME ID: %lld", id);
            return 0;
        }
        int Recorder::CachePacket(const avpacket &pkt){
        int Recorder::PushPacket(const CPacket &pkt){
            if (stop_recorder_.load()) return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            if(cur_frame == -1){
                //error occur, stream break
                if(pkt.id == -1 && pkt.data == nullptr){
                    list_pkt_.clear();
                    return -1;
                }
            if(id_frame_ == -1){
                //wait I 
                if (list_pkt_.empty()) {
                    AVPacket &avpkt = pkt.data->getAVPacket();
@@ -258,17 +231,43 @@
                maybe_dump_gop();
                list_pkt_.push_back(pkt);
                // cv_.notify_one();
            }else{
                list_pkt_.push_back(pkt);
                cv_.notify_one();
            }
            return 0;
            return list_pkt_.size();
        }
        int Recorder::PushPackets(std::list<CPacket> &lst){
            if (stop_recorder_.load()) return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            bool i = false;
            for (auto &p : lst){
                if (!i){
                    AVPacket &avpkt = p.data->getAVPacket();
                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
                        continue;
                    }
                    i = true;
                }
                list_pkt_.push_back(p);
            }
            maybe_dump_gop();
            cv_.notify_one();
            // logIt("CACHE PACKET : %d", list_pkt_.size());
            return list_pkt_.size();
        }
        void Recorder::maybe_dump_gop(){
            //超过min/2,丢弃gop
            while (list_pkt_.size() > maxduration) {
            while (list_pkt_.size() > minduration) {
                list_pkt_.pop_front();
                while(!list_pkt_.empty()){
                    auto &cache = list_pkt_.front();
csrc/buz/recorder.hpp
@@ -22,10 +22,12 @@
namespace cffmpeg_wrap{
    namespace buz{
        struct avpacket{
        // 缓存的视频帧,等待fire触发开始录像
        typedef struct _cache_pkt{
            std::shared_ptr<ffwrapper::CodedData> data;
            int64_t id;
        };
        }CPacket;
        class Recorder{
            public:
@@ -34,7 +36,8 @@
            public: 
                int Run(const char* output, const int mind, const int maxd);
                int CachePacket(const avpacket &pkt);
                int PushPacket(const CPacket &pkt);
                int PushPackets(std::list<CPacket> &lst);
                int FireRecorder(const int64_t &id);
                void SetCallback(FUNC_REC_INFO cb){
@@ -42,12 +45,12 @@
                }
                const bool ErrorOcurred(){return error_occured_;}
                const std::string& RecID()const{return id_;}
            private:
                void run_thread();
                int init_writer();
                void start_writer();
                int write_correctly(const avpacket &pkt);
                int write_correctly(const CPacket &pkt);
                void end_writer();
                void maybe_dump_gop();
@@ -60,7 +63,7 @@
                int     end_frame;
                int     cur_frame;
                std::list<avpacket>     list_pkt_;
                std::list<CPacket>     list_pkt_;
                std::atomic_bool        stop_recorder_;
                std::mutex              mutex_pkt_;
@@ -72,7 +75,7 @@
                std::string             id_;
                int64_t                 id_frame_;
                int                     file_frame_index_;
                int                     id_frame_in_file_;
                std::string             file_path_;
                FUNC_REC_INFO           func_rec_info_;
csrc/cffmpeg.cpp
@@ -38,12 +38,12 @@
void c_ffmpeg_run_gb28181(const cffmpeg h){
    Wrapper *s = (Wrapper*)h;
    s->UseGB28181();
    s->GB28181();
}
void c_ffmepg_use_cpu(const cffmpeg h){
    Wrapper *s = (Wrapper*)h;
    s->UseCPU();
    s->CPUDec();
}
@@ -64,7 +64,7 @@
    std::string p(""), id("");
    s->GetInfoRecorder(id, i, p);
    // printf("cffmpeg get info : index : %d, file : %s\n", i, p.c_str());
    // printf("cffmpeg get info : index : %d, file : %s, recid: %s\n", i, p.c_str(), id.c_str());
    *index = i;
    
csrc/ffmpeg/format/FormatIn.cpp
@@ -106,11 +106,10 @@
        }
        ret = avformat_open_input(&ctx_, "", NULL, options);
        if(ret < 0){
            logIt("open %s failed:%s",filename,
                  getAVErrorDesc(ret).c_str());
        }
        // if(ret < 0){
            // logIt("open %s failed:%s",filename,
            //       getAVErrorDesc(ret).c_str());
        // }
        return ret;
    }
@@ -119,11 +118,10 @@
    int FormatIn::open(const char *filename, AVDictionary **options){
        const int ret = avformat_open_input(&ctx_, filename, NULL, options);
        if(ret < 0){
            logIt("open %s failed:%s",filename,
                    getAVErrorDesc(ret).c_str());
        }
        // if(ret < 0){
        //     logIt("open %s failed:%s",filename,
        //             getAVErrorDesc(ret).c_str());
        // }
        return ret;
    }
@@ -266,8 +264,8 @@
        while (!founded){
            const int ret = av_read_frame(ctx_, &pkt_out);
            if(ret < 0){
                logIt("read frame from %s failed:%s",
                        ctx_->filename,getAVErrorDesc(ret).c_str());
                // logIt("read frame from %s failed:%s",
                //         ctx_->filename,getAVErrorDesc(ret).c_str());
    
                return false;
            }
csrc/ffmpeg/format/FormatOut.cpp
@@ -404,34 +404,32 @@
        pkt.dts = pkt.pts;
        pkt.duration = av_rescale_q(calc_duration, time_base_q, time_base); //(double)(calc_duration)*(double)(av_q2d(time_base_q)) / (double)(av_q2d(time_base));
        
        // if (pkt.duration < 0 || time_base.den != 90000){
            // logIt("CALCULATE DURATION : %lld, fame count : %lld, TIMEBASE: %d", calc_duration,time_stamp, time_base.den);
        // }
        // logIt("FRAME ID: %lld, PTS : %lld, DTS : %lld", frame_cnt, pkt.pts, pkt.dts);
    }
    bool FormatOut::writeFrame(AVPacket &pkt, const int64_t &frame_cnt,
                              bool interleaved/* = true*/){
        adjustPTS(pkt, frame_cnt);
        return writeFrame2(pkt, interleaved);
        auto ret = writeFrame2(pkt, interleaved);
        if (!ret){
            logIt("write to file failed, pkt.pts: %lld, dts: %lld, frame count: %d",
                    pkt.pts, pkt.dts, frame_cnt);
        }
        return ret;
    }
    bool FormatOut::writeFrame2(AVPacket &pkt, bool interleaved){
        
        int ret = 0;
        if(interleaved)
        if(interleaved){
            ret = av_interleaved_write_frame(ctx_, &pkt);
        else
        {
        }else{
            // returns 1 if flushed and there is no more data to flush
            ret = av_write_frame(ctx_, &pkt);
        }
    
        if(ret < 0)
        {
            logIt("write packet to file failed:%s",
                    getAVErrorDesc(ret).c_str());
        if(ret < 0){
            return false;
        }
csrc/stream.cpp
File was deleted
csrc/worker/decoder.cpp
File was renamed from csrc/decoder.cpp
@@ -1,10 +1,10 @@
#include "decoder.hpp"
#include "ffmpeg/bridge/cvbridge.hpp"
#include "ffmpeg/format/FormatIn.hpp"
#include "ffmpeg/data/CodedData.hpp"
#include "ffmpeg/data/FrameData.hpp"
#include "ffmpeg/log/log.hpp"
#include "../ffmpeg/bridge/cvbridge.hpp"
#include "../ffmpeg/format/FormatIn.hpp"
#include "../ffmpeg/data/CodedData.hpp"
#include "../ffmpeg/data/FrameData.hpp"
#include "../ffmpeg/log/log.hpp"
extern "C"{
#include <libavformat/avformat.h>
csrc/worker/decoder.hpp
csrc/worker/rec.cpp
File was renamed from csrc/rec.cpp
@@ -1,12 +1,12 @@
#include "rec.hpp"
#include <unistd.h>
#include <sys/time.h>
#include "ffmpeg/format/FormatIn.hpp"
#include "ffmpeg/data/CodedData.hpp"
#include "buz/recorder.hpp"
#include "ffmpeg/log/log.hpp"
#include "common/callback.hpp"
#include "../ffmpeg/format/FormatIn.hpp"
#include "../ffmpeg/data/CodedData.hpp"
#include "../ffmpeg/log/log.hpp"
#include "../common/callback.hpp"
using namespace logif;
using namespace ffwrapper;
@@ -14,48 +14,15 @@
namespace cffmpeg_wrap
{
    rec::rec(ffwrapper::FormatIn *in)
    :recRef_(in)
    rec::rec()
    :recRef_(NULL)
    ,minduration_(250)
    ,maxduration_(750)
    {}
    rec::~rec()
    {
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            map_rec_.clear();
        }
        {
            std::lock_guard<std::mutex> l(mtx_pkt_);
            list_pkt_.clear();
        }
    }
    std::unique_ptr<Recorder> rec::newRec(std::string id, std::string dir, const int mind, const int maxd){
        if(!recRef_){
            logIt("Init wrapper first");
            return nullptr;
        }
        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
        rec->SetCallback([&](std::string &id, int &index, std::string &path){
            setRecInfo(id, index, path);
        });
        int trycnt = 0;
        while(trycnt < 100){
            const int ret = rec->Run(dir.c_str(), mind, maxd);
            if(ret == 0) break;
            usleep(200000);
        }
        if (trycnt < 100){
            return rec;
        }
        return nullptr;
        clear();
    }
    void rec::setRecInfo(std::string &id, int &index, std::string &path){
@@ -71,7 +38,33 @@
        info.fPath = path;
        info.recID = id;
        list_recInfo_.emplace_back(info);
        logIt("LIST REC FILES COUNT : %d", list_recInfo_.size());
    }
    std::unique_ptr<buz::Recorder> rec::startRec(std::string id, std::string dir, const int mind, const int maxd){
        if(!recRef_){
            logIt("Init wrapper first");
            return nullptr;
        }
        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
        rec->SetCallback([&](std::string &id, int &index, std::string &path){
            setRecInfo(id, index, path);
        });
        int trycnt = 0;
        while(trycnt < 100){
            auto ret = rec->Run(dir.c_str(), mind, maxd);
            if(ret == 0) break;
            usleep(200000);
        }
        if (trycnt < 100){
            std::lock_guard<std::mutex> locker(mtx_pkt_);
            rec->PushPackets(list_pkt_);
            return rec;
        }
        return nullptr;
    }
    void rec::GetRecInfo(std::string &recID, int &index, std::string &path){
@@ -79,11 +72,7 @@
        // 获取信息
        {
            std::lock_guard<std::mutex> l(mtx_recInfo_);
            if(list_recInfo_.empty()){
                index = -1;
                path = "";
                return;
            }
            if(!list_recInfo_.empty()){
            auto info = list_recInfo_.front();
            recID = info.recID;
            index = info.frmIdx;
@@ -91,15 +80,27 @@
            list_recInfo_.pop_front();
        }
        }
        // 删除rec实例
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            if (map_rec_.find(recID) != map_rec_.end())
            if (map_rec_.empty()){
                return;
            }
            if (map_rec_.find(recID) != map_rec_.end()){
                map_rec_.erase(recID);
                return;
            }
            for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
                if (iter->second.rec && iter->second.rec->ErrorOcurred()){
                if (iter->second && iter->second->ErrorOcurred()){
                    recID = iter->first;
                    index = -1;
                    path = "";
                    iter == map_rec_.erase(iter);
                    break;
                }else{
                    iter++;
                }
@@ -107,54 +108,73 @@
        }
    }
    void rec::clear(){
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            map_rec_.clear();
        }
        {
            std::lock_guard<std::mutex> l(mtx_pkt_);
            list_pkt_.clear();
        }
    }
    // Stores the input format context used when creating recorders.
    // Only the pointer is kept; nothing in this function takes ownership.
    void rec::Load(ffwrapper::FormatIn *in){
        recRef_ = in;
    }
    // Forgets the input format context and then calls clear() to drop the
    // state held by this object.
    void rec::Unload(){
        recRef_ = NULL;
        clear();
    }
    // Returns true while an input format context is set (recRef_ is non-null).
    const bool rec::Loaded() const{
        return recRef_ != NULL;
    }
    void rec::NewRec(const char* id, const char *output, const int mindur, const int maxdur){
        std::string rid(id);
        std::string dir(output);
        minduration_ = mindur * 25;
        maxduration_ = maxdur * 25;
        
        {
            std::lock_guard<std::mutex> l(mtx_rec_);
            if (map_rec_.find(rid) != map_rec_.end()){
                map_rec_.erase(rid);
            }
            map_rec_[rid] = {rid, dir, mindur, maxdur, nullptr};
            map_rec_[rid] = startRec(rid, dir, mindur, maxdur);
        }
        
        minduration_ = mindur * 25;
        maxduration_ = maxdur * 25;
    }
    void rec::FireRecSignal(const char* sid,const int64_t &id){
        std::lock_guard<std::mutex> l(mtx_rec_);
        auto iter = map_rec_.find(sid);
        if (iter != map_rec_.end()){
            if(iter->second.rec){
                iter->second.rec->FireRecorder(id);
            if(iter->second){
                iter->second->FireRecorder(id);
            }
        }
        // logIt("recorders count: %d", map_rec_.size());
    }
    void rec::SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
        if (!data) return;
        cachePacket(data, id);
        std::lock_guard<std::mutex> l(mtx_rec_);
        for(auto &i : map_rec_){
            if (!i.second.rec){
                i.second.rec = newRec(i.second.rid, i.second.dir, i.second.min, i.second.max);
                if (i.second.rec){
                    //此函数还未退出,不需要这个锁
                    // std::lock_guard<std::mutex> locker(mtx_pkt_);
                    for(auto &k : list_pkt_){
                        i.second.rec->CachePacket({k.data, k.id});
                    }
                    // 新的数据缓存
                    i.second.rec->CachePacket({data, id});
                    logIt("START REC %d FRAMES", list_pkt_.size());
                }
            }else if (i.second.rec){
                i.second.rec->CachePacket({data, id});
            if (i.second){
                i.second->PushPacket({data, id});
            }
        }
        cachePacket(data, id);
    }
    void rec::cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
@@ -173,7 +193,8 @@
    int rec::shrinkCache(){
        //超过最大缓存,丢弃gop
        while (list_pkt_.size() > minduration_) {
        //缓存最小长度的,用于记录
        while (list_pkt_.size() > minduration_/2) {
            list_pkt_.pop_front();
            while(!list_pkt_.empty()){
                auto &cache = list_pkt_.front();
csrc/worker/rec.hpp
File was renamed from csrc/rec.hpp
@@ -7,6 +7,8 @@
#include <list>
#include <mutex>
#include "../buz/recorder.hpp"
namespace ffwrapper
{
    class FormatIn;
@@ -16,12 +18,6 @@
namespace cffmpeg_wrap
{
    namespace buz{
        class Recorder;
        struct avpacket;
    }
    class rec
    {
    private:
@@ -30,14 +26,7 @@
        int     minduration_;
        // 录像的实例,对应任务
        typedef struct _fn_rec{
            std::string rid;        //id对应任务id
            std::string dir;
            int min;
            int max;
            std::unique_ptr<buz::Recorder> rec;
        }FnRec;
        std::unordered_map<std::string, FnRec> map_rec_;
        std::unordered_map<std::string, std::unique_ptr<buz::Recorder> > map_rec_;
        // 多线程添加任务实例,在读流线程使用录像,但是添加在另一个线程
        std::mutex mtx_rec_;
@@ -52,26 +41,28 @@
        std::mutex mtx_recInfo_;
        // 缓存的视频帧,等待firerecsignal触发开始录像
        typedef struct _cache_pkt{
            std::shared_ptr<ffwrapper::CodedData> data;
            int64_t id;
        }CPacket;
        std::list<CPacket> list_pkt_;
        std::list<buz::CPacket> list_pkt_;
        // 多线程,生产者线程reader push pkt,消费者,录像线程pop
        std::mutex mtx_pkt_;
    private: 
        // 创建录像实例
        std::unique_ptr<buz::Recorder> newRec(std::string id, std::string dir, const int mind, const int maxd);
        // 录像实例的回调函数,录像完成后设置录像文件路径,id和帧id
        void setRecInfo(std::string &id, int &index, std::string &path);
        // 缓存视频包
        void cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
        // 丢弃缓存
        int shrinkCache();
        // 创建录像实例开始录像
        std::unique_ptr<buz::Recorder> startRec(std::string id, std::string dir, const int mind, const int maxd);
        // 清除缓存,断线重连时需要
        void clear();
    public:
        void NewRec(const char* id, const char *output, const int mindur, const int maxdur);
        // 准备好录像
        void Load(ffwrapper::FormatIn *in);
        void Unload();
        const bool Loaded() const;
        // 缓存录像的视频包,等待触发录像,或直接放到录像缓存
        void SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
        // 触发录像
@@ -80,7 +71,7 @@
        void GetRecInfo(std::string &recID, int &index, std::string &path);
        
    public:
        explicit rec(ffwrapper::FormatIn *in);
        rec();
        ~rec();
    };
} // namespace cffmpeg_wrap
csrc/worker/stream.cpp
New file
@@ -0,0 +1,51 @@
#include "stream.hpp"
#include "../ffmpeg/data/CodedData.hpp"
namespace cffmpeg_wrap{
    // Constructs the packet queue.
    // maxSize is stored as max_size_, the queue length above which SetPacket
    // starts discarding the oldest packets.
    stream::stream(const int maxSize)
    :max_size_(maxSize)
    {}
    stream::~stream(){
        std::lock_guard<std::mutex> locker(mutex_avpkt_);
        list_avpkt_.clear();
    }
    int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){
        if (data){
            std::lock_guard<std::mutex> locker(mutex_avpkt_);
            list_avpkt_.push_back(data);
            while(list_avpkt_.size() > max_size_){
                list_avpkt_.pop_front();
                while(!list_avpkt_.empty()){
                    auto &cache = list_avpkt_.front();
                    AVPacket &avpkt = cache->getAVPacket();
                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
                        list_avpkt_.pop_front();
                    }else{
                        break;
                    }
                }
            }
            return list_avpkt_.size();
        }
        return 0;
    }
    void stream::GetPacket(unsigned char **pktData, int *size, int *key){
        std::lock_guard<std::mutex> l(mutex_avpkt_);
        if(list_avpkt_.empty()){
            return;
        }
        auto data = list_avpkt_.front();
        auto pkt = data->getAVPacket();
        *key = pkt.flags & AV_PKT_FLAG_KEY;
        *size = pkt.size;
        *pktData = (unsigned char *)malloc(*size);
        memcpy(*pktData, pkt.data, pkt.size);
        list_avpkt_.pop_front();
    }
}
csrc/worker/stream.hpp
File was renamed from csrc/stream.hpp
@@ -15,9 +15,9 @@
    private:
        std::list<std::shared_ptr<ffwrapper::CodedData> > list_avpkt_;
        std::mutex mutex_avpkt_;
        const int max_size_;
    public:
        stream(/* args */);
        explicit stream(const int maxSize);
        ~stream();
        int SetPacket(std::shared_ptr<ffwrapper::CodedData> data);
csrc/wrapper.cpp
@@ -21,12 +21,20 @@
#include "buz/recorder.hpp"
#include "stream.hpp"
#include "decoder.hpp"
#include "rec.hpp"
#include "worker/stream.hpp"
#include "worker/decoder.hpp"
#include "worker/rec.hpp"
using namespace logif;
using namespace ffwrapper;
// Deletes the object `p` points to (when `p` is not NULL) and then sets `p`
// to NULL. The do { ... } while(0) wrapper makes the expansion behave as a
// single statement. `p` is expanded several times and without parentheses,
// so pass a plain pointer variable, not an expression with side effects.
#define DELETE_POINTER(p) \
do \
{ \
if(NULL != p) \
delete p; \
p = NULL; \
}while(0)
namespace cffmpeg_wrap{
    using namespace buz;
@@ -38,12 +46,11 @@
    ,scale_f_(SWS_POINT)
    ,gb_(0)
    ,cpu_(0)
    ,use_decoder_(false)
    ,thread_(nullptr)
    ,stop_stream_(false)
    ,stream_(nullptr)
    ,decoder_(nullptr)
    ,rec_(nullptr)
    ,rec_(new rec)
    {
        makeTheWorld();
    }
@@ -57,6 +64,7 @@
                stop_stream_.store(true);
                thread_->join();
            }
            DELETE_POINTER(rec_);
        }
        catch(const std::exception& e)
        {
@@ -70,11 +78,11 @@
        scale_h_ = h;
    }
    void Wrapper::UseGB28181(){
    void Wrapper::GB28181(){
        gb_ = 1;
    }
    void Wrapper::UseCPU(){
    void Wrapper::CPUDec(){
        cpu_ = 1;
    }
@@ -123,19 +131,24 @@
        return 0;
    }
    void Wrapper::init_stream(){
        if (stream_) delete stream_;
        stream_ = new stream;
    }
    void Wrapper::init_decoder(ffwrapper::FormatIn *in){
        if (decoder_) delete decoder_;
    void Wrapper::init_worker(ffwrapper::FormatIn *in){
        if (rec_->Loaded() && stream_ && decoder_) return;
        stream_ = new stream(3 * 25);
        decoder_ = new decoder(in, scale_w_, scale_h_,scale_f_);
        rec_->Load(in);
        if(fn_rec_lazy_) fn_rec_lazy_(in);
    }
    void Wrapper::init_rec(ffwrapper::FormatIn *in){
        if (rec_) delete rec_;
        rec_ = new rec(in);
    void Wrapper::run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
        if (stream_) stream_->SetPacket(data);
        if (decoder_) decoder_->SetFrame(data, id);
        if (rec_->Loaded()) rec_->SetPacket(data, id);
    }
    // Tears down the per-connection workers: deletes and nulls the stream
    // queue and the decoder, then unloads the recorder. rec_ itself is not
    // deleted here.
    void Wrapper::deinit_worker(){
        DELETE_POINTER(stream_);
        DELETE_POINTER(decoder_);
        rec_->Unload();
    }
    void Wrapper::run_stream_thread(){
@@ -148,11 +161,8 @@
                usleep(200000);
                continue;
            }
            init_stream();
            if (use_decoder_){
                init_decoder(in.get());
            }
            init_rec(in.get());
            init_worker(in.get());
            int64_t id = 0;
            while(!stop_stream_.load()){
@@ -161,24 +171,28 @@
                    logIt("read packet error");
                    break;
                }
                if (stream_) stream_->SetPacket(data);
                if (use_decoder_ && decoder_) decoder_->SetFrame(data, id);
                if (rec_) rec_->SetPacket(data, id);
                run_worker(data, id);
                id++;
            }
            deinit_worker();
        }
    }
    void Wrapper::BuildRecorder(const char* id, const char *output, const int mindur, const int maxdur){
        
        if (rec_){
        if (rec_->Loaded()){
            rec_->NewRec(id, output, mindur, maxdur);
        }else{
            std::string rid(id), dir(output);
            fn_rec_lazy_ =
            [=](ffwrapper::FormatIn *in){rec_->NewRec(rid.c_str(), dir.c_str(), mindur, maxdur);};
        }
    }
    int Wrapper::FireRecorder(const char* sid,const int64_t &id){
        if (rec_){
        if (rec_->Loaded()){
            rec_->FireRecSignal(sid, id);
        }
    }
@@ -190,7 +204,7 @@
    }
    ////////decoder
    void Wrapper::BuildDecoder(){
        use_decoder_ = true;
        // use_decoder_ = true;
    }
    void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id){
csrc/wrapper.hpp
@@ -12,7 +12,6 @@
#include <thread>
#include <atomic>
#include <mutex>
#include <unordered_map>
#include <memory>
#include "common/callback.hpp"
@@ -34,50 +33,46 @@
        public:
            Wrapper();
            ~Wrapper ();
        private: 
            std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
            void init_stream();
            void init_decoder(ffwrapper::FormatIn *in);
            void init_rec(ffwrapper::FormatIn *in);
            
        void init_worker(ffwrapper::FormatIn *in);
        void run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
        void deinit_worker();
        public: 
            int RunStream(const char* input);
        private: 
            void run_stream_thread();
        public: //recorder
            void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd);
            int FireRecorder(const char* sid,const int64_t &id);
            void GetInfoRecorder(std::string &recID, int &index, std::string &path);
            void ScalePicture(const int w, const int h, const int flags);
            void UseGB28181();
            void UseCPU();
        void GB28181();
        void CPUDec();
        public: //decoder
            void BuildDecoder();
            void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id);
    public: // push stream
            void GetPacket(unsigned char **pktData, int *size, int *key);
        private:
            // stream 参数
            std::string input_url_;
            int scale_w_, scale_h_, scale_f_;
            int gb_, cpu_;
            bool use_decoder_;
        int gb_, cpu_;
            // decoder 参数
            std::unique_ptr<std::thread> thread_;
            std::atomic_bool    stop_stream_;
            // 业务类
            // 推流类
            stream* stream_;
            // 解码类
            decoder* decoder_;
            // 录像类
        // 录像类,一直存在
            rec* rec_;
        // 录像请求缓存,等待runstream后添加
        std::function<void(ffwrapper::FormatIn*)> fn_rec_lazy_;
    };
    uint8_t *DecodeJPEG(const char *file, int *w, int *h);