From b73029149580370e62dd6c14a270aea902f85cf2 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 18 九月 2019 09:52:30 +0800 Subject: [PATCH] fix rec bug --- csrc/buz/recorder.hpp | 17 csrc/buz/recorder.cpp | 235 +++++++++--------- csrc/ffmpeg/format/FormatIn.cpp | 22 /dev/null | 36 -- csrc/wrapper.cpp | 74 +++-- csrc/worker/rec.cpp | 173 +++++++----- csrc/worker/stream.cpp | 51 +++ csrc/worker/decoder.cpp | 10 csrc/wrapper.hpp | 87 +++--- csrc/ffmpeg/format/FormatOut.cpp | 22 csrc/worker/rec.hpp | 35 +- csrc/worker/stream.hpp | 4 csrc/worker/decoder.hpp | 0 csrc/cffmpeg.cpp | 6 14 files changed, 403 insertions(+), 369 deletions(-) diff --git a/csrc/buz/recorder.cpp b/csrc/buz/recorder.cpp index a7086fe..af5a1fc 100644 --- a/csrc/buz/recorder.cpp +++ b/csrc/buz/recorder.cpp @@ -25,11 +25,11 @@ ,maxduration(30 * 25) ,minduration(10 * 25) ,end_frame(minduration) - ,cur_frame(-1) + ,cur_frame(0) ,stop_recorder_(false) ,id_(id) - ,id_frame_(0) - ,file_frame_index_(-1) + ,id_frame_(-1) + ,id_frame_in_file_(-1) ,file_path_("") ,func_rec_info_(nullptr) ,thrd_(nullptr) @@ -43,13 +43,13 @@ try { if (thrd_){ - { - std::unique_lock<std::mutex> locker(mutex_pkt_); + if (!stop_recorder_.load()){ stop_recorder_.store(true); cv_.notify_one(); } + thrd_->join(); - logIt("REC THREAD JOINED, QUIT!!!"); + // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str()); } } catch(const std::exception& e) @@ -71,121 +71,95 @@ out_ = new FormatOut(in_->getStream(), "mp4"); - return 0; - } - - void Recorder::start_writer(){ - if (cur_frame == 0) { - - sole::uuid u4 = sole::uuid4(); - file_path_ = dir_ + "/" + u4.base62() + ".mp4"; - out_->JustWriter(in_->getStream(), file_path_.c_str()); - logIt("START RECORD %s", file_path_.c_str()); + file_path_ = dir_ + "/" + sole::uuid4().base62() + ".mp4"; + auto ret = out_->JustWriter(in_->getStream(), file_path_.c_str()); + if (ret){ + return 0; } + + return -1; } - int Recorder::write_correctly(const avpacket &pkt){ + int Recorder::write_correctly(const CPacket &pkt){ //reader failed, break stream if(pkt.id == -1 && !pkt.data){ - end_writer(); + return -1; + } + + if (cur_frame == end_frame){ return 1; } - // writer error, reinit writer + int64_t cur = cur_frame++; AVPacket &op = pkt.data->getAVPacket(); AVPacket np(op); av_copy_packet(&np, &op); - if(!out_->writeFrame(np, cur)){ - av_packet_unref(&np); - end_writer(); - return -1; - } + auto ret = out_->writeFrame(np, cur); av_packet_unref(&np); + if (!ret) return -1; + if(pkt.id == id_frame_){ - file_frame_index_ = cur_frame-1; + id_frame_in_file_ = cur_frame-1; } + // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_); return 0; } void Recorder::end_writer(){ - if(cur_frame == -1) return; + out_->endWriter(); - logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n", - file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame); - - //reinit cur_frame clear list pkt - { - std::lock_guard<std::mutex> locker(mutex_pkt_); - cur_frame = -1; - end_frame = minduration; - list_pkt_.clear(); - } - - //callback to frame index and path - if(func_rec_info_){ - func_rec_info_(id_, file_frame_index_, file_path_); - } - } - - void Recorder::run_thread(){ - bool reinit_writer = false; - while(!stop_recorder_.load()){ - if (reinit_writer) { - while(!stop_recorder_.load()){ - if(init_writer() == 0) - break; - usleep(300000); - } - if(stop_recorder_.load()) break; - } - - std::list<avpacket> pkts; - { - std::unique_lock<std::mutex> locker(mutex_pkt_); - auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{ - return !list_pkt_.empty() || stop_recorder_.load(); - }); - - if (!status){ - end_writer(); - error_occured_ = true; - break; - } - if(stop_recorder_.load()){ - end_writer(); - break; - } - if(cur_frame == -1){ - continue; - } - list_pkt_.swap(pkts); - } - - if (cur_frame == 0) { - start_writer(); - } - - for(auto &i : pkts){ - if (cur_frame < end_frame){ - if(write_correctly(i) != 0){ - stop_recorder_.store(true); - break; - } - }else{ - end_writer(); - stop_recorder_.store(true); - break; - } - } - - } - if (out_){ delete out_; out_ = NULL; } - // stop_recorder_.store(false); + { + std::lock_guard<std::mutex> l(mutex_pkt_); + list_pkt_.clear(); + } + // logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n", + // id_frame_in_file_, id_frame_, file_path_.c_str(), cur_frame, end_frame); + + //callback to frame index and path + if(func_rec_info_){ + func_rec_info_(id_,id_frame_in_file_, file_path_); + } + + } + + void Recorder::run_thread(){ + + while(!stop_recorder_.load()){ + + std::list<CPacket> pkts; + { + std::unique_lock<std::mutex> locker(mutex_pkt_); + auto status = cv_.wait_for(locker, std::chrono::seconds(3), [&]{ + return !list_pkt_.empty() || stop_recorder_.load(); + }); + + if (!status || stop_recorder_.load()){ + error_occured_ = !status; + break; + } + list_pkt_.swap(pkts); + } + + int ret = 0; + for(auto &i : pkts){ + ret = write_correctly(i); + if (ret != 0){ + break; + } + } + + if (ret != 0){ + break; + } + } + + stop_recorder_.store(true); + end_writer(); } int Recorder::Run(const char* output, const int mind, const int maxd){ @@ -204,7 +178,7 @@ end_frame = minduration; } - logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame); + // logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame); thrd_.reset(new std::thread([&]{ run_thread(); @@ -215,38 +189,37 @@ } int Recorder::FireRecorder(const int64_t &id){ - if(cur_frame == -1){ + if (stop_recorder_.load()) return -1; + + if(id_frame_ == -1){ id_frame_ = id; - logIt("FIRST FIRE RECORD ID: %lld", id); - { - std::lock_guard<std::mutex> locker(mutex_pkt_); - cur_frame = 0; - if (list_pkt_.size() > end_frame){ - end_frame = list_pkt_.size() + minduration/2; - if (end_frame > maxduration) - end_frame = maxduration; - } + + std::lock_guard<std::mutex> locker(mutex_pkt_); + if (list_pkt_.size() > end_frame){ + end_frame = list_pkt_.size() + minduration/2; + if (end_frame > maxduration) + end_frame = maxduration; } - }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){ + + // logIt("FIRST FIRE RECORD ID: %lld, cur_frame: %d, end_frame: %d", id, cur_frame, end_frame); + + }else if(cur_frame > minduration/2 && end_frame < maxduration){ end_frame = end_frame + minduration / 2; if(end_frame > maxduration){ end_frame = maxduration; } + // logIt("PROLONG REC, cur_frame: %d, end_frame: %d", cur_frame, end_frame); } // logIt("FIRE REC FRAME ID: %lld", id); return 0; } - int Recorder::CachePacket(const avpacket &pkt){ + int Recorder::PushPacket(const CPacket &pkt){ + if (stop_recorder_.load()) return 0; std::lock_guard<std::mutex> locker(mutex_pkt_); - if(cur_frame == -1){ - //error occur, stream break - if(pkt.id == -1 && pkt.data == nullptr){ - list_pkt_.clear(); - return -1; - } + if(id_frame_ == -1){ //wait I if (list_pkt_.empty()) { AVPacket &avpkt = pkt.data->getAVPacket(); @@ -258,17 +231,43 @@ maybe_dump_gop(); list_pkt_.push_back(pkt); + // cv_.notify_one(); + }else{ list_pkt_.push_back(pkt); cv_.notify_one(); } - return 0; + return list_pkt_.size(); + } + + int Recorder::PushPackets(std::list<CPacket> &lst){ + + if (stop_recorder_.load()) return 0; + + std::lock_guard<std::mutex> locker(mutex_pkt_); + bool i = false; + for (auto &p : lst){ + if (!i){ + AVPacket &avpkt = p.data->getAVPacket(); + if (!(avpkt.flags & AV_PKT_FLAG_KEY)){ + continue; + } + i = true; + } + + list_pkt_.push_back(p); + } + maybe_dump_gop(); + cv_.notify_one(); + + // logIt("CACHE PACKET : %d", list_pkt_.size()); + return list_pkt_.size(); } void Recorder::maybe_dump_gop(){ //瓒呰繃min/2,涓㈠純gop - while (list_pkt_.size() > maxduration) { + while (list_pkt_.size() > minduration) { list_pkt_.pop_front(); while(!list_pkt_.empty()){ auto &cache = list_pkt_.front(); diff --git a/csrc/buz/recorder.hpp b/csrc/buz/recorder.hpp index ba9cea4..8c3b550 100644 --- a/csrc/buz/recorder.hpp +++ b/csrc/buz/recorder.hpp @@ -22,10 +22,12 @@ namespace cffmpeg_wrap{ namespace buz{ - struct avpacket{ + // 缂撳瓨鐨勮棰戝抚,绛夊緟fire瑙﹀彂寮�濮嬪綍鍍� + typedef struct _cache_pkt{ std::shared_ptr<ffwrapper::CodedData> data; int64_t id; - }; + }CPacket; + class Recorder{ public: @@ -34,7 +36,8 @@ public: int Run(const char* output, const int mind, const int maxd); - int CachePacket(const avpacket &pkt); + int PushPacket(const CPacket &pkt); + int PushPackets(std::list<CPacket> &lst); int FireRecorder(const int64_t &id); void SetCallback(FUNC_REC_INFO cb){ @@ -42,12 +45,12 @@ } const bool ErrorOcurred(){return error_occured_;} + const std::string& RecID()const{return id_;} private: void run_thread(); int init_writer(); - void start_writer(); - int write_correctly(const avpacket &pkt); + int write_correctly(const CPacket &pkt); void end_writer(); void maybe_dump_gop(); @@ -60,7 +63,7 @@ int end_frame; int cur_frame; - std::list<avpacket> list_pkt_; + std::list<CPacket> list_pkt_; std::atomic_bool stop_recorder_; std::mutex mutex_pkt_; @@ -72,7 +75,7 @@ std::string id_; int64_t id_frame_; - int file_frame_index_; + int id_frame_in_file_; std::string file_path_; FUNC_REC_INFO func_rec_info_; diff --git a/csrc/cffmpeg.cpp b/csrc/cffmpeg.cpp index bb5a79f..3a97159 100644 --- a/csrc/cffmpeg.cpp +++ b/csrc/cffmpeg.cpp @@ -38,12 +38,12 @@ void c_ffmpeg_run_gb28181(const cffmpeg h){ Wrapper *s = (Wrapper*)h; - s->UseGB28181(); + s->GB28181(); } void c_ffmepg_use_cpu(const cffmpeg h){ Wrapper *s = (Wrapper*)h; - s->UseCPU(); + s->CPUDec(); } @@ -64,7 +64,7 @@ std::string p(""), id(""); s->GetInfoRecorder(id, i, p); - // printf("cffmpeg get info : index : %d, file : %s\n", i, p.c_str()); + // printf("cffmpeg get info : index : %d, file : %s, recid: %s\n", i, p.c_str(), id.c_str()); *index = i; diff --git a/csrc/ffmpeg/format/FormatIn.cpp b/csrc/ffmpeg/format/FormatIn.cpp index 4df2c0f..3e9dd51 100644 --- a/csrc/ffmpeg/format/FormatIn.cpp +++ b/csrc/ffmpeg/format/FormatIn.cpp @@ -106,11 +106,10 @@ } ret = avformat_open_input(&ctx_, "", NULL, options); - if(ret < 0){ - logIt("open %s failed:%s",filename, - getAVErrorDesc(ret).c_str()); - - } + // if(ret < 0){ + // logIt("open %s failed:%s",filename, + // getAVErrorDesc(ret).c_str()); + // } return ret; } @@ -119,11 +118,10 @@ int FormatIn::open(const char *filename, AVDictionary **options){ const int ret = avformat_open_input(&ctx_, filename, NULL, options); - if(ret < 0){ - logIt("open %s failed:%s",filename, - getAVErrorDesc(ret).c_str()); - - } + // if(ret < 0){ + // logIt("open %s failed:%s",filename, + // getAVErrorDesc(ret).c_str()); + // } return ret; } @@ -266,8 +264,8 @@ while (!founded){ const int ret = av_read_frame(ctx_, &pkt_out); if(ret < 0){ - logIt("read frame from %s failed:%s", - ctx_->filename,getAVErrorDesc(ret).c_str()); + // logIt("read frame from %s failed:%s", + // ctx_->filename,getAVErrorDesc(ret).c_str()); return false; } diff --git a/csrc/ffmpeg/format/FormatOut.cpp b/csrc/ffmpeg/format/FormatOut.cpp index 804c8ed..a6e8c3c 100644 --- a/csrc/ffmpeg/format/FormatOut.cpp +++ b/csrc/ffmpeg/format/FormatOut.cpp @@ -404,34 +404,32 @@ pkt.dts = pkt.pts; pkt.duration = av_rescale_q(calc_duration, time_base_q, time_base); //(double)(calc_duration)*(double)(av_q2d(time_base_q)) / (double)(av_q2d(time_base)); - // if (pkt.duration < 0 || time_base.den != 90000){ - // logIt("CALCULATE DURATION : %lld, fame count : %lld, TIMEBASE: %d", calc_duration,time_stamp, time_base.den); - // } - + // logIt("FRAME ID: %lld, PTS : %lld, DTS : %lld", frame_cnt, pkt.pts, pkt.dts); } bool FormatOut::writeFrame(AVPacket &pkt, const int64_t &frame_cnt, bool interleaved/* = true*/){ adjustPTS(pkt, frame_cnt); - return writeFrame2(pkt, interleaved); + auto ret = writeFrame2(pkt, interleaved); + if (!ret){ + logIt("write to file failed, pkt.pts: %lld, dts: %lld, frame count: %d", + pkt.pts, pkt.dts, frame_cnt); + } + return ret; } bool FormatOut::writeFrame2(AVPacket &pkt, bool interleaved){ int ret = 0; - if(interleaved) + if(interleaved){ ret = av_interleaved_write_frame(ctx_, &pkt); - else - { + }else{ // returns 1 if flushed and there is no more data to flush ret = av_write_frame(ctx_, &pkt); } - if(ret < 0) - { - logIt("write packet to file failed:%s", - getAVErrorDesc(ret).c_str()); + if(ret < 0){ return false; } diff --git a/csrc/stream.cpp b/csrc/stream.cpp deleted file mode 100644 index 42b8d73..0000000 --- a/csrc/stream.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "stream.hpp" - -#include "ffmpeg/data/CodedData.hpp" - -namespace cffmpeg_wrap{ - stream::stream(){ - - } - stream::~stream(){ - - } - - int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){ - if (data){ - std::lock_guard<std::mutex> locker(mutex_avpkt_); - list_avpkt_.push_back(data); - return list_avpkt_.size(); - } - return 0; - } - - void stream::GetPacket(unsigned char **pktData, int *size, int *key){ - std::lock_guard<std::mutex> l(mutex_avpkt_); - if(list_avpkt_.empty()){ - return; - } - auto data = list_avpkt_.front(); - auto pkt = data->getAVPacket(); - *key = pkt.flags & AV_PKT_FLAG_KEY; - *size = pkt.size; - *pktData = (unsigned char *)malloc(*size); - memcpy(*pktData, pkt.data, pkt.size); - - list_avpkt_.pop_front(); - } -} \ No newline at end of file diff --git a/csrc/decoder.cpp b/csrc/worker/decoder.cpp similarity index 93% rename from csrc/decoder.cpp rename to csrc/worker/decoder.cpp index 381642d..0a9fb46 100644 --- a/csrc/decoder.cpp +++ b/csrc/worker/decoder.cpp @@ -1,10 +1,10 @@ #include "decoder.hpp" -#include "ffmpeg/bridge/cvbridge.hpp" -#include "ffmpeg/format/FormatIn.hpp" -#include "ffmpeg/data/CodedData.hpp" -#include "ffmpeg/data/FrameData.hpp" -#include "ffmpeg/log/log.hpp" +#include "../ffmpeg/bridge/cvbridge.hpp" +#include "../ffmpeg/format/FormatIn.hpp" +#include "../ffmpeg/data/CodedData.hpp" +#include "../ffmpeg/data/FrameData.hpp" +#include "../ffmpeg/log/log.hpp" extern "C"{ #include <libavformat/avformat.h> diff --git a/csrc/decoder.hpp b/csrc/worker/decoder.hpp similarity index 100% rename from csrc/decoder.hpp rename to csrc/worker/decoder.hpp diff --git a/csrc/rec.cpp b/csrc/worker/rec.cpp similarity index 66% rename from csrc/rec.cpp rename to csrc/worker/rec.cpp index 7327977..4307d6e 100644 --- a/csrc/rec.cpp +++ b/csrc/worker/rec.cpp @@ -1,12 +1,12 @@ #include "rec.hpp" #include <unistd.h> +#include <sys/time.h> -#include "ffmpeg/format/FormatIn.hpp" -#include "ffmpeg/data/CodedData.hpp" -#include "buz/recorder.hpp" -#include "ffmpeg/log/log.hpp" -#include "common/callback.hpp" +#include "../ffmpeg/format/FormatIn.hpp" +#include "../ffmpeg/data/CodedData.hpp" +#include "../ffmpeg/log/log.hpp" +#include "../common/callback.hpp" using namespace logif; using namespace ffwrapper; @@ -14,48 +14,15 @@ namespace cffmpeg_wrap { - rec::rec(ffwrapper::FormatIn *in) - :recRef_(in) + rec::rec() + :recRef_(NULL) ,minduration_(250) ,maxduration_(750) {} rec::~rec() { - { - std::lock_guard<std::mutex> l(mtx_rec_); - map_rec_.clear(); - } - - { - std::lock_guard<std::mutex> l(mtx_pkt_); - list_pkt_.clear(); - } - - } - - std::unique_ptr<Recorder> rec::newRec(std::string id, std::string dir, const int mind, const int maxd){ - if(!recRef_){ - logIt("Init wrapper first"); - return nullptr; - } - - std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str())); - - rec->SetCallback([&](std::string &id, int &index, std::string &path){ - setRecInfo(id, index, path); - }); - - int trycnt = 0; - while(trycnt < 100){ - const int ret = rec->Run(dir.c_str(), mind, maxd); - if(ret == 0) break; - usleep(200000); - } - if (trycnt < 100){ - return rec; - } - return nullptr; + clear(); } void rec::setRecInfo(std::string &id, int &index, std::string &path){ @@ -71,7 +38,33 @@ info.fPath = path; info.recID = id; list_recInfo_.emplace_back(info); - logIt("LIST REC FILES COUNT : %d", list_recInfo_.size()); + } + + std::unique_ptr<buz::Recorder> rec::startRec(std::string id, std::string dir, const int mind, const int maxd){ + if(!recRef_){ + logIt("Init wrapper first"); + return nullptr; + } + + std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str())); + + rec->SetCallback([&](std::string &id, int &index, std::string &path){ + setRecInfo(id, index, path); + }); + + int trycnt = 0; + while(trycnt < 100){ + auto ret = rec->Run(dir.c_str(), mind, maxd); + if(ret == 0) break; + usleep(200000); + } + if (trycnt < 100){ + std::lock_guard<std::mutex> locker(mtx_pkt_); + rec->PushPackets(list_pkt_); + return rec; + } + + return nullptr; } void rec::GetRecInfo(std::string &recID, int &index, std::string &path){ @@ -79,27 +72,35 @@ // 鑾峰彇淇℃伅 { std::lock_guard<std::mutex> l(mtx_recInfo_); - if(list_recInfo_.empty()){ - index = -1; - path = ""; - return; + if(!list_recInfo_.empty()){ + auto info = list_recInfo_.front(); + recID = info.recID; + index = info.frmIdx; + path = info.fPath; + list_recInfo_.pop_front(); } - auto info = list_recInfo_.front(); - recID = info.recID; - index = info.frmIdx; - path = info.fPath; - list_recInfo_.pop_front(); + } // 鍒犻櫎rec瀹炰緥 { std::lock_guard<std::mutex> l(mtx_rec_); - if (map_rec_.find(recID) != map_rec_.end()) + if (map_rec_.empty()){ + return; + } + + if (map_rec_.find(recID) != map_rec_.end()){ map_rec_.erase(recID); + return; + } for (auto iter = map_rec_.begin(); iter != map_rec_.end();){ - if (iter->second.rec && iter->second.rec->ErrorOcurred()){ + if (iter->second && iter->second->ErrorOcurred()){ + recID = iter->first; + index = -1; + path = ""; iter == map_rec_.erase(iter); + break; }else{ iter++; } @@ -107,54 +108,73 @@ } } + void rec::clear(){ + { + std::lock_guard<std::mutex> l(mtx_rec_); + map_rec_.clear(); + } + + { + std::lock_guard<std::mutex> l(mtx_pkt_); + list_pkt_.clear(); + } + } + + void rec::Load(ffwrapper::FormatIn *in){ + recRef_ = in; + } + + void rec::Unload(){ + recRef_ = NULL; + clear(); + } + + const bool rec::Loaded() const{ + return recRef_ != NULL; + } + void rec::NewRec(const char* id, const char *output, const int mindur, const int maxdur){ std::string rid(id); std::string dir(output); + minduration_ = mindur * 25; + maxduration_ = maxdur * 25; + { std::lock_guard<std::mutex> l(mtx_rec_); if (map_rec_.find(rid) != map_rec_.end()){ map_rec_.erase(rid); } - map_rec_[rid] = {rid, dir, mindur, maxdur, nullptr}; + map_rec_[rid] = startRec(rid, dir, mindur, maxdur); } - minduration_ = mindur * 25; - maxduration_ = maxdur * 25; } - void rec::FireRecSignal(const char* sid,const int64_t &id){ + void rec::FireRecSignal(const char* sid, const int64_t &id){ + + std::lock_guard<std::mutex> l(mtx_rec_); + auto iter = map_rec_.find(sid); if (iter != map_rec_.end()){ - if(iter->second.rec){ - iter->second.rec->FireRecorder(id); + if(iter->second){ + iter->second->FireRecorder(id); } } + + // logIt("recorders count: %d", map_rec_.size()); } void rec::SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){ if (!data) return; - cachePacket(data, id); - std::lock_guard<std::mutex> l(mtx_rec_); for(auto &i : map_rec_){ - if (!i.second.rec){ - i.second.rec = newRec(i.second.rid, i.second.dir, i.second.min, i.second.max); - if (i.second.rec){ - //姝ゅ嚱鏁拌繕鏈��鍑�,涓嶉渶瑕佽繖涓攣 - // std::lock_guard<std::mutex> locker(mtx_pkt_); - for(auto &k : list_pkt_){ - i.second.rec->CachePacket({k.data, k.id}); - } - // 鏂扮殑鏁版嵁缂撳瓨 - i.second.rec->CachePacket({data, id}); - logIt("START REC %d FRAMES", list_pkt_.size()); - } - }else if (i.second.rec){ - i.second.rec->CachePacket({data, id}); + if (i.second){ + i.second->PushPacket({data, id}); } } + + cachePacket(data, id); } void rec::cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){ @@ -173,7 +193,8 @@ int rec::shrinkCache(){ //瓒呰繃鏈�澶х紦瀛�,涓㈠純gop - while (list_pkt_.size() > minduration_) { + //缂撳瓨鏈�灏忛暱搴︾殑,鐢ㄤ簬璁板綍 + while (list_pkt_.size() > minduration_/2) { list_pkt_.pop_front(); while(!list_pkt_.empty()){ auto &cache = list_pkt_.front(); diff --git a/csrc/rec.hpp b/csrc/worker/rec.hpp similarity index 73% rename from csrc/rec.hpp rename to csrc/worker/rec.hpp index a356ba0..c489676 100644 --- a/csrc/rec.hpp +++ b/csrc/worker/rec.hpp @@ -7,6 +7,8 @@ #include <list> #include <mutex> +#include "../buz/recorder.hpp" + namespace ffwrapper { class FormatIn; @@ -16,12 +18,6 @@ namespace cffmpeg_wrap { - namespace buz{ - class Recorder; - struct avpacket; - } - - class rec { private: @@ -30,14 +26,7 @@ int minduration_; // 褰曞儚鐨勫疄渚�,瀵瑰簲浠诲姟 - typedef struct _fn_rec{ - std::string rid; //id瀵瑰簲浠诲姟id - std::string dir; - int min; - int max; - std::unique_ptr<buz::Recorder> rec; - }FnRec; - std::unordered_map<std::string, FnRec> map_rec_; + std::unordered_map<std::string, std::unique_ptr<buz::Recorder> > map_rec_; // 澶氱嚎绋嬫坊鍔犱换鍔″疄渚�,鍦ㄨ娴佺嚎绋嬩娇鐢ㄥ綍鍍�,浣嗘槸娣诲姞鍦ㄥ彟涓�涓嚎绋� std::mutex mtx_rec_; @@ -52,26 +41,28 @@ std::mutex mtx_recInfo_; // 缂撳瓨鐨勮棰戝抚,绛夊緟firerecsignal瑙﹀彂寮�濮嬪綍鍍� - typedef struct _cache_pkt{ - std::shared_ptr<ffwrapper::CodedData> data; - int64_t id; - }CPacket; - std::list<CPacket> list_pkt_; + std::list<buz::CPacket> list_pkt_; // 澶氱嚎绋�,鐢熶骇鑰呯嚎绋媟eader push pkt,娑堣垂鑰�,褰曞儚绾跨▼pop std::mutex mtx_pkt_; private: - // 鍒涘缓褰曞儚瀹炰緥 - std::unique_ptr<buz::Recorder> newRec(std::string id, std::string dir, const int mind, const int maxd); // 褰曞儚瀹炰緥鐨勫洖璋冨嚱鏁�,褰曞儚瀹屾垚鍚庤缃綍鍍忔枃浠惰矾寰�,id鍜屽抚id void setRecInfo(std::string &id, int &index, std::string &path); // 缂撳瓨瑙嗛鍖� void cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id); // 涓㈠純缂撳瓨 int shrinkCache(); + // 鍒涘缓褰曞儚瀹炰緥寮�濮嬪綍鍍� + std::unique_ptr<buz::Recorder> startRec(std::string id, std::string dir, const int mind, const int maxd); + // 娓呴櫎缂撳瓨,鏂嚎閲嶈繛鏃堕渶瑕� + void clear(); public: void NewRec(const char* id, const char *output, const int mindur, const int maxdur); + // 鍑嗗濂藉綍鍍� + void Load(ffwrapper::FormatIn *in); + void Unload(); + const bool Loaded() const; // 缂撳瓨褰曞儚鐨勮棰戝寘,绛夊緟瑙﹀彂褰曞儚,鎴栫洿鎺ユ斁鍒板綍鍍忕紦瀛� void SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id); // 瑙﹀彂褰曞儚 @@ -80,7 +71,7 @@ void GetRecInfo(std::string &recID, int &index, std::string &path); public: - explicit rec(ffwrapper::FormatIn *in); + rec(); ~rec(); }; } // namespace cffmpeg_wrap diff --git a/csrc/worker/stream.cpp b/csrc/worker/stream.cpp new file mode 100644 index 0000000..0f8dadc --- /dev/null +++ b/csrc/worker/stream.cpp @@ -0,0 +1,51 @@ +#include "stream.hpp" + +#include "../ffmpeg/data/CodedData.hpp" + +namespace cffmpeg_wrap{ + stream::stream(const int maxSize) + :max_size_(maxSize) + {} + + stream::~stream(){ + std::lock_guard<std::mutex> locker(mutex_avpkt_); + list_avpkt_.clear(); + } + + int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){ + if (data){ + std::lock_guard<std::mutex> locker(mutex_avpkt_); + list_avpkt_.push_back(data); + + while(list_avpkt_.size() > max_size_){ + list_avpkt_.pop_front(); + while(!list_avpkt_.empty()){ + auto &cache = list_avpkt_.front(); + AVPacket &avpkt = cache->getAVPacket(); + if (!(avpkt.flags & AV_PKT_FLAG_KEY)){ + list_avpkt_.pop_front(); + }else{ + break; + } + } + } + return list_avpkt_.size(); + } + return 0; + } + + void stream::GetPacket(unsigned char **pktData, int *size, int *key){ + std::lock_guard<std::mutex> l(mutex_avpkt_); + if(list_avpkt_.empty()){ + return; + } + auto data = list_avpkt_.front(); + auto pkt = data->getAVPacket(); + *key = pkt.flags & AV_PKT_FLAG_KEY; + *size = pkt.size; + *pktData = (unsigned char *)malloc(*size); + memcpy(*pktData, pkt.data, pkt.size); + + list_avpkt_.pop_front(); + } +} \ No newline at end of file diff --git a/csrc/stream.hpp b/csrc/worker/stream.hpp similarity index 87% rename from csrc/stream.hpp rename to csrc/worker/stream.hpp index 5cbc44a..6c87ff6 100644 --- a/csrc/stream.hpp +++ b/csrc/worker/stream.hpp @@ -15,9 +15,9 @@ private: std::list<std::shared_ptr<ffwrapper::CodedData> > list_avpkt_; std::mutex mutex_avpkt_; - + const int max_size_; public: - stream(/* args */); + explicit stream(const int maxSize); ~stream(); int SetPacket(std::shared_ptr<ffwrapper::CodedData> data); diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp index 9fc2e25..63bb661 100644 --- a/csrc/wrapper.cpp +++ b/csrc/wrapper.cpp @@ -21,12 +21,20 @@ #include "buz/recorder.hpp" -#include "stream.hpp" -#include "decoder.hpp" -#include "rec.hpp" +#include "worker/stream.hpp" +#include "worker/decoder.hpp" +#include "worker/rec.hpp" using namespace logif; using namespace ffwrapper; + +#define DELETE_POINTER(p) \ +do \ +{ \ +if(NULL != p) \ +delete p; \ +p = NULL; \ +}while(0) namespace cffmpeg_wrap{ using namespace buz; @@ -38,12 +46,11 @@ ,scale_f_(SWS_POINT) ,gb_(0) ,cpu_(0) - ,use_decoder_(false) ,thread_(nullptr) ,stop_stream_(false) ,stream_(nullptr) ,decoder_(nullptr) - ,rec_(nullptr) + ,rec_(new rec) { makeTheWorld(); } @@ -57,6 +64,7 @@ stop_stream_.store(true); thread_->join(); } + DELETE_POINTER(rec_); } catch(const std::exception& e) { @@ -70,11 +78,11 @@ scale_h_ = h; } - void Wrapper::UseGB28181(){ + void Wrapper::GB28181(){ gb_ = 1; } - void Wrapper::UseCPU(){ + void Wrapper::CPUDec(){ cpu_ = 1; } @@ -123,19 +131,24 @@ return 0; } - void Wrapper::init_stream(){ - if (stream_) delete stream_; - stream_ = new stream; + void Wrapper::init_worker(ffwrapper::FormatIn *in){ + if (rec_->Loaded() && stream_ && decoder_) return; + stream_ = new stream(3 * 25); + decoder_ = new decoder(in, scale_w_, scale_h_, scale_f_); + rec_->Load(in); + if(fn_rec_lazy_) fn_rec_lazy_(in); + } + + void Wrapper::run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){ + if (stream_) stream_->SetPacket(data); + if (decoder_) decoder_->SetFrame(data, id); + if (rec_->Loaded()) rec_->SetPacket(data, id); } - void Wrapper::init_decoder(ffwrapper::FormatIn *in){ - if (decoder_) delete decoder_; - decoder_ = new decoder(in, scale_w_, scale_h_,scale_f_); - } - - void Wrapper::init_rec(ffwrapper::FormatIn *in){ - if (rec_) delete rec_; - rec_ = new rec(in); + void Wrapper::deinit_worker(){ + DELETE_POINTER(stream_); + DELETE_POINTER(decoder_); + rec_->Unload(); } void Wrapper::run_stream_thread(){ @@ -148,11 +161,8 @@ usleep(200000); continue; } - init_stream(); - if (use_decoder_){ - init_decoder(in.get()); - } - init_rec(in.get()); + + init_worker(in.get()); int64_t id = 0; while(!stop_stream_.load()){ @@ -161,24 +171,28 @@ logIt("read packet error"); break; } - if (stream_) stream_->SetPacket(data); - if (use_decoder_ && decoder_) decoder_->SetFrame(data, id); - if (rec_) rec_->SetPacket(data, id); - + + run_worker(data, id); id++; } + + deinit_worker(); } } void Wrapper::BuildRecorder(const char* id, const char *output, const int mindur, const int maxdur){ - if (rec_){ + if (rec_->Loaded()){ rec_->NewRec(id, output, mindur, maxdur); + }else{ + std::string rid(id), dir(output); + fn_rec_lazy_ = + [=](ffwrapper::FormatIn *in){rec_->NewRec(rid.c_str(), dir.c_str(), mindur, maxdur);}; } } int Wrapper::FireRecorder(const char* sid,const int64_t &id){ - if (rec_){ + if (rec_->Loaded()){ rec_->FireRecSignal(sid, id); } } @@ -190,7 +204,7 @@ } ////////decoder void Wrapper::BuildDecoder(){ - use_decoder_ = true; + // use_decoder_ = true; } void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id){ diff --git a/csrc/wrapper.hpp b/csrc/wrapper.hpp index d398ed4..044b4ba 100644 --- a/csrc/wrapper.hpp +++ b/csrc/wrapper.hpp @@ -12,7 +12,6 @@ #include <thread> #include <atomic> #include <mutex> -#include <unordered_map> #include <memory> #include "common/callback.hpp" @@ -31,53 +30,49 @@ class rec; class Wrapper{ - public: - Wrapper(); - ~Wrapper (); + public: + Wrapper(); + ~Wrapper (); + private: + std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input); - private: - std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input); - void init_stream(); - void init_decoder(ffwrapper::FormatIn *in); - void init_rec(ffwrapper::FormatIn *in); - - public: - int RunStream(const char* input); - private: - void run_stream_thread(); + void init_worker(ffwrapper::FormatIn *in); + void run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id); + void deinit_worker(); + public: + int RunStream(const char* input); + private: + void run_stream_thread(); + public: //recorder + void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd); + int FireRecorder(const char* sid,const int64_t &id); + void GetInfoRecorder(std::string &recID, int &index, std::string &path); + void ScalePicture(const int w, const int h, const int flags); + void GB28181(); + void CPUDec(); + public: //decoder + void BuildDecoder(); + void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id); + public: // push stream + void GetPacket(unsigned char **pktData, int *size, int *key); + private: + // stream 鍙傛暟 + std::string input_url_; + int scale_w_, scale_h_, scale_f_; - public: //recorder - void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd); - int FireRecorder(const char* sid,const int64_t &id); - void GetInfoRecorder(std::string &recID, int &index, std::string &path); - - void ScalePicture(const int w, const int h, const int flags); - void UseGB28181(); - void UseCPU(); - public: //decoder - void BuildDecoder(); - void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id); - void GetPacket(unsigned char **pktData, int *size, int *key); - - private: - // stream 鍙傛暟 - std::string input_url_; - int scale_w_, scale_h_, scale_f_; - int gb_, cpu_; - bool use_decoder_; - - // decoder 鍙傛暟 - std::unique_ptr<std::thread> thread_; - std::atomic_bool stop_stream_; - - // 涓氬姟绫� - // 鎺ㄦ祦绫� - stream* stream_; - // 瑙g爜绫� - decoder* decoder_; - // 褰曞儚绫� - rec* rec_; - + int gb_, cpu_; + // decoder 鍙傛暟 + std::unique_ptr<std::thread> thread_; + std::atomic_bool stop_stream_; + // 涓氬姟绫� + // 鎺ㄦ祦绫� + stream* stream_; + // 瑙g爜绫� + decoder* decoder_; + // 褰曞儚绫�,涓�鐩村瓨鍦� + rec* rec_; + // 褰曞儚璇锋眰缂撳瓨,绛夊緟runstream鍚庢坊鍔� + std::function<void(ffwrapper::FormatIn*)> fn_rec_lazy_; }; uint8_t *DecodeJPEG(const char *file, int *w, int *h); -- Gitblit v1.8.0