video analysis2.0拆分,ffmpeg封装go接口库
zhangmeng
2020-07-24 f93ee1a42e8c47e472332287b7350b66a6b0fa11
csrc/buz/recorder.cpp
@@ -1,7 +1,11 @@
#include "recorder.hpp"
#include "sole.hpp"
#include <thread>
#include <unistd.h>
#include <chrono>
#include <sys/stat.h>
extern "C"{
#include <libavcodec/avcodec.h>
@@ -17,217 +21,544 @@
namespace cffmpeg_wrap{
    namespace buz{
        Recorder::Recorder(FormatIn *in)
        Recorder::Recorder(FormatIn *in, const std::string &id)
        :in_(in)
        ,out_(NULL)
        ,maxduration(30 * 25)
        ,minduration(10 * 25)
        ,end_frame(minduration)
        ,cur_frame(-1)
        ,thread_(nullptr)
        ,fp_(NULL)
        ,stop_recorder_(false)
        ,id_(id)
        ,id_frame_(0)
        ,file_frame_index_(-1)
        ,id_frame_in_file_(0)
        ,file_path_("")
        ,func_rec_info_(nullptr)
        {}
        Recorder::~Recorder(){
            if(thread_){
                stop_recorder_.store(true);
                cv_.notify_one();
                thread_->join();
        ,thrd_(nullptr)
        ,audio_(false)
        ,end_frame_(0)
        ,v_cur_frame_(0)
        ,a_cur_frame_(0)
        ,error_occured_(false)
        ,last_rec_id_(-1)
        {
            if (in){
                maxduration = 30 * in->getFPS();
                minduration = 10 * in->getFPS();
            }
            if(out_)
                delete out_;
        }
        int Recorder::init_writer(){
        Recorder::~Recorder(){
            try
            {
                if (thrd_){
                    if (!stop_recorder_.load()){
                        stop_recorder_.store(true);
                        cv_.notify_one();
                    }
                    thrd_->join();
                    // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str());
                }
            }
            catch(const std::exception& e)
            {
                logIt("RECODER DESTRUCTOR EXCEPTION: ", e.what());
            }
            if (fp_) {
                fclose(fp_);
                fp_ = NULL;
            }
        }
        int Recorder::init_write_h264(const bool audio){
            if (out_) {
                delete out_;
            }
            out_ = new FormatOut(in_->getFPS(), "mp4");
            int pid = getpid();
            std::string filename(sole::uuid4().base62() + "-" + std::to_string(pid) + ".mp4");
            file_path_ = dir_ + "/" + filename;
            std::string backup_dir("./video");
            size_t pos = dir_.rfind("/");
            if (pos != std::string::npos){
                backup_dir = dir_.substr(0, pos);
            }
            auto v = in_->getStream(AVMEDIA_TYPE_VIDEO);
            if (!v){
                return -2;
            }
            AVStream *a = in_->getStream(AVMEDIA_TYPE_AUDIO);
            if (!audio){
                a = NULL;
            }
            bool ret = out_->JustWriter(v, a, file_path_.c_str());
            if (ret){
                logIt("start record h264 file: %s", file_path_.c_str());
                return 0;
            }else{
                mkdir(backup_dir.c_str(), 0777);
                file_path_ = backup_dir + "/" + filename;
                logIt("failed in dir %s, try file %s to start record file", dir_.c_str(), file_path_.c_str());
                ret = out_->JustWriter(v, a, file_path_.c_str());
                if (ret){
                    logIt("start record h264 file: %s", file_path_.c_str());
                    return 0;
                }
            }
            logIt("failed to start record: %s", file_path_.c_str());
            return -1;
        }
        int Recorder::init_write_hevc(const bool audio){
            if (fp_){
                fclose(fp_);
            }
            int pid = getpid();
            std::string filename(sole::uuid4().base62() + "-" + std::to_string(pid) + ".hevc");
            file_path_ = dir_ + "/" + filename;
            std::string backup_dir("./video");
            size_t pos = dir_.rfind("/");
            if (pos != std::string::npos){
                backup_dir = dir_.substr(0, pos);
            }
            fp_ = fopen(file_path_.c_str(), "wb");
            if (!fp_){
                mkdir(backup_dir.c_str(), 0777);
                file_path_ = backup_dir + "/" + filename;
                logIt("failed in dir %s, try file %s to start record hevc file", dir_.c_str(), file_path_.c_str());
                fp_ = fopen(file_path_.c_str(), "wb");
                if (!fp_){
                    logIt("failed start record hevc file: %s", file_path_.c_str());
                    return -1;
                }
            }
            logIt("start record hevc file: %s", file_path_.c_str());
            return 0;
        }
        int Recorder::init_writer(const bool audio){
            if(!in_){
                logIt("init_writer FormatIn not init");
                return -1;
            }
            out_ = new FormatOut(in_->getStream(), "mp4");
            if (in_->IsHEVC()){
                return init_write_hevc(audio);
            }else{
                return init_write_h264(audio);
            }
            return -2;
        }
////////////////////////
        int Recorder::write_h264(const CPacket &pkt){
            //reader failed, break stream
            if(!pkt.data){
                return -1;
            }
            if (v_cur_frame_ == end_frame_){
                return 1;
            }
            AVPacket &op = pkt.data->getAVPacket();
            if (!audio_ && in_->isAudioPkt(&op)) {
                return 0;
            }
            AVPacket np(op);
            av_copy_packet(&np, &op);
            int64_t cur = v_cur_frame_;
            if (in_->isVideoPkt(&np)){
                if(pkt.v_id == id_frame_){
                    id_frame_in_file_ = v_cur_frame_;
                }
                v_cur_frame_++;
            }else if (in_->isAudioPkt(&np)) {
                cur = a_cur_frame_++;
            }
            auto ret = out_->writeFrame(&np, cur);
            av_packet_unref(&np);
            if (!ret) return -1;
            // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_);
            return 0;
        }
        int Recorder::write_hevc(const CPacket &pkt){
            if (!fp_){
                logIt("write hevc packet error, file not open");
                return -1;
            }
            if (v_cur_frame_ == end_frame_){
                return 1;
            }
            AVPacket &op = pkt.data->getAVPacket();
            if (in_->isAudioPkt(&op)) {
                return 0;
            }
            if (op.data == NULL){
                logIt("hevc avpacket data null");
                return 0;
            }
            if (in_->isVideoPkt(&op)){
                if(pkt.v_id == id_frame_){
                    id_frame_in_file_ = v_cur_frame_;
                }
                v_cur_frame_++;
            }
            fwrite(op.data, op.size, 1, fp_);
            return 0;
        }
        void Recorder::start_writer(){
            if (cur_frame == 0) {
                file_path_ = dir_ + "/" + std::to_string(random()) + ".mp4";
                out_->JustWriter(in_->getStream(), file_path_.c_str());
                logIt("start record %s", file_path_.c_str());
        int Recorder::write_correctly(const CPacket &pkt){
            if (in_->IsHEVC()){
                return write_hevc(pkt);
            }
            return write_h264(pkt);
        }
        int Recorder::write_correctly(const avpacket &pkt){
            //reader failed, break stream
            if(pkt.id == -1 && !pkt.data){
                end_writer();
                return 1;
        int Recorder::end_write_h264(){
            if (!out_) return -1;
            out_->endWriter();
            if (out_){
                delete out_;
                out_ = NULL;
            }
            // writer error, reinit writer
            int64_t cur = cur_frame++;
            if(!out_->writeFrame(pkt.data->getAVPacket(), cur)){
                end_writer();
            return 0;
        }
        int Recorder::end_write_hevc(){
            if (fp_){
                fclose(fp_);
                fp_ = NULL;
            }
            std::string hevc_file(file_path_);
            auto pos = file_path_.rfind(".hevc");
            if (pos != std::string::npos){
                file_path_ = file_path_.substr(0, pos) + ".mp4";
                logIt("mux hevc real file : %s", file_path_.c_str());
            }
            FILE *fp = fopen(hevc_file.c_str(), "rb");
            if (!fp) return 0;
            int ret = mux_hevc(fp, file_path_.c_str());
            fclose(fp);
            if (ret == 0){
                if (remove(hevc_file.c_str()) != 0){
                    logIt("mux hevc remove file %s failed", hevc_file.c_str());
                }
            }else{
                logIt("mux hevc to mp4 error, use raw hevc");
                file_path_ = hevc_file;
            }
            return 0;
        }
        static int read_buffer(void *opaque, uint8_t *buf, int buf_size){
            FILE *fp_open = (FILE*)opaque;
            if (!fp_open) logIt("mux hevc open file error");
           if(!feof(fp_open)){
              int true_size=fread(buf,1,buf_size,fp_open);
              return true_size;
           }else{
              return -1;
           }
        }
        int Recorder::mux_hevc(FILE *fp, const char *outfile){
            if (!fp) {
                logIt("mux hevc file handle is null");
                return -1;
            }
            if(pkt.id == id_frame_){
                file_frame_index_ = cur_frame;
            std::unique_ptr<FormatIn> in(nullptr);
            int tryTime = 0;
            while(true){
                std::unique_ptr<FormatIn> tmp(new FormatIn(false));
                auto ret = tmp->openWithCustomIO(fp, read_buffer);
                if (ret == 0){
                    in = std::move(tmp);
                    break;
                }
                usleep(10000);
                if (tryTime++ > 100){
                    logIt("mux hevc try %d time to open custom io %s, failed", tryTime, outfile);
                    return -2;
                }
            }
            if (in->open(NULL, NULL) < 0){
                logIt("mux hevc open stream error");
                return -3;
            }
            if (!in->findStreamInfo(NULL)) {
                logIt("mux hevc can't find streams");
                return -4;
            }
            std::unique_ptr<FormatOut> out(new FormatOut(in_->getFPS(), "mp4"));
            auto v = in->getStream(AVMEDIA_TYPE_VIDEO);
            if (!v){
                logIt("mux hevc file can't find video stream");
                return -5;
            }
            if (out->JustWriter(v, NULL, outfile)){
                logIt("mux hevc start record file: %s", outfile);
            }
            int64_t id = 0;
            while(true){
                AVPacket pkt;
                if (in->readPacket(&pkt) != 0){
                    logIt("mux hevc read packet error, id: %lld", id);
                    break;
                }
                out->writeFrame(&pkt, id);
                // logIt("read frame: %d", id);
                av_packet_unref(&pkt);
                id++;
            }
            out->endWriter();
            return 0;
        }
        void Recorder::end_writer(){
            if(cur_frame == -1) return;
            out_->endWriter();
            //reinit cur_frame clear list pkt
            if (in_->IsHEVC()){
                end_write_hevc();
            }else{
                end_write_h264();
            }
            logIt("finished record : %s frames: %d, frame in file id: %d",
                    file_path_.c_str(), end_frame_, id_frame_in_file_);
            {
                std::lock_guard<std::mutex> locker(mutex_pkt_);
                cur_frame = -1;
                end_frame = minduration;
                std::lock_guard<std::mutex> l(mutex_pkt_);
                list_pkt_.clear();
            }
            //callback to frame index and path
            if(func_rec_info_){
                func_rec_info_(file_frame_index_, file_path_);
                func_rec_info_(id_,id_frame_in_file_, file_path_);
            }
        }
        void Recorder::run_thread(){
            bool reinit_writer = false;
            while(!stop_recorder_.load()){
                if (reinit_writer) {
                    while(!stop_recorder_.load()){
                        if(init_writer() == 0)
                            break;
                        usleep(300000);
                    }
                    if(stop_recorder_.load()) break;
                }
                std::list<avpacket> pkts;
            while(!stop_recorder_.load()){
                std::list<CPacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    cv_.wait(locker,[&]{
                    int sec = minduration/50;
                    if (in_) sec = minduration/in_->getFPS()/2;
                    auto status = cv_.wait_for(locker, std::chrono::seconds(sec), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if(stop_recorder_.load()){
                        end_writer();
                    if (!status || stop_recorder_.load()){
                        error_occured_ = !status;
                        break;
                    }
                    if(cur_frame == -1){
                        continue;
                    }
                    list_pkt_.swap(pkts);
                }
                if (cur_frame == 0) {
                    start_writer();
                }
                int ret = 0;
                for(auto &i : pkts){
                    if (cur_frame < end_frame){
                        const int ret = write_correctly(i);
                        if(ret != 0){
                            if(ret == -1) reinit_writer = true;
                            break;
                        }
                    }else{
                        end_writer();
                    ret = write_correctly(i);
                    if (ret != 0){
                        break;
                    }
                }
                if (ret != 0){
                    break;
                }
            }
            stop_recorder_.store(true);
            end_writer();
            list_pkt_.clear();
        }
        int Recorder::Run(const char* output, const int mind, const int maxd){
            if(thread_){
                logIt("recorder already run");
                return 0;
            }
        int Recorder::Run(const char* output, const int mind, const int maxd, const bool audio){
            bool a = audio;
            if (in_->IsHEVC()) a = false;
            dir_ = output;
            int ret = init_writer();
            int ret = init_writer(a);
            if(ret != 0){
                logIt("recorder init writer error");
                return -1;
            }
            double fps = out_->getFPS();
            double fps = in_->getFPS();
            if(fps > 1.0){
                maxduration = fps * maxd;
                minduration = fps * mind;
                end_frame = minduration;
                end_frame_ = minduration;
            }
            logIt("min %d max %d endcount %d", minduration, maxduration, end_frame);
            audio_ = a;
            thread_.reset(new std::thread([&]{
            logIt("minduration %d maxduration %d", minduration, maxduration);
            thrd_.reset(new std::thread([&]{
                run_thread();
            }));
            //.detach();
            return 0;
        }
        int Recorder::FireRecorder(const int64_t &id){
            if(cur_frame == -1){
            if (stop_recorder_.load()) return -1;
            if(id_frame_ == 0){
                id_frame_ = id;
                {
                    std::lock_guard<std::mutex> locker(mutex_pkt_);
                    cur_frame = 0;
                std::lock_guard<std::mutex> locker(mutex_pkt_);
                if (list_pkt_.size() > end_frame_){
                    end_frame_ = list_pkt_.size() + minduration/2;
                    if (end_frame_ > maxduration)
                        end_frame_ = maxduration;
                }
            }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){
                end_frame = end_frame + minduration / 2;
                if(end_frame > maxduration){
                    end_frame = maxduration;
            }else if(v_cur_frame_ > minduration/2 && end_frame_ < maxduration){
                logIt("cur frame: %d, end frame: %d, duration: [%d-%d]",
                        v_cur_frame_, end_frame_, minduration, maxduration);
                end_frame_ = end_frame_ + minduration / 2;
                if(end_frame_ > maxduration){
                    end_frame_ = maxduration;
                }
            }
            return 0;
        }
        int Recorder::CachePacket(const avpacket &pkt){
        int Recorder::PushPacket(std::list<CPacket> &lst){
            if (stop_recorder_.load()) return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            if(cur_frame == -1){
                //error occur, stream break
                if(pkt.id == -1 && pkt.data == nullptr){
                    list_pkt_.clear();
                    return -1;
            // 没有开始录制
            if (last_rec_id_ < 0){
                logIt("last rec id is 0 cache size: %ld", lst.size());
                for (auto &i : lst){
                    // 从第一个非音频关键帧开始
                    if (last_rec_id_ < 0){
                        if (!in_->isVideoPkt(&i.data->getAVPacket())){
                            continue;
                        }
                        if (!(i.data->getAVPacket().flags & AV_PKT_FLAG_KEY)){
                            continue;
                        }
                    }
                    last_rec_id_ = i.id;
                    list_pkt_.push_back(i);
                }
                //wait I
                if (list_pkt_.empty()) {
                    AVPacket &avpkt = pkt.data->getAVPacket();
                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
                        return -1;
            }else{
                for(auto &i : lst){
                    if (i.id > last_rec_id_){
                        list_pkt_.push_back(i);
                        last_rec_id_++;
                    }
                }
            }
                maybe_dump_gop();
            cv_.notify_one();
                list_pkt_.push_back(pkt);
            }else{
                list_pkt_.push_back(pkt);
                cv_.notify_one();
            return list_pkt_.size();
        }
        int Recorder::StartWritePacket(std::list<CPacket> &lst, const int64_t &id, const int start, const int end){
            if (stop_recorder_.load()) return 0;
            // 第一次录像,设置触发帧id
            id_frame_ = id;
            if (start < 0) {
                logIt("start write packet [%d-%d] in pkt size: %d, frame id: %lld, "
                    "cur frame: %d, end frame: %d, duration: [%d-%d], last rec id: %lld",
                    start, end, lst.size(), id_frame_,
                    v_cur_frame_, end_frame_, minduration, maxduration, last_rec_id_);
                return -1;
            }
            return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            // 将传入的所有packets保存如缓存
            int index = -1;
            for (auto &p : lst){
                index++;
                if (index < start) continue;
                list_pkt_.push_back(p);
                if (index == end){
                    last_rec_id_ = p.id;
                    break;
                }
            }
            logIt("start write packet [%d-%d] in pkt size: %d, frame id: %lld, "
                "cur frame: %d, end frame: %d, duration: [%d-%d], last rec id: %lld",
                start, end, lst.size(), id_frame_,
                v_cur_frame_, end_frame_, minduration, maxduration, last_rec_id_);
            // maybe_dump_gop();
            cv_.notify_one();
            return list_pkt_.size();
        }
        void Recorder::maybe_dump_gop(){
            //超过min/2,丢弃gop
            while (list_pkt_.size() > minduration /2) {
            while (list_pkt_.size() > maxduration) {
                list_pkt_.pop_front();
                while(!list_pkt_.empty()){
                    auto &cache = list_pkt_.front();
                    AVPacket &avpkt = cache.data->getAVPacket();
                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
                    auto &i = list_pkt_.front();
                    if (!(i.data->getAVPacket().flags & AV_PKT_FLAG_KEY)){
                        list_pkt_.pop_front();
                    }else{
                        break;
@@ -235,5 +566,5 @@
                }
            }
        }
    }
}
    }// end clase
}// end namespace