video analysis2.0拆分,ffmpeg封装go接口库
zhangmeng
2019-09-18 b73029149580370e62dd6c14a270aea902f85cf2
csrc/buz/recorder.cpp
@@ -25,11 +25,11 @@
        ,maxduration(30 * 25)
        ,minduration(10 * 25)
        ,end_frame(minduration)
        ,cur_frame(-1)
        ,cur_frame(0)
        ,stop_recorder_(false)
        ,id_(id)
        ,id_frame_(0)
        ,file_frame_index_(-1)
        ,id_frame_(-1)
        ,id_frame_in_file_(-1)
        ,file_path_("")
        ,func_rec_info_(nullptr)
        ,thrd_(nullptr)
@@ -43,13 +43,13 @@
            try
            {
                if (thrd_){
                    {
                        std::unique_lock<std::mutex> locker(mutex_pkt_);
                    if (!stop_recorder_.load()){
                        stop_recorder_.store(true);
                        cv_.notify_one();
                    }
                    thrd_->join();
                    logIt("REC THREAD JOINED, QUIT!!!");
                    // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str());
                }
            }
            catch(const std::exception& e)
@@ -71,121 +71,95 @@
            out_ = new FormatOut(in_->getStream(), "mp4");
            return 0;
        }
        void Recorder::start_writer(){
            if (cur_frame == 0) {
                sole::uuid u4 = sole::uuid4();
                file_path_ = dir_ + "/" + u4.base62() + ".mp4";
                out_->JustWriter(in_->getStream(), file_path_.c_str());
                logIt("START RECORD %s", file_path_.c_str());
            file_path_ = dir_ + "/" + sole::uuid4().base62() + ".mp4";
            auto ret = out_->JustWriter(in_->getStream(), file_path_.c_str());
            if (ret){
                return 0;
            }
            return -1;
        }
        int Recorder::write_correctly(const avpacket &pkt){
        int Recorder::write_correctly(const CPacket &pkt){
            //reader failed, break stream
            if(pkt.id == -1 && !pkt.data){
                end_writer();
                return -1;
            }
            if (cur_frame == end_frame){
                return 1;
            }
            // writer error, reinit writer
            int64_t cur = cur_frame++;
            AVPacket &op = pkt.data->getAVPacket();
            AVPacket np(op);
            av_copy_packet(&np, &op);
            if(!out_->writeFrame(np, cur)){
                av_packet_unref(&np);
                end_writer();
                return -1;
            }
            auto ret = out_->writeFrame(np, cur);
            av_packet_unref(&np);
            if (!ret) return -1;
            if(pkt.id == id_frame_){
                file_frame_index_ = cur_frame-1;
                id_frame_in_file_ = cur_frame-1;
            }
            // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_);
            return 0;
        }
        void Recorder::end_writer(){
            if(cur_frame == -1) return;
            out_->endWriter();
            logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
                 file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
            //reinit cur_frame clear list pkt
            {
                std::lock_guard<std::mutex> locker(mutex_pkt_);
                cur_frame = -1;
                end_frame = minduration;
                list_pkt_.clear();
            }
            //callback to frame index and path
            if(func_rec_info_){
                func_rec_info_(id_, file_frame_index_, file_path_);
            }
        }
        void Recorder::run_thread(){
            bool reinit_writer = false;
            while(!stop_recorder_.load()){
                if (reinit_writer) {
                    while(!stop_recorder_.load()){
                        if(init_writer() == 0)
                            break;
                        usleep(300000);
                    }
                    if(stop_recorder_.load()) break;
                }
                std::list<avpacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if (!status){
                        end_writer();
                        error_occured_ = true;
                        break;
                    }
                    if(stop_recorder_.load()){
                        end_writer();
                        break;
                    }
                    if(cur_frame == -1){
                        continue;
                    }
                    list_pkt_.swap(pkts);
                }
                if (cur_frame == 0) {
                    start_writer();
                }
                for(auto &i : pkts){
                    if (cur_frame < end_frame){
                        if(write_correctly(i) != 0){
                            stop_recorder_.store(true);
                            break;
                        }
                    }else{
                        end_writer();
                        stop_recorder_.store(true);
                        break;
                    }
                }
            }
            if (out_){
                delete out_;
                out_ = NULL;
            }
            // stop_recorder_.store(false);
            {
                std::lock_guard<std::mutex> l(mutex_pkt_);
                list_pkt_.clear();
            }
            // logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
            //      id_frame_in_file_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
            //callback to frame index and path
            if(func_rec_info_){
                func_rec_info_(id_,id_frame_in_file_, file_path_);
            }
        }
        void Recorder::run_thread(){
            while(!stop_recorder_.load()){
                std::list<CPacket> pkts;
                {
                    std::unique_lock<std::mutex> locker(mutex_pkt_);
                    auto status = cv_.wait_for(locker, std::chrono::seconds(3), [&]{
                        return !list_pkt_.empty() || stop_recorder_.load();
                    });
                    if (!status || stop_recorder_.load()){
                        error_occured_ = !status;
                        break;
                    }
                    list_pkt_.swap(pkts);
                }
                int ret = 0;
                for(auto &i : pkts){
                    ret = write_correctly(i);
                    if (ret != 0){
                        break;
                    }
                }
                if (ret != 0){
                    break;
                }
            }
            stop_recorder_.store(true);
            end_writer();
        }
        int Recorder::Run(const char* output, const int mind, const int maxd){
@@ -204,7 +178,7 @@
                end_frame = minduration;
            }
            logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
            // logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
            thrd_.reset(new std::thread([&]{
                run_thread();
@@ -215,38 +189,37 @@
        }
        int Recorder::FireRecorder(const int64_t &id){
            if(cur_frame == -1){
            if (stop_recorder_.load()) return -1;
            if(id_frame_ == -1){
                id_frame_ = id;
                logIt("FIRST FIRE RECORD ID: %lld", id);
                {
                    std::lock_guard<std::mutex> locker(mutex_pkt_);
                    cur_frame = 0;
                    if (list_pkt_.size() > end_frame){
                        end_frame = list_pkt_.size() + minduration/2;
                        if (end_frame > maxduration)
                            end_frame = maxduration;
                    }
                std::lock_guard<std::mutex> locker(mutex_pkt_);
                if (list_pkt_.size() > end_frame){
                    end_frame = list_pkt_.size() + minduration/2;
                    if (end_frame > maxduration)
                        end_frame = maxduration;
                }
            }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){
                // logIt("FIRST FIRE RECORD ID: %lld, cur_frame: %d, end_frame: %d", id, cur_frame, end_frame);
            }else if(cur_frame > minduration/2 && end_frame < maxduration){
                end_frame = end_frame + minduration / 2;
                if(end_frame > maxduration){
                    end_frame = maxduration;
                }
                // logIt("PROLONG REC, cur_frame: %d, end_frame: %d", cur_frame, end_frame);
            }
            // logIt("FIRE REC FRAME ID: %lld", id);
            return 0;
        }
        int Recorder::CachePacket(const avpacket &pkt){
        int Recorder::PushPacket(const CPacket &pkt){
            if (stop_recorder_.load()) return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            if(cur_frame == -1){
                //error occur, stream break
                if(pkt.id == -1 && pkt.data == nullptr){
                    list_pkt_.clear();
                    return -1;
                }
            if(id_frame_ == -1){
                //wait I 
                if (list_pkt_.empty()) {
                    AVPacket &avpkt = pkt.data->getAVPacket();
@@ -258,17 +231,43 @@
                maybe_dump_gop();
                list_pkt_.push_back(pkt);
                // cv_.notify_one();
            }else{
                list_pkt_.push_back(pkt);
                cv_.notify_one();
            }
            return 0;
            return list_pkt_.size();
        }
        int Recorder::PushPackets(std::list<CPacket> &lst){
            if (stop_recorder_.load()) return 0;
            std::lock_guard<std::mutex> locker(mutex_pkt_);
            bool i = false;
            for (auto &p : lst){
                if (!i){
                    AVPacket &avpkt = p.data->getAVPacket();
                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
                        continue;
                    }
                    i = true;
                }
                list_pkt_.push_back(p);
            }
            maybe_dump_gop();
            cv_.notify_one();
            // logIt("CACHE PACKET : %d", list_pkt_.size());
            return list_pkt_.size();
        }
        void Recorder::maybe_dump_gop(){
            //超过min/2,丢弃gop
            while (list_pkt_.size() > maxduration) {
            while (list_pkt_.size() > minduration) {
                list_pkt_.pop_front();
                while(!list_pkt_.empty()){
                    auto &cache = list_pkt_.front();