#include "recorder.hpp" #include "sole.hpp" #include #include extern "C"{ #include } #include "../ffmpeg/format/FormatIn.hpp" #include "../ffmpeg/format/FormatOut.hpp" #include "../ffmpeg/data/CodedData.hpp" #include "../ffmpeg/log/log.hpp" using namespace logif; using namespace ffwrapper; namespace cffmpeg_wrap{ namespace buz{ Recorder::Recorder(FormatIn *in, const std::string &id) :in_(in) ,out_(NULL) ,maxduration(30 * 25) ,minduration(10 * 25) ,end_frame(minduration) ,cur_frame(-1) ,stop_recorder_(false) ,id_(id) ,id_frame_(0) ,file_frame_index_(-1) ,file_path_("") ,func_rec_info_(nullptr) ,thrd_(nullptr) { // logIt("RECODER ID: %s", id_.c_str()); } Recorder::~Recorder(){ try { if (thrd_){ stop_recorder_.store(true); cv_.notify_one(); thrd_->join(); logIt("REC THREAD JOINED, QUIT!!!"); } } catch(const std::exception& e) { logIt("RECODER DESTRUCTOR EXCEPTION: ", e.what()); } } int Recorder::init_writer(){ if (out_) { delete out_; } if(!in_){ logIt("init_writer FormatIn not init"); return -1; } out_ = new FormatOut(in_->getStream(), "mp4"); return 0; } void Recorder::start_writer(){ if (cur_frame == 0) { sole::uuid u4 = sole::uuid4(); file_path_ = dir_ + "/" + u4.base62() + ".mp4"; out_->JustWriter(in_->getStream(), file_path_.c_str()); logIt("START RECORD %s", file_path_.c_str()); } } int Recorder::write_correctly(const avpacket &pkt){ //reader failed, break stream if(pkt.id == -1 && !pkt.data){ end_writer(); return 1; } // writer error, reinit writer int64_t cur = cur_frame++; AVPacket &op = pkt.data->getAVPacket(); AVPacket np(op); av_copy_packet(&np, &op); if(!out_->writeFrame(np, cur)){ av_packet_unref(&np); end_writer(); return -1; } av_packet_unref(&np); if(pkt.id == id_frame_){ file_frame_index_ = cur_frame-1; } // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_); return 0; } void Recorder::end_writer(){ if(cur_frame == -1) return; out_->endWriter(); logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n", file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame); //reinit cur_frame clear list pkt { std::lock_guard locker(mutex_pkt_); cur_frame = -1; end_frame = minduration; list_pkt_.clear(); } //callback to frame index and path if(func_rec_info_){ func_rec_info_(id_,file_frame_index_, file_path_); } } void Recorder::run_thread(){ bool reinit_writer = false; while(!stop_recorder_.load()){ if (reinit_writer) { while(!stop_recorder_.load()){ if(init_writer() == 0) break; usleep(300000); } if(stop_recorder_.load()) break; } std::list pkts; { std::unique_lock locker(mutex_pkt_); cv_.wait(locker,[&]{ return !list_pkt_.empty() || stop_recorder_.load(); }); if(stop_recorder_.load()){ end_writer(); break; } if(cur_frame == -1){ continue; } list_pkt_.swap(pkts); } if (cur_frame == 0) { start_writer(); } for(auto &i : pkts){ if (cur_frame < end_frame){ if(write_correctly(i) != 0){ stop_recorder_.store(true); break; } }else{ end_writer(); stop_recorder_.store(true); break; } } } if (out_){ delete out_; out_ = NULL; } // stop_recorder_.store(false); } int Recorder::Run(const char* output, const int mind, const int maxd){ dir_ = output; int ret = init_writer(); if(ret != 0){ logIt("recorder init writer error"); return -1; } double fps = out_->getFPS(); if(fps > 1.0){ maxduration = fps * maxd; minduration = fps * mind; end_frame = minduration; } logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame); thrd_.reset(new std::thread([&]{ run_thread(); })); //.detach(); return 0; } int Recorder::FireRecorder(const int64_t &id){ if(cur_frame == -1){ id_frame_ = id; logIt("FIRST FIRE RECORD ID: %lld", id); { std::lock_guard locker(mutex_pkt_); cur_frame = 0; if (list_pkt_.size() > end_frame){ end_frame = list_pkt_.size() + minduration/2; if (end_frame > maxduration) end_frame = maxduration; } } }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){ end_frame = end_frame + minduration / 2; if(end_frame > maxduration){ end_frame = maxduration; } } // logIt("FIRE REC FRAME ID: %lld", id); return 0; } int Recorder::CachePacket(const avpacket &pkt){ std::lock_guard locker(mutex_pkt_); if(cur_frame == -1){ //error occur, stream break if(pkt.id == -1 && pkt.data == nullptr){ list_pkt_.clear(); return -1; } //wait I if (list_pkt_.empty()) { AVPacket &avpkt = pkt.data->getAVPacket(); if (!(avpkt.flags & AV_PKT_FLAG_KEY)){ return -1; } } maybe_dump_gop(); list_pkt_.push_back(pkt); }else{ list_pkt_.push_back(pkt); cv_.notify_one(); } return 0; } void Recorder::maybe_dump_gop(){ //超过min/2,丢弃gop while (list_pkt_.size() > maxduration) { list_pkt_.pop_front(); while(!list_pkt_.empty()){ auto &cache = list_pkt_.front(); AVPacket &avpkt = cache.data->getAVPacket(); if (!(avpkt.flags & AV_PKT_FLAG_KEY)){ list_pkt_.pop_front(); }else{ break; } } } } } }