From a6dd7933e0bd8ae1fd083639758f7fee9fc7a151 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期二, 10 九月 2019 16:34:10 +0800
Subject: [PATCH] Merge branch 'master' of ssh://192.168.1.14:29418/valib/goffmpeg

---
 csrc/wrapper.cpp |  170 ++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 145 insertions(+), 25 deletions(-)

diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp
index 4e3d8b8..513055e 100644
--- a/csrc/wrapper.cpp
+++ b/csrc/wrapper.cpp
@@ -39,6 +39,8 @@
     ,gb_(0)
     ,cpu_(0)
     ,use_decoder_(false)
+    ,minduration(250)
+    ,maxduration(750)
     {
         makeTheWorld();
     }
@@ -46,14 +48,29 @@
 
     Wrapper::~Wrapper()
     {
-        if(thread_){
-            stop_stream_.store(true);
-            thread_->join();
+        try
+        {
+            if(thread_){
+                stop_stream_.store(true);
+                thread_->join();
+            }
+            if(bridge_){
+                delete bridge_; bridge_ = NULL;
+            }
+
+            map_rec_.clear();
+            list_rec_pkt_.clear();
+
+            for(auto &i : list_pic_){
+                free(i.data);
+            }
         }
-        if(bridge_){
-            delete bridge_; bridge_ = NULL;
+        catch(const std::exception& e)
+        {
+            logIt("WRAPPER EXCEPTION: ", e.what());
         }
-    
+        
+        
     }
 
     void Wrapper::ScalePicture(const int w, const int h, const int flags){
@@ -132,7 +149,10 @@
                 auto data(std::make_shared<CodedData>());
     	        if(!in->readPacket(data)){
                     logIt("read packet error");
-                    pkt.id = -1; data = nullptr; id = 0;
+                    data.reset();
+                    data = nullptr;
+                    pkt.id = -1; 
+                    id = 0;
     	        }else{
                     pkt.id = id++;
                 }
@@ -143,6 +163,13 @@
 
                 run_worker(in.get(), pkt);
                 if(!data){
+                    {
+                        std::lock_guard<std::mutex> l(mutex_rec_);
+                        map_rec_.clear();
+                    }
+                    std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
+                    list_rec_pkt_.clear();
+
                     break;
                 }
                 //test
@@ -180,27 +207,72 @@
             auto ret = in->decode(frame, pkt.data);
             if(ret == 1){
                 //鍚愬嚭鏁版嵁
-                cache_pic(frame);
+                cache_pic(frame, pkt.id);
             }
         }
+        cache_rec_pkt(pkt);
+        {
+            std::lock_guard<std::mutex> l(mutex_rec_);
 
-        for(auto &i : map_rec_){
-            if (!i.second.rec){
-                i.second.rec = i.second.fn_init(in);
+            for(auto &i : map_rec_){
+                if (!i.second.rec){
+                    i.second.rec = std::move(init_recorder(in, i.second.rid, i.second.dir, i.second.min, i.second.max));
+                    if (i.second.rec){
+                        std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
+                        for(auto &k : list_rec_pkt_){
+                            avpacket p = {k.data, k.id};
+                            i.second.rec->CachePacket(p);
+                        }
+                        logIt("START REC %d FRAMES", list_rec_pkt_.size());
+                    }
+                }else if (i.second.rec){
+                    i.second.rec->CachePacket(pkt);
+                }
             }
-            if (i.second.rec){
-                i.second.rec->CachePacket(pkt);
+
+        }
+    }
+
+    int Wrapper::cache_rec_pkt(const avpacket &pkt){
+
+        std::lock_guard<std::mutex> locker(mtx_rec_pkt_);
+        //wait I 
+        if (list_rec_pkt_.empty()) {
+            AVPacket &avpkt = pkt.data->getAVPacket();
+            if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+                return -1;
+            }
+        }
+        maybe_dump_rec_pkt();
+        recpkt k = {pkt.data, pkt.id};
+        list_rec_pkt_.push_back(k);
+        
+        return 0;
+    }
+    void Wrapper::maybe_dump_rec_pkt(){
+        //瓒呰繃min/2,涓㈠純gop
+        while (list_rec_pkt_.size() > minduration) {
+            list_rec_pkt_.pop_front();
+            while(!list_rec_pkt_.empty()){
+                auto &cache = list_rec_pkt_.front();
+                AVPacket &avpkt = cache.data->getAVPacket();
+                if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+                    list_rec_pkt_.pop_front();
+                }else{
+                    break;
+                }
             }
         }
     }
+
     //////////////recorder
-    std::shared_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id,std::string dir, const int mind, const int maxd){
+    std::unique_ptr<Recorder> Wrapper::init_recorder(FormatIn *in, std::string id, std::string dir, const int mind, const int maxd){
         if(!in){
             logIt("Init wrapper first");
             return nullptr;
         }
 
-        auto rec = std::make_shared<Recorder>(in, id);
+        std::unique_ptr<Recorder> rec(new Recorder(in, id.c_str()));
 
         rec->SetCallback([&](std::string &id, int &index, std::string &path){
             cache_rec_info(id, index, path);
@@ -221,22 +293,51 @@
     void Wrapper::BuildRecorder(const char* id, const char *output, const int mindur, const int maxdur){
         std::string rid(id);
         std::string dir(output);
-        auto fn = [=](FormatIn *in){
-            return init_recorder(in, rid, dir, mindur, maxdur);
-        };
-        std::shared_ptr<Recorder> rec(nullptr);
         
-        FnRec r = FnRec{fn, rec};
-        map_rec_[rid] = r;
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
+        // auto fn = [=](FormatIn *in){
+        //     return init_recorder(in, rid, dir, mindur, maxdur);
+        // };
+        // FnRec r = FnRec{fn, nullptr};
+        if (map_rec_.find(rid) != map_rec_.end()){
+            map_rec_.erase(rid);
+        }
+        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+        //         iter == map_rec_.erase(iter);
+        //     }else{
+        //         iter++;
+        //     }
+        // }
+        FnRec fr;
+        fr.rid = rid;
+        fr.dir = dir;
+        fr.min = mindur;
+        fr.max = maxdur;
+        map_rec_[rid] = std::move(fr);
+
+        minduration = mindur * 25;
+        maxduration = maxdur * 25;
     }
 
     int Wrapper::FireRecorder(const char* sid,const int64_t &id){
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
         auto iter = map_rec_.find(sid);
         if (iter != map_rec_.end()){
             if(iter->second.rec){
                 iter->second.rec->FireRecorder(id);
             }
         }
+
+        // for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+        //     if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+        //         iter == map_rec_.erase(iter);
+        //     }else{
+        //         iter++;
+        //     }
+        // }
     }
 
     void Wrapper::cache_rec_info(std::string &id, int &index, std::string &path){
@@ -250,10 +351,11 @@
         struct record_file_info info;
         info.file_frame_index = index;
         info.file_path = path;
+        info.rec_id = id;
         list_rec_.emplace_back(info);
         list_rec_map_[path] = id;
-        logIt("list rec files count : %d", list_rec_.size());
-        map_rec_.erase(id);
+        logIt("LIST REC FILES COUNT : %d", list_rec_.size());
+        
     }
 
     void Wrapper::GetInfoRecorder(int &index, std::string &path){
@@ -267,16 +369,32 @@
         index = info.file_frame_index;
         path = info.file_path;
         list_rec_.pop_front();
+
+        if (map_rec_.find(info.rec_id) != map_rec_.end())
+            map_rec_.erase(info.rec_id);
+
+        for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
+            if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+                iter == map_rec_.erase(iter);
+            }else{
+                iter++;
+            }
+        }
+
         // logIt("go get info index: %d, file: %s\n", index, path.c_str());
     }
 
     std::string Wrapper::GetRecorderID(const std::string &path){
+        std::lock_guard<std::mutex> l(mutex_rec_);
+
         std::string ret("");
         auto iter = list_rec_map_.find(path);
         if (iter != list_rec_map_.end()){
             ret = iter->second;
             list_rec_map_.erase(iter);
         }
+        
+
         return ret;
     }
     ////////decoder
@@ -284,7 +402,7 @@
         use_decoder_ = true;
     }
 
-    void Wrapper::cache_pic(std::shared_ptr<ffwrapper::FrameData> &frame){
+    void Wrapper::cache_pic(std::shared_ptr<ffwrapper::FrameData> &frame, int64_t &id){
 
         pic_bgr24 pic;
         if(bridge_){
@@ -295,6 +413,7 @@
             unsigned char *data = (unsigned char*)malloc(pic.w * pic.h * 3);
             bridge_->copyPicture(data, frm);
             pic.data = data;
+            pic.id = id;
         }
         
         {
@@ -311,7 +430,7 @@
 
     }
 
-    void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h){
+    void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id){
         std::lock_guard<std::mutex> l(mutex_pic_);
         if(list_pic_.empty()){
             *data = NULL;
@@ -321,6 +440,7 @@
         }
         auto p = list_pic_.front();
         *data = p.data; *w = p.w; *h = p.h;
+        *id = p.id;
         list_pic_.pop_front();
     }
 

--
Gitblit v1.8.0