From b73029149580370e62dd6c14a270aea902f85cf2 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 18 九月 2019 09:52:30 +0800
Subject: [PATCH] fix rec bug
---
csrc/buz/recorder.hpp | 17
csrc/buz/recorder.cpp | 235 +++++++++---------
csrc/ffmpeg/format/FormatIn.cpp | 22
/dev/null | 36 --
csrc/wrapper.cpp | 74 +++--
csrc/worker/rec.cpp | 173 +++++++-----
csrc/worker/stream.cpp | 51 +++
csrc/worker/decoder.cpp | 10
csrc/wrapper.hpp | 87 +++---
csrc/ffmpeg/format/FormatOut.cpp | 22
csrc/worker/rec.hpp | 35 +-
csrc/worker/stream.hpp | 4
csrc/worker/decoder.hpp | 0
csrc/cffmpeg.cpp | 6
14 files changed, 403 insertions(+), 369 deletions(-)
diff --git a/csrc/buz/recorder.cpp b/csrc/buz/recorder.cpp
index a7086fe..af5a1fc 100644
--- a/csrc/buz/recorder.cpp
+++ b/csrc/buz/recorder.cpp
@@ -25,11 +25,11 @@
,maxduration(30 * 25)
,minduration(10 * 25)
,end_frame(minduration)
- ,cur_frame(-1)
+ ,cur_frame(0)
,stop_recorder_(false)
,id_(id)
- ,id_frame_(0)
- ,file_frame_index_(-1)
+ ,id_frame_(-1)
+ ,id_frame_in_file_(-1)
,file_path_("")
,func_rec_info_(nullptr)
,thrd_(nullptr)
@@ -43,13 +43,13 @@
try
{
if (thrd_){
- {
- std::unique_lock<std::mutex> locker(mutex_pkt_);
+ if (!stop_recorder_.load()){
stop_recorder_.store(true);
cv_.notify_one();
}
+
thrd_->join();
- logIt("REC THREAD JOINED, QUIT!!!");
+ // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str());
}
}
catch(const std::exception& e)
@@ -71,121 +71,95 @@
out_ = new FormatOut(in_->getStream(), "mp4");
- return 0;
- }
-
- void Recorder::start_writer(){
- if (cur_frame == 0) {
-
- sole::uuid u4 = sole::uuid4();
- file_path_ = dir_ + "/" + u4.base62() + ".mp4";
- out_->JustWriter(in_->getStream(), file_path_.c_str());
- logIt("START RECORD %s", file_path_.c_str());
+ file_path_ = dir_ + "/" + sole::uuid4().base62() + ".mp4";
+ auto ret = out_->JustWriter(in_->getStream(), file_path_.c_str());
+ if (ret){
+ return 0;
}
+
+ return -1;
}
- int Recorder::write_correctly(const avpacket &pkt){
+ int Recorder::write_correctly(const CPacket &pkt){
//reader failed, break stream
if(pkt.id == -1 && !pkt.data){
- end_writer();
+ return -1;
+ }
+
+ if (cur_frame == end_frame){
return 1;
}
- // writer error, reinit writer
+
int64_t cur = cur_frame++;
AVPacket &op = pkt.data->getAVPacket();
AVPacket np(op);
av_copy_packet(&np, &op);
- if(!out_->writeFrame(np, cur)){
- av_packet_unref(&np);
- end_writer();
- return -1;
- }
+ auto ret = out_->writeFrame(np, cur);
av_packet_unref(&np);
+ if (!ret) return -1;
+
if(pkt.id == id_frame_){
- file_frame_index_ = cur_frame-1;
+ id_frame_in_file_ = cur_frame-1;
}
+
// logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_);
return 0;
}
void Recorder::end_writer(){
- if(cur_frame == -1) return;
+
out_->endWriter();
- logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
- file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
-
- //reinit cur_frame clear list pkt
- {
- std::lock_guard<std::mutex> locker(mutex_pkt_);
- cur_frame = -1;
- end_frame = minduration;
- list_pkt_.clear();
- }
-
- //callback to frame index and path
- if(func_rec_info_){
- func_rec_info_(id_, file_frame_index_, file_path_);
- }
- }
-
- void Recorder::run_thread(){
- bool reinit_writer = false;
- while(!stop_recorder_.load()){
- if (reinit_writer) {
- while(!stop_recorder_.load()){
- if(init_writer() == 0)
- break;
- usleep(300000);
- }
- if(stop_recorder_.load()) break;
- }
-
- std::list<avpacket> pkts;
- {
- std::unique_lock<std::mutex> locker(mutex_pkt_);
- auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
- return !list_pkt_.empty() || stop_recorder_.load();
- });
-
- if (!status){
- end_writer();
- error_occured_ = true;
- break;
- }
- if(stop_recorder_.load()){
- end_writer();
- break;
- }
- if(cur_frame == -1){
- continue;
- }
- list_pkt_.swap(pkts);
- }
-
- if (cur_frame == 0) {
- start_writer();
- }
-
- for(auto &i : pkts){
- if (cur_frame < end_frame){
- if(write_correctly(i) != 0){
- stop_recorder_.store(true);
- break;
- }
- }else{
- end_writer();
- stop_recorder_.store(true);
- break;
- }
- }
-
- }
-
if (out_){
delete out_;
out_ = NULL;
}
- // stop_recorder_.store(false);
+ {
+ std::lock_guard<std::mutex> l(mutex_pkt_);
+ list_pkt_.clear();
+ }
+ // logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
+ // id_frame_in_file_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
+
+ //callback to frame index and path
+ if(func_rec_info_){
+ func_rec_info_(id_,id_frame_in_file_, file_path_);
+ }
+
+ }
+
+ void Recorder::run_thread(){
+
+ while(!stop_recorder_.load()){
+
+ std::list<CPacket> pkts;
+ {
+ std::unique_lock<std::mutex> locker(mutex_pkt_);
+ auto status = cv_.wait_for(locker, std::chrono::seconds(3), [&]{
+ return !list_pkt_.empty() || stop_recorder_.load();
+ });
+
+ if (!status || stop_recorder_.load()){
+ error_occured_ = !status;
+ break;
+ }
+ list_pkt_.swap(pkts);
+ }
+
+ int ret = 0;
+ for(auto &i : pkts){
+ ret = write_correctly(i);
+ if (ret != 0){
+ break;
+ }
+ }
+
+ if (ret != 0){
+ break;
+ }
+ }
+
+ stop_recorder_.store(true);
+ end_writer();
}
int Recorder::Run(const char* output, const int mind, const int maxd){
@@ -204,7 +178,7 @@
end_frame = minduration;
}
- logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
+ // logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);
thrd_.reset(new std::thread([&]{
run_thread();
@@ -215,38 +189,37 @@
}
int Recorder::FireRecorder(const int64_t &id){
- if(cur_frame == -1){
+ if (stop_recorder_.load()) return -1;
+
+ if(id_frame_ == -1){
id_frame_ = id;
- logIt("FIRST FIRE RECORD ID: %lld", id);
- {
- std::lock_guard<std::mutex> locker(mutex_pkt_);
- cur_frame = 0;
- if (list_pkt_.size() > end_frame){
- end_frame = list_pkt_.size() + minduration/2;
- if (end_frame > maxduration)
- end_frame = maxduration;
- }
+
+ std::lock_guard<std::mutex> locker(mutex_pkt_);
+ if (list_pkt_.size() > end_frame){
+ end_frame = list_pkt_.size() + minduration/2;
+ if (end_frame > maxduration)
+ end_frame = maxduration;
}
- }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){
+
+ // logIt("FIRST FIRE RECORD ID: %lld, cur_frame: %d, end_frame: %d", id, cur_frame, end_frame);
+
+ }else if(cur_frame > minduration/2 && end_frame < maxduration){
end_frame = end_frame + minduration / 2;
if(end_frame > maxduration){
end_frame = maxduration;
}
+ // logIt("PROLONG REC, cur_frame: %d, end_frame: %d", cur_frame, end_frame);
}
// logIt("FIRE REC FRAME ID: %lld", id);
return 0;
}
- int Recorder::CachePacket(const avpacket &pkt){
+ int Recorder::PushPacket(const CPacket &pkt){
+ if (stop_recorder_.load()) return 0;
std::lock_guard<std::mutex> locker(mutex_pkt_);
- if(cur_frame == -1){
- //error occur, stream break
- if(pkt.id == -1 && pkt.data == nullptr){
- list_pkt_.clear();
- return -1;
- }
+ if(id_frame_ == -1){
//wait I
if (list_pkt_.empty()) {
AVPacket &avpkt = pkt.data->getAVPacket();
@@ -258,17 +231,43 @@
maybe_dump_gop();
list_pkt_.push_back(pkt);
+ // cv_.notify_one();
+
}else{
list_pkt_.push_back(pkt);
cv_.notify_one();
}
- return 0;
+ return list_pkt_.size();
+ }
+
+ int Recorder::PushPackets(std::list<CPacket> &lst){
+
+ if (stop_recorder_.load()) return 0;
+
+ std::lock_guard<std::mutex> locker(mutex_pkt_);
+ bool i = false;
+ for (auto &p : lst){
+ if (!i){
+ AVPacket &avpkt = p.data->getAVPacket();
+ if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+ continue;
+ }
+ i = true;
+ }
+
+ list_pkt_.push_back(p);
+ }
+ maybe_dump_gop();
+ cv_.notify_one();
+
+ // logIt("CACHE PACKET : %d", list_pkt_.size());
+ return list_pkt_.size();
}
void Recorder::maybe_dump_gop(){
//瓒呰繃min/2,涓㈠純gop
- while (list_pkt_.size() > maxduration) {
+ while (list_pkt_.size() > minduration) {
list_pkt_.pop_front();
while(!list_pkt_.empty()){
auto &cache = list_pkt_.front();
diff --git a/csrc/buz/recorder.hpp b/csrc/buz/recorder.hpp
index ba9cea4..8c3b550 100644
--- a/csrc/buz/recorder.hpp
+++ b/csrc/buz/recorder.hpp
@@ -22,10 +22,12 @@
namespace cffmpeg_wrap{
namespace buz{
- struct avpacket{
+ // 缂撳瓨鐨勮棰戝抚,绛夊緟fire瑙﹀彂寮�濮嬪綍鍍�
+ typedef struct _cache_pkt{
std::shared_ptr<ffwrapper::CodedData> data;
int64_t id;
- };
+ }CPacket;
+
class Recorder{
public:
@@ -34,7 +36,8 @@
public:
int Run(const char* output, const int mind, const int maxd);
- int CachePacket(const avpacket &pkt);
+ int PushPacket(const CPacket &pkt);
+ int PushPackets(std::list<CPacket> &lst);
int FireRecorder(const int64_t &id);
void SetCallback(FUNC_REC_INFO cb){
@@ -42,12 +45,12 @@
}
const bool ErrorOcurred(){return error_occured_;}
+ const std::string& RecID()const{return id_;}
private:
void run_thread();
int init_writer();
- void start_writer();
- int write_correctly(const avpacket &pkt);
+ int write_correctly(const CPacket &pkt);
void end_writer();
void maybe_dump_gop();
@@ -60,7 +63,7 @@
int end_frame;
int cur_frame;
- std::list<avpacket> list_pkt_;
+ std::list<CPacket> list_pkt_;
std::atomic_bool stop_recorder_;
std::mutex mutex_pkt_;
@@ -72,7 +75,7 @@
std::string id_;
int64_t id_frame_;
- int file_frame_index_;
+ int id_frame_in_file_;
std::string file_path_;
FUNC_REC_INFO func_rec_info_;
diff --git a/csrc/cffmpeg.cpp b/csrc/cffmpeg.cpp
index bb5a79f..3a97159 100644
--- a/csrc/cffmpeg.cpp
+++ b/csrc/cffmpeg.cpp
@@ -38,12 +38,12 @@
void c_ffmpeg_run_gb28181(const cffmpeg h){
Wrapper *s = (Wrapper*)h;
- s->UseGB28181();
+ s->GB28181();
}
void c_ffmepg_use_cpu(const cffmpeg h){
Wrapper *s = (Wrapper*)h;
- s->UseCPU();
+ s->CPUDec();
}
@@ -64,7 +64,7 @@
std::string p(""), id("");
s->GetInfoRecorder(id, i, p);
- // printf("cffmpeg get info : index : %d, file : %s\n", i, p.c_str());
+ // printf("cffmpeg get info : index : %d, file : %s, recid: %s\n", i, p.c_str(), id.c_str());
*index = i;
diff --git a/csrc/ffmpeg/format/FormatIn.cpp b/csrc/ffmpeg/format/FormatIn.cpp
index 4df2c0f..3e9dd51 100644
--- a/csrc/ffmpeg/format/FormatIn.cpp
+++ b/csrc/ffmpeg/format/FormatIn.cpp
@@ -106,11 +106,10 @@
}
ret = avformat_open_input(&ctx_, "", NULL, options);
- if(ret < 0){
- logIt("open %s failed:%s",filename,
- getAVErrorDesc(ret).c_str());
-
- }
+ // if(ret < 0){
+ // logIt("open %s failed:%s",filename,
+ // getAVErrorDesc(ret).c_str());
+ // }
return ret;
}
@@ -119,11 +118,10 @@
int FormatIn::open(const char *filename, AVDictionary **options){
const int ret = avformat_open_input(&ctx_, filename, NULL, options);
- if(ret < 0){
- logIt("open %s failed:%s",filename,
- getAVErrorDesc(ret).c_str());
-
- }
+ // if(ret < 0){
+ // logIt("open %s failed:%s",filename,
+ // getAVErrorDesc(ret).c_str());
+ // }
return ret;
}
@@ -266,8 +264,8 @@
while (!founded){
const int ret = av_read_frame(ctx_, &pkt_out);
if(ret < 0){
- logIt("read frame from %s failed:%s",
- ctx_->filename,getAVErrorDesc(ret).c_str());
+ // logIt("read frame from %s failed:%s",
+ // ctx_->filename,getAVErrorDesc(ret).c_str());
return false;
}
diff --git a/csrc/ffmpeg/format/FormatOut.cpp b/csrc/ffmpeg/format/FormatOut.cpp
index 804c8ed..a6e8c3c 100644
--- a/csrc/ffmpeg/format/FormatOut.cpp
+++ b/csrc/ffmpeg/format/FormatOut.cpp
@@ -404,34 +404,32 @@
pkt.dts = pkt.pts;
pkt.duration = av_rescale_q(calc_duration, time_base_q, time_base); //(double)(calc_duration)*(double)(av_q2d(time_base_q)) / (double)(av_q2d(time_base));
- // if (pkt.duration < 0 || time_base.den != 90000){
- // logIt("CALCULATE DURATION : %lld, fame count : %lld, TIMEBASE: %d", calc_duration,time_stamp, time_base.den);
- // }
-
+ // logIt("FRAME ID: %lld, PTS : %lld, DTS : %lld", frame_cnt, pkt.pts, pkt.dts);
}
bool FormatOut::writeFrame(AVPacket &pkt, const int64_t &frame_cnt,
bool interleaved/* = true*/){
adjustPTS(pkt, frame_cnt);
- return writeFrame2(pkt, interleaved);
+ auto ret = writeFrame2(pkt, interleaved);
+ if (!ret){
+ logIt("write to file failed, pkt.pts: %lld, dts: %lld, frame count: %d",
+ pkt.pts, pkt.dts, frame_cnt);
+ }
+ return ret;
}
bool FormatOut::writeFrame2(AVPacket &pkt, bool interleaved){
int ret = 0;
- if(interleaved)
+ if(interleaved){
ret = av_interleaved_write_frame(ctx_, &pkt);
- else
- {
+ }else{
// returns 1 if flushed and there is no more data to flush
ret = av_write_frame(ctx_, &pkt);
}
- if(ret < 0)
- {
- logIt("write packet to file failed:%s",
- getAVErrorDesc(ret).c_str());
+ if(ret < 0){
return false;
}
diff --git a/csrc/stream.cpp b/csrc/stream.cpp
deleted file mode 100644
index 42b8d73..0000000
--- a/csrc/stream.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "stream.hpp"
-
-#include "ffmpeg/data/CodedData.hpp"
-
-namespace cffmpeg_wrap{
- stream::stream(){
-
- }
- stream::~stream(){
-
- }
-
- int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){
- if (data){
- std::lock_guard<std::mutex> locker(mutex_avpkt_);
- list_avpkt_.push_back(data);
- return list_avpkt_.size();
- }
- return 0;
- }
-
- void stream::GetPacket(unsigned char **pktData, int *size, int *key){
- std::lock_guard<std::mutex> l(mutex_avpkt_);
- if(list_avpkt_.empty()){
- return;
- }
- auto data = list_avpkt_.front();
- auto pkt = data->getAVPacket();
- *key = pkt.flags & AV_PKT_FLAG_KEY;
- *size = pkt.size;
- *pktData = (unsigned char *)malloc(*size);
- memcpy(*pktData, pkt.data, pkt.size);
-
- list_avpkt_.pop_front();
- }
-}
\ No newline at end of file
diff --git a/csrc/decoder.cpp b/csrc/worker/decoder.cpp
similarity index 93%
rename from csrc/decoder.cpp
rename to csrc/worker/decoder.cpp
index 381642d..0a9fb46 100644
--- a/csrc/decoder.cpp
+++ b/csrc/worker/decoder.cpp
@@ -1,10 +1,10 @@
#include "decoder.hpp"
-#include "ffmpeg/bridge/cvbridge.hpp"
-#include "ffmpeg/format/FormatIn.hpp"
-#include "ffmpeg/data/CodedData.hpp"
-#include "ffmpeg/data/FrameData.hpp"
-#include "ffmpeg/log/log.hpp"
+#include "../ffmpeg/bridge/cvbridge.hpp"
+#include "../ffmpeg/format/FormatIn.hpp"
+#include "../ffmpeg/data/CodedData.hpp"
+#include "../ffmpeg/data/FrameData.hpp"
+#include "../ffmpeg/log/log.hpp"
extern "C"{
#include <libavformat/avformat.h>
diff --git a/csrc/decoder.hpp b/csrc/worker/decoder.hpp
similarity index 100%
rename from csrc/decoder.hpp
rename to csrc/worker/decoder.hpp
diff --git a/csrc/rec.cpp b/csrc/worker/rec.cpp
similarity index 66%
rename from csrc/rec.cpp
rename to csrc/worker/rec.cpp
index 7327977..4307d6e 100644
--- a/csrc/rec.cpp
+++ b/csrc/worker/rec.cpp
@@ -1,12 +1,12 @@
#include "rec.hpp"
#include <unistd.h>
+#include <sys/time.h>
-#include "ffmpeg/format/FormatIn.hpp"
-#include "ffmpeg/data/CodedData.hpp"
-#include "buz/recorder.hpp"
-#include "ffmpeg/log/log.hpp"
-#include "common/callback.hpp"
+#include "../ffmpeg/format/FormatIn.hpp"
+#include "../ffmpeg/data/CodedData.hpp"
+#include "../ffmpeg/log/log.hpp"
+#include "../common/callback.hpp"
using namespace logif;
using namespace ffwrapper;
@@ -14,48 +14,15 @@
namespace cffmpeg_wrap
{
- rec::rec(ffwrapper::FormatIn *in)
- :recRef_(in)
+ rec::rec()
+ :recRef_(NULL)
,minduration_(250)
,maxduration_(750)
{}
rec::~rec()
{
- {
- std::lock_guard<std::mutex> l(mtx_rec_);
- map_rec_.clear();
- }
-
- {
- std::lock_guard<std::mutex> l(mtx_pkt_);
- list_pkt_.clear();
- }
-
- }
-
- std::unique_ptr<Recorder> rec::newRec(std::string id, std::string dir, const int mind, const int maxd){
- if(!recRef_){
- logIt("Init wrapper first");
- return nullptr;
- }
-
- std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
-
- rec->SetCallback([&](std::string &id, int &index, std::string &path){
- setRecInfo(id, index, path);
- });
-
- int trycnt = 0;
- while(trycnt < 100){
- const int ret = rec->Run(dir.c_str(), mind, maxd);
- if(ret == 0) break;
- usleep(200000);
- }
- if (trycnt < 100){
- return rec;
- }
- return nullptr;
+ clear();
}
void rec::setRecInfo(std::string &id, int &index, std::string &path){
@@ -71,7 +38,33 @@
info.fPath = path;
info.recID = id;
list_recInfo_.emplace_back(info);
- logIt("LIST REC FILES COUNT : %d", list_recInfo_.size());
+ }
+
+ std::unique_ptr<buz::Recorder> rec::startRec(std::string id, std::string dir, const int mind, const int maxd){
+ if(!recRef_){
+ logIt("Init wrapper first");
+ return nullptr;
+ }
+
+ std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
+
+ rec->SetCallback([&](std::string &id, int &index, std::string &path){
+ setRecInfo(id, index, path);
+ });
+
+ int trycnt = 0;
+ while(trycnt < 100){
+ auto ret = rec->Run(dir.c_str(), mind, maxd);
+ if(ret == 0) break;
+ usleep(200000);
+ }
+ if (trycnt < 100){
+ std::lock_guard<std::mutex> locker(mtx_pkt_);
+ rec->PushPackets(list_pkt_);
+ return rec;
+ }
+
+ return nullptr;
}
void rec::GetRecInfo(std::string &recID, int &index, std::string &path){
@@ -79,27 +72,35 @@
// 鑾峰彇淇℃伅
{
std::lock_guard<std::mutex> l(mtx_recInfo_);
- if(list_recInfo_.empty()){
- index = -1;
- path = "";
- return;
+ if(!list_recInfo_.empty()){
+ auto info = list_recInfo_.front();
+ recID = info.recID;
+ index = info.frmIdx;
+ path = info.fPath;
+ list_recInfo_.pop_front();
}
- auto info = list_recInfo_.front();
- recID = info.recID;
- index = info.frmIdx;
- path = info.fPath;
- list_recInfo_.pop_front();
+
}
// 鍒犻櫎rec瀹炰緥
{
std::lock_guard<std::mutex> l(mtx_rec_);
- if (map_rec_.find(recID) != map_rec_.end())
+ if (map_rec_.empty()){
+ return;
+ }
+
+ if (map_rec_.find(recID) != map_rec_.end()){
map_rec_.erase(recID);
+ return;
+ }
for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
- if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+ if (iter->second && iter->second->ErrorOcurred()){
+ recID = iter->first;
+ index = -1;
+ path = "";
iter == map_rec_.erase(iter);
+ break;
}else{
iter++;
}
@@ -107,54 +108,73 @@
}
}
+ void rec::clear(){
+ {
+ std::lock_guard<std::mutex> l(mtx_rec_);
+ map_rec_.clear();
+ }
+
+ {
+ std::lock_guard<std::mutex> l(mtx_pkt_);
+ list_pkt_.clear();
+ }
+ }
+
+ void rec::Load(ffwrapper::FormatIn *in){
+ recRef_ = in;
+ }
+
+ void rec::Unload(){
+ recRef_ = NULL;
+ clear();
+ }
+
+ const bool rec::Loaded() const{
+ return recRef_ != NULL;
+ }
+
void rec::NewRec(const char* id, const char *output, const int mindur, const int maxdur){
std::string rid(id);
std::string dir(output);
+ minduration_ = mindur * 25;
+ maxduration_ = maxdur * 25;
+
{
std::lock_guard<std::mutex> l(mtx_rec_);
if (map_rec_.find(rid) != map_rec_.end()){
map_rec_.erase(rid);
}
- map_rec_[rid] = {rid, dir, mindur, maxdur, nullptr};
+ map_rec_[rid] = startRec(rid, dir, mindur, maxdur);
}
- minduration_ = mindur * 25;
- maxduration_ = maxdur * 25;
}
- void rec::FireRecSignal(const char* sid,const int64_t &id){
+ void rec::FireRecSignal(const char* sid, const int64_t &id){
+
+ std::lock_guard<std::mutex> l(mtx_rec_);
+
auto iter = map_rec_.find(sid);
if (iter != map_rec_.end()){
- if(iter->second.rec){
- iter->second.rec->FireRecorder(id);
+ if(iter->second){
+ iter->second->FireRecorder(id);
}
}
+
+ // logIt("recorders count: %d", map_rec_.size());
}
void rec::SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
if (!data) return;
- cachePacket(data, id);
-
std::lock_guard<std::mutex> l(mtx_rec_);
for(auto &i : map_rec_){
- if (!i.second.rec){
- i.second.rec = newRec(i.second.rid, i.second.dir, i.second.min, i.second.max);
- if (i.second.rec){
- //姝ゅ嚱鏁拌繕鏈��鍑�,涓嶉渶瑕佽繖涓攣
- // std::lock_guard<std::mutex> locker(mtx_pkt_);
- for(auto &k : list_pkt_){
- i.second.rec->CachePacket({k.data, k.id});
- }
- // 鏂扮殑鏁版嵁缂撳瓨
- i.second.rec->CachePacket({data, id});
- logIt("START REC %d FRAMES", list_pkt_.size());
- }
- }else if (i.second.rec){
- i.second.rec->CachePacket({data, id});
+ if (i.second){
+ i.second->PushPacket({data, id});
}
}
+
+ cachePacket(data, id);
}
void rec::cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
@@ -173,7 +193,8 @@
int rec::shrinkCache(){
//瓒呰繃鏈�澶х紦瀛�,涓㈠純gop
- while (list_pkt_.size() > minduration_) {
+ //缂撳瓨鏈�灏忛暱搴︾殑,鐢ㄤ簬璁板綍
+ while (list_pkt_.size() > minduration_/2) {
list_pkt_.pop_front();
while(!list_pkt_.empty()){
auto &cache = list_pkt_.front();
diff --git a/csrc/rec.hpp b/csrc/worker/rec.hpp
similarity index 73%
rename from csrc/rec.hpp
rename to csrc/worker/rec.hpp
index a356ba0..c489676 100644
--- a/csrc/rec.hpp
+++ b/csrc/worker/rec.hpp
@@ -7,6 +7,8 @@
#include <list>
#include <mutex>
+#include "../buz/recorder.hpp"
+
namespace ffwrapper
{
class FormatIn;
@@ -16,12 +18,6 @@
namespace cffmpeg_wrap
{
- namespace buz{
- class Recorder;
- struct avpacket;
- }
-
-
class rec
{
private:
@@ -30,14 +26,7 @@
int minduration_;
// 褰曞儚鐨勫疄渚�,瀵瑰簲浠诲姟
- typedef struct _fn_rec{
- std::string rid; //id瀵瑰簲浠诲姟id
- std::string dir;
- int min;
- int max;
- std::unique_ptr<buz::Recorder> rec;
- }FnRec;
- std::unordered_map<std::string, FnRec> map_rec_;
+ std::unordered_map<std::string, std::unique_ptr<buz::Recorder> > map_rec_;
// 澶氱嚎绋嬫坊鍔犱换鍔″疄渚�,鍦ㄨ娴佺嚎绋嬩娇鐢ㄥ綍鍍�,浣嗘槸娣诲姞鍦ㄥ彟涓�涓嚎绋�
std::mutex mtx_rec_;
@@ -52,26 +41,28 @@
std::mutex mtx_recInfo_;
// 缂撳瓨鐨勮棰戝抚,绛夊緟firerecsignal瑙﹀彂寮�濮嬪綍鍍�
- typedef struct _cache_pkt{
- std::shared_ptr<ffwrapper::CodedData> data;
- int64_t id;
- }CPacket;
- std::list<CPacket> list_pkt_;
+ std::list<buz::CPacket> list_pkt_;
// 澶氱嚎绋�,鐢熶骇鑰呯嚎绋媟eader push pkt,娑堣垂鑰�,褰曞儚绾跨▼pop
std::mutex mtx_pkt_;
private:
- // 鍒涘缓褰曞儚瀹炰緥
- std::unique_ptr<buz::Recorder> newRec(std::string id, std::string dir, const int mind, const int maxd);
// 褰曞儚瀹炰緥鐨勫洖璋冨嚱鏁�,褰曞儚瀹屾垚鍚庤缃綍鍍忔枃浠惰矾寰�,id鍜屽抚id
void setRecInfo(std::string &id, int &index, std::string &path);
// 缂撳瓨瑙嗛鍖�
void cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
// 涓㈠純缂撳瓨
int shrinkCache();
+ // 鍒涘缓褰曞儚瀹炰緥寮�濮嬪綍鍍�
+ std::unique_ptr<buz::Recorder> startRec(std::string id, std::string dir, const int mind, const int maxd);
+ // 娓呴櫎缂撳瓨,鏂嚎閲嶈繛鏃堕渶瑕�
+ void clear();
public:
void NewRec(const char* id, const char *output, const int mindur, const int maxdur);
+ // 鍑嗗濂藉綍鍍�
+ void Load(ffwrapper::FormatIn *in);
+ void Unload();
+ const bool Loaded() const;
// 缂撳瓨褰曞儚鐨勮棰戝寘,绛夊緟瑙﹀彂褰曞儚,鎴栫洿鎺ユ斁鍒板綍鍍忕紦瀛�
void SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
// 瑙﹀彂褰曞儚
@@ -80,7 +71,7 @@
void GetRecInfo(std::string &recID, int &index, std::string &path);
public:
- explicit rec(ffwrapper::FormatIn *in);
+ rec();
~rec();
};
} // namespace cffmpeg_wrap
diff --git a/csrc/worker/stream.cpp b/csrc/worker/stream.cpp
new file mode 100644
index 0000000..0f8dadc
--- /dev/null
+++ b/csrc/worker/stream.cpp
@@ -0,0 +1,51 @@
+#include "stream.hpp"
+
+#include "../ffmpeg/data/CodedData.hpp"
+
+namespace cffmpeg_wrap{
+ stream::stream(const int maxSize)
+ :max_size_(maxSize)
+ {}
+
+ stream::~stream(){
+ std::lock_guard<std::mutex> locker(mutex_avpkt_);
+ list_avpkt_.clear();
+ }
+
+ int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){
+ if (data){
+ std::lock_guard<std::mutex> locker(mutex_avpkt_);
+ list_avpkt_.push_back(data);
+
+ while(list_avpkt_.size() > max_size_){
+ list_avpkt_.pop_front();
+ while(!list_avpkt_.empty()){
+ auto &cache = list_avpkt_.front();
+ AVPacket &avpkt = cache->getAVPacket();
+ if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+ list_avpkt_.pop_front();
+ }else{
+ break;
+ }
+ }
+ }
+ return list_avpkt_.size();
+ }
+ return 0;
+ }
+
+ void stream::GetPacket(unsigned char **pktData, int *size, int *key){
+ std::lock_guard<std::mutex> l(mutex_avpkt_);
+ if(list_avpkt_.empty()){
+ return;
+ }
+ auto data = list_avpkt_.front();
+ auto pkt = data->getAVPacket();
+ *key = pkt.flags & AV_PKT_FLAG_KEY;
+ *size = pkt.size;
+ *pktData = (unsigned char *)malloc(*size);
+ memcpy(*pktData, pkt.data, pkt.size);
+
+ list_avpkt_.pop_front();
+ }
+}
\ No newline at end of file
diff --git a/csrc/stream.hpp b/csrc/worker/stream.hpp
similarity index 87%
rename from csrc/stream.hpp
rename to csrc/worker/stream.hpp
index 5cbc44a..6c87ff6 100644
--- a/csrc/stream.hpp
+++ b/csrc/worker/stream.hpp
@@ -15,9 +15,9 @@
private:
std::list<std::shared_ptr<ffwrapper::CodedData> > list_avpkt_;
std::mutex mutex_avpkt_;
-
+ const int max_size_;
public:
- stream(/* args */);
+ explicit stream(const int maxSize);
~stream();
int SetPacket(std::shared_ptr<ffwrapper::CodedData> data);
diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp
index 9fc2e25..63bb661 100644
--- a/csrc/wrapper.cpp
+++ b/csrc/wrapper.cpp
@@ -21,12 +21,20 @@
#include "buz/recorder.hpp"
-#include "stream.hpp"
-#include "decoder.hpp"
-#include "rec.hpp"
+#include "worker/stream.hpp"
+#include "worker/decoder.hpp"
+#include "worker/rec.hpp"
using namespace logif;
using namespace ffwrapper;
+
+#define DELETE_POINTER(p) \
+do \
+{ \
+if(NULL != p) \
+delete p; \
+p = NULL; \
+}while(0)
namespace cffmpeg_wrap{
using namespace buz;
@@ -38,12 +46,11 @@
,scale_f_(SWS_POINT)
,gb_(0)
,cpu_(0)
- ,use_decoder_(false)
,thread_(nullptr)
,stop_stream_(false)
,stream_(nullptr)
,decoder_(nullptr)
- ,rec_(nullptr)
+ ,rec_(new rec)
{
makeTheWorld();
}
@@ -57,6 +64,7 @@
stop_stream_.store(true);
thread_->join();
}
+ DELETE_POINTER(rec_);
}
catch(const std::exception& e)
{
@@ -70,11 +78,11 @@
scale_h_ = h;
}
- void Wrapper::UseGB28181(){
+ void Wrapper::GB28181(){
gb_ = 1;
}
- void Wrapper::UseCPU(){
+ void Wrapper::CPUDec(){
cpu_ = 1;
}
@@ -123,19 +131,24 @@
return 0;
}
- void Wrapper::init_stream(){
- if (stream_) delete stream_;
- stream_ = new stream;
+ void Wrapper::init_worker(ffwrapper::FormatIn *in){
+ if (rec_->Loaded() && stream_ && decoder_) return;
+ stream_ = new stream(3 * 25);
+ decoder_ = new decoder(in, scale_w_, scale_h_, scale_f_);
+ rec_->Load(in);
+ if(fn_rec_lazy_) fn_rec_lazy_(in);
+ }
+
+ void Wrapper::run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
+ if (stream_) stream_->SetPacket(data);
+ if (decoder_) decoder_->SetFrame(data, id);
+ if (rec_->Loaded()) rec_->SetPacket(data, id);
}
- void Wrapper::init_decoder(ffwrapper::FormatIn *in){
- if (decoder_) delete decoder_;
- decoder_ = new decoder(in, scale_w_, scale_h_,scale_f_);
- }
-
- void Wrapper::init_rec(ffwrapper::FormatIn *in){
- if (rec_) delete rec_;
- rec_ = new rec(in);
+ void Wrapper::deinit_worker(){
+ DELETE_POINTER(stream_);
+ DELETE_POINTER(decoder_);
+ rec_->Unload();
}
void Wrapper::run_stream_thread(){
@@ -148,11 +161,8 @@
usleep(200000);
continue;
}
- init_stream();
- if (use_decoder_){
- init_decoder(in.get());
- }
- init_rec(in.get());
+
+ init_worker(in.get());
int64_t id = 0;
while(!stop_stream_.load()){
@@ -161,24 +171,28 @@
logIt("read packet error");
break;
}
- if (stream_) stream_->SetPacket(data);
- if (use_decoder_ && decoder_) decoder_->SetFrame(data, id);
- if (rec_) rec_->SetPacket(data, id);
-
+
+ run_worker(data, id);
id++;
}
+
+ deinit_worker();
}
}
void Wrapper::BuildRecorder(const char* id, const char *output, const int mindur, const int maxdur){
- if (rec_){
+ if (rec_->Loaded()){
rec_->NewRec(id, output, mindur, maxdur);
+ }else{
+ std::string rid(id), dir(output);
+ fn_rec_lazy_ =
+ [=](ffwrapper::FormatIn *in){rec_->NewRec(rid.c_str(), dir.c_str(), mindur, maxdur);};
}
}
int Wrapper::FireRecorder(const char* sid,const int64_t &id){
- if (rec_){
+ if (rec_->Loaded()){
rec_->FireRecSignal(sid, id);
}
}
@@ -190,7 +204,7 @@
}
////////decoder
void Wrapper::BuildDecoder(){
- use_decoder_ = true;
+ // use_decoder_ = true;
}
void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id){
diff --git a/csrc/wrapper.hpp b/csrc/wrapper.hpp
index d398ed4..044b4ba 100644
--- a/csrc/wrapper.hpp
+++ b/csrc/wrapper.hpp
@@ -12,7 +12,6 @@
#include <thread>
#include <atomic>
#include <mutex>
-#include <unordered_map>
#include <memory>
#include "common/callback.hpp"
@@ -31,53 +30,49 @@
class rec;
class Wrapper{
- public:
- Wrapper();
- ~Wrapper ();
+ public:
+ Wrapper();
+ ~Wrapper ();
+ private:
+ std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
- private:
- std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
- void init_stream();
- void init_decoder(ffwrapper::FormatIn *in);
- void init_rec(ffwrapper::FormatIn *in);
-
- public:
- int RunStream(const char* input);
- private:
- void run_stream_thread();
+ void init_worker(ffwrapper::FormatIn *in);
+ void run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
+ void deinit_worker();
+ public:
+ int RunStream(const char* input);
+ private:
+ void run_stream_thread();
+ public: //recorder
+ void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd);
+ int FireRecorder(const char* sid,const int64_t &id);
+ void GetInfoRecorder(std::string &recID, int &index, std::string &path);
+ void ScalePicture(const int w, const int h, const int flags);
+ void GB28181();
+ void CPUDec();
+ public: //decoder
+ void BuildDecoder();
+ void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id);
+ public: // push stream
+ void GetPacket(unsigned char **pktData, int *size, int *key);
+ private:
+ // stream 鍙傛暟
+ std::string input_url_;
+ int scale_w_, scale_h_, scale_f_;
- public: //recorder
- void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd);
- int FireRecorder(const char* sid,const int64_t &id);
- void GetInfoRecorder(std::string &recID, int &index, std::string &path);
-
- void ScalePicture(const int w, const int h, const int flags);
- void UseGB28181();
- void UseCPU();
- public: //decoder
- void BuildDecoder();
- void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id);
- void GetPacket(unsigned char **pktData, int *size, int *key);
-
- private:
- // stream 鍙傛暟
- std::string input_url_;
- int scale_w_, scale_h_, scale_f_;
- int gb_, cpu_;
- bool use_decoder_;
-
- // decoder 鍙傛暟
- std::unique_ptr<std::thread> thread_;
- std::atomic_bool stop_stream_;
-
- // 涓氬姟绫�
- // 鎺ㄦ祦绫�
- stream* stream_;
- // 瑙g爜绫�
- decoder* decoder_;
- // 褰曞儚绫�
- rec* rec_;
-
+ int gb_, cpu_;
+ // decoder 鍙傛暟
+ std::unique_ptr<std::thread> thread_;
+ std::atomic_bool stop_stream_;
+ // 涓氬姟绫�
+ // 鎺ㄦ祦绫�
+ stream* stream_;
+ // 瑙g爜绫�
+ decoder* decoder_;
+ // 褰曞儚绫�,涓�鐩村瓨鍦�
+ rec* rec_;
+ // 褰曞儚璇锋眰缂撳瓨,绛夊緟runstream鍚庢坊鍔�
+ std::function<void(ffwrapper::FormatIn*)> fn_rec_lazy_;
};
uint8_t *DecodeJPEG(const char *file, int *w, int *h);
--
Gitblit v1.8.0