From 54ea1c13885725584a6a50d520f67e8a75f85b6f Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 19 八月 2019 11:33:13 +0800
Subject: [PATCH] fix rec

---
 csrc/wrapper.cpp      |   91 +++++++++++++++++++++++-------
 csrc/wrapper.hpp      |   11 ++-
 csrc/buz/recorder.hpp |    4 +
 csrc/buz/recorder.cpp |   17 ++++-
 4 files changed, 94 insertions(+), 29 deletions(-)

diff --git a/csrc/buz/recorder.cpp b/csrc/buz/recorder.cpp
index 1dd71ab..2da437c 100644
--- a/csrc/buz/recorder.cpp
+++ b/csrc/buz/recorder.cpp
@@ -3,6 +3,7 @@
 
 #include <thread>
 #include <unistd.h>
+#include <chrono>
 
 extern "C"{
 #include <libavcodec/avcodec.h>
@@ -32,6 +33,7 @@
         ,file_path_("")
         ,func_rec_info_(nullptr)
         ,thrd_(nullptr)
+        ,error_occured_(false)
         {
             // logIt("RECODER ID: %s", id_.c_str());
         }
@@ -41,8 +43,11 @@
             try
             {
                 if (thrd_){
-                    stop_recorder_.store(true);
-                    cv_.notify_one();
+                    {
+                        std::unique_lock<std::mutex> locker(mutex_pkt_);
+                        stop_recorder_.store(true);
+                        cv_.notify_one();
+                    }
                     thrd_->join();
                     logIt("REC THREAD JOINED, QUIT!!!");
                 }
@@ -138,9 +143,15 @@
                 std::list<avpacket> pkts;
                 {
                     std::unique_lock<std::mutex> locker(mutex_pkt_);
-                    cv_.wait(locker,[&]{
+                    auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
                         return !list_pkt_.empty() || stop_recorder_.load();
                     });
+
+                    if (!status){
+                        end_writer();
+                        error_occured_ = true;
+                        break;
+                    }
                     if(stop_recorder_.load()){
                         end_writer();
                         break;
diff --git a/csrc/buz/recorder.hpp b/csrc/buz/recorder.hpp
index 7f12d2e..ba9cea4 100644
--- a/csrc/buz/recorder.hpp
+++ b/csrc/buz/recorder.hpp
@@ -40,6 +40,8 @@
                 void SetCallback(FUNC_REC_INFO cb){
                     func_rec_info_ = cb;
                 }
+
+                const bool ErrorOcurred(){return error_occured_;}
             private:
                 void run_thread();
 
@@ -73,6 +75,8 @@
                 int                     file_frame_index_;
                 std::string             file_path_;
                 FUNC_REC_INFO           func_rec_info_;
+
+                bool                    error_occured_;
         };
     }
 }
diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp
index 3bf1b57..2fd825a 100644
--- a/csrc/wrapper.cpp
+++ b/csrc/wrapper.cpp
@@ -149,7 +149,10 @@
                 auto data(std::make_shared<CodedData>());
     	        if(!in->readPacket(data)){
                     logIt("read packet error");
-                    pkt.id = -1; data = nullptr; id = 0;
+                    data.reset();
+                    data = nullptr;
+                    pkt.id = -1; 
+                    id = 0;
     	        }else{
                     pkt.id = id++;
                 }
@@ -160,7 +163,10 @@
 
                 run_worker(in.get(), pkt);
                 if(!data){
-                    map_rec_.clear();
+                    {
+                        std::lock_guard<std::mutex> l(mutex_rec_);
+                        map_rec_.clear();
+                    }
                     std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
                     list_rec_pkt_.clear();
 
@@ -205,20 +211,25 @@
             }
         }
         cache_rec_pkt(pkt);
-        for(auto &i : map_rec_){
-            if (!i.second.rec){
-                i.second.rec = i.second.fn_init(in);
-                if (i.second.rec){
-                    std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
-                    for(auto &k : list_rec_pkt_){
-                        avpacket p = {k.data, k.id};
-                        i.second.rec->CachePacket(p);
+        {
+            std::lock_guard<std::mutex> l(mutex_rec_);
+
+            for(auto &i : map_rec_){
+                if (!i.second.rec){
+                    i.second.rec = std::move(init_recorder(in, i.second.rid, i.second.dir, i.second.min, i.second.max));
+                    if (i.second.rec){
+                        std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
+                        for(auto &k : list_rec_pkt_){
+                            avpacket p = {k.data, k.id};
+                            i.second.rec->CachePacket(p);
+                        }
+                        logIt("START REC %d FRAMES", list_rec_pkt_.size());
                     }
-                    logIt("START REC %d FRAMES", list_rec_pkt_.size());
+                }else if (i.second.rec){
+                    i.second.rec->CachePacket(pkt);
                 }
-            }else if (i.second.rec){
-                i.second.rec->CachePacket(pkt);
             }
+
         }
     }
 
@@ -255,13 +266,13 @@
     }
 
     //////////////recorder
-    std::shared_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
+    std::unique_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
         if(!in){
             logIt("Init wrapper first");
             return nullptr;
         }
 
-        auto rec = std::make_shared<Recorder>(in, id.c_str());
+        std::unique_ptr<Recorder> rec(new Recorder(in, id.c_str()));
 
         rec->SetCallback([&](std::string &id, int &index, std::string &path){
             cache_rec_info(id, index, path);
@@ -283,25 +294,50 @@
         std::string rid(id);
         std::string dir(output);
         
-        auto fn = [=](FormatIn *in){
-            return init_recorder(in, rid, dir, mindur, maxdur);
-        };
-        std::shared_ptr<Recorder> rec(nullptr);
-        
-        FnRec r = FnRec{fn, rec};
-        map_rec_[rid] = r;
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
+        // auto fn = [=](FormatIn *in){
+        //     return init_recorder(in, rid, dir, mindur, maxdur);
+        // };
+        // FnRec r = FnRec{fn, nullptr};
+        if (map_rec_.find(rid) != map_rec_.end()){
+            map_rec_.erase(rid);
+        }
+        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+        //         iter == map_rec_.erase(iter);
+        //     }else{
+        //         iter++;
+        //     }
+        // }
+        FnRec fr;
+        fr.rid = rid;
+        fr.dir = dir;
+        fr.min = mindur;
+        fr.max = maxdur;
+        map_rec_[rid] = std::move(fr);
 
         minduration = mindur * 25;
         maxduration = maxdur * 25;
     }
 
     int Wrapper::FireRecorder(const char* sid,const int64_t &id){
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
         auto iter = map_rec_.find(sid);
         if (iter != map_rec_.end()){
             if(iter->second.rec){
                 iter->second.rec->FireRecorder(id);
             }
         }
+
+        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+        //         iter == map_rec_.erase(iter);
+        //     }else{
+        //         iter++;
+        //     }
+        // }
     }
 
     void Wrapper::cache_rec_info(std::string &id, int &index, std::string &path){
@@ -336,10 +372,21 @@
 
         if (map_rec_.find(info.rec_id) != map_rec_.end())
             map_rec_.erase(info.rec_id);
+
+        for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+            if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+                iter == map_rec_.erase(iter);
+            }else{
+                iter++;
+            }
+        }
+
         // logIt("go get info index: %d, file: %s\n", index, path.c_str());
     }
 
     std::string Wrapper::GetRecorderID(const std::string &path){
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
         std::string ret("");
         auto iter = list_rec_map_.find(path);
         if (iter != list_rec_map_.end()){
diff --git a/csrc/wrapper.hpp b/csrc/wrapper.hpp
index b11c9ca..66f2aca 100644
--- a/csrc/wrapper.hpp
+++ b/csrc/wrapper.hpp
@@ -44,11 +44,14 @@
         struct avpacket;
     }
 
-    typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
+    // typedef std::function<std::shared_ptr<buz::Recorder>(ffwrapper::FormatIn*)> FN_REC;
 
     typedef struct _fn_rec{
-        FN_REC fn_init;
-        std::shared_ptr<buz::Recorder> rec;    
+        std::string rid;
+        std::string dir;
+        int min;
+        int max;
+        std::unique_ptr<buz::Recorder> rec;    
     }FnRec;
 
     class Wrapper{
@@ -60,7 +63,7 @@
             std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
             // ffwrapper::FormatIn* init_reader_gb28181(const char* input);
             void run_worker(ffwrapper::FormatIn *in, buz::avpacket &pkt);
-            std::shared_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
+            std::unique_ptr<buz::Recorder> init_recorder(ffwrapper::FormatIn *in, std::string id,std::string dir, const int mind, const int maxd);
 
             void cache_rec_info(std::string &id, int &index, std::string &path);
             void cache_pic(std::shared_ptr<ffwrapper::FrameData> &frame, int64_t &id);

--
Gitblit v1.8.0