From 54ea1c13885725584a6a50d520f67e8a75f85b6f Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 19 八月 2019 11:33:13 +0800
Subject: [PATCH] fix rec
---
csrc/wrapper.cpp | 91 +++++++++++++++++++++++-------
csrc/wrapper.hpp | 11 ++-
csrc/buz/recorder.hpp | 4 +
csrc/buz/recorder.cpp | 17 ++++-
4 files changed, 94 insertions(+), 29 deletions(-)
diff --git a/csrc/buz/recorder.cpp b/csrc/buz/recorder.cpp
index 1dd71ab..2da437c 100644
--- a/csrc/buz/recorder.cpp
+++ b/csrc/buz/recorder.cpp
@@ -3,6 +3,7 @@
#include <thread>
#include <unistd.h>
+#include <chrono>
extern "C"{
#include <libavcodec/avcodec.h>
@@ -32,6 +33,7 @@
,file_path_("")
,func_rec_info_(nullptr)
,thrd_(nullptr)
+ ,error_occured_(false)
{
// logIt("RECODER ID: %s", id_.c_str());
}
@@ -41,8 +43,11 @@
try
{
if (thrd_){
- stop_recorder_.store(true);
- cv_.notify_one();
+ {
+ std::unique_lock<std::mutex> locker(mutex_pkt_);
+ stop_recorder_.store(true);
+ cv_.notify_one();
+ }
thrd_->join();
logIt("REC THREAD JOINED, QUIT!!!");
}
@@ -138,9 +143,15 @@
std::list<avpacket> pkts;
{
std::unique_lock<std::mutex> locker(mutex_pkt_);
- cv_.wait(locker,[&]{
+ auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
return !list_pkt_.empty() || stop_recorder_.load();
});
+
+ if (!status){
+ end_writer();
+ error_occured_ = true;
+ break;
+ }
if(stop_recorder_.load()){
end_writer();
break;
diff --git a/csrc/buz/recorder.hpp b/csrc/buz/recorder.hpp
index 7f12d2e..ba9cea4 100644
--- a/csrc/buz/recorder.hpp
+++ b/csrc/buz/recorder.hpp
@@ -40,6 +40,8 @@
void SetCallback(FUNC_REC_INFO cb){
func_rec_info_ = cb;
}
+
+ const bool ErrorOcurred(){return error_occured_;}
private:
void run_thread();
@@ -73,6 +75,8 @@
int file_frame_index_;
std::string file_path_;
FUNC_REC_INFO func_rec_info_;
+
+ bool error_occured_;
};
}
}
diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp
index 3bf1b57..2fd825a 100644
--- a/csrc/wrapper.cpp
+++ b/csrc/wrapper.cpp
@@ -149,7 +149,10 @@
auto data(std::make_shared<CodedData>());
if(!in->readPacket(data)){
logIt("read packet error");
- pkt.id = -1; data = nullptr; id = 0;
+ data.reset();
+ data = nullptr;
+ pkt.id = -1;
+ id = 0;
}else{
pkt.id = id++;
}
@@ -160,7 +163,10 @@
run_worker(in.get(), pkt);
if(!data){
- map_rec_.clear();
+ {
+ std::lock_guard<std::mutex> l(mutex_rec_);
+ map_rec_.clear();
+ }
std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
list_rec_pkt_.clear();
@@ -205,20 +211,25 @@
}
}
cache_rec_pkt(pkt);
- for(auto &i : map_rec_){
- if (!i.second.rec){
- i.second.rec = i.second.fn_init(in);
- if (i.second.rec){
- std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
- for(auto &k : list_rec_pkt_){
- avpacket p = {k.data, k.id};
- i.second.rec->CachePacket(p);
+ {
+ std::lock_guard<std::mutex> l(mutex_rec_);
+
+ for(auto &i : map_rec_){
+ if (!i.second.rec){
+ i.second.rec = std::move(init_recorder(in, i.second.rid, i.second.dir, i.second.min, i.second.max));
+ if (i.second.rec){
+ std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
+ for(auto &k : list_rec_pkt_){
+ avpacket p = {k.data, k.id};
+ i.second.rec->CachePacket(p);
+ }
+ logIt("START REC %d FRAMES", list_rec_pkt_.size());
}
- logIt("START REC %d FRAMES", list_rec_pkt_.size());
+ }else if (i.second.rec){
+ i.second.rec->CachePacket(pkt);
}
- }else if (i.second.rec){
- i.second.rec->CachePacket(pkt);
}
+
}
}
@@ -255,13 +266,13 @@
}
//////////////recorder
- std::shared_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
+ std::unique_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
if(!in){
logIt("Init wrapper first");
return nullptr;
}
- auto rec = std::make_shared<Recorder>(in, id.c_str());
+ std::unique_ptr<Recorder> rec(new Recorder(in, id.c_str()));
rec->SetCallback([&](std::string &id, int &index, std::string &path){
cache_rec_info(id, index, path);
@@ -283,25 +294,50 @@
std::string rid(id);
std::string dir(output);
- auto fn = [=](FormatIn *in){
- return init_recorder(in, rid, dir, mindur, maxdur);
- };
- std::shared_ptr<Recorder> rec(nullptr);
-
- FnRec r = FnRec{fn, rec};
- map_rec_[rid] = r;
+ std::lock_guard<std::mutex> l(mutex_rec_);
+
+ // auto fn = [=](FormatIn *in){
+ // return init_recorder(in, rid, dir, mindur, maxdur);
+ // };
+ // FnRec r = FnRec{fn, nullptr};
+ if (map_rec_.find(rid) != map_rec_.end()){
+ map_rec_.erase(rid);
+ }
+ // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+ // if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+ // iter == map_rec_.erase(iter);
+ // }else{
+ // iter++;
+ // }
+ // }
+ FnRec fr;
+ fr.rid = rid;
+ fr.dir = dir;
+ fr.min = mindur;
+ fr.max = maxdur;
+ map_rec_[rid] = std::move(fr);
minduration = mindur * 25;
maxduration = maxdur * 25;
}
int Wrapper::FireRecorder(const char* sid,const int64_t &id){
+ std::lock_guard<std::mutex> l(mutex_rec_);
+
auto iter = map_rec_.find(sid);
if (iter != map_rec_.end()){
if(iter->second.rec){
iter->second.rec->FireRecorder(id);
}
}
+
+ // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+ // if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+ // iter == map_rec_.erase(iter);
+ // }else{
+ // iter++;
+ // }
+ // }
}
void Wrapper::cache_rec_info(std::string &id, int &index, std::string &path){
@@ -336,10 +372,21 @@
if (map_rec_.find(info.rec_id) != map_rec_.end())
map_rec_.erase(info.rec_id);
+
+ for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+ if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+ iter == map_rec_.erase(iter);
+ }else{
+ iter++;
+ }
+ }
+
// logIt("go get info index: %d, file: %s\n", index, path.c_str());
}
std::string Wrapper::GetRecorderID(const std::string &path){
+ std::lock_guard<std::mutex> l(mutex_rec_);
+
std::string ret("");
auto iter = list_rec_map_.find(path);
if (iter != list_rec_map_.end()){
diff --git a/csrc/wrapper.hpp b/csrc/wrapper.hpp
index b11c9ca..66f2aca 100644
--- a/csrc/wrapper.hpp
+++ b/csrc/wrapper.hpp
@@ -44,11 +44,14 @@
struct avpacket;
}
- typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
+ // typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
typedef struct _fn_rec{
- FN_REC fn_init;
- std::shared_ptr<buz::Recorder> rec;
+ std::string rid;
+ std::string dir;
+ int min;
+ int max;
+ std::unique_ptr<buz::Recorder> rec;
}FnRec;
class Wrapper{
@@ -60,7 +63,7 @@
std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
// ffwrapper::FormatIn* init_reader_gb28181(const char* input);
void run_worker(ffwrapper::FormatIn *in, buz::avpacket &pkt);
- std::shared_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
+ std::unique_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
void cache_rec_info(std::string &id, int &index, std::string &path);
void cache_pic(std::shared_ptr<ffwrapper::FrameData> &frame, int64_t &id);
--
Gitblit v1.8.0