From b73029149580370e62dd6c14a270aea902f85cf2 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 18 九月 2019 09:52:30 +0800
Subject: [PATCH] fix rec bug

---
 csrc/buz/recorder.hpp            |   17 
 csrc/buz/recorder.cpp            |  235 +++++++++---------
 csrc/ffmpeg/format/FormatIn.cpp  |   22 
 /dev/null                        |   36 --
 csrc/wrapper.cpp                 |   74 +++--
 csrc/worker/rec.cpp              |  173 +++++++-----
 csrc/worker/stream.cpp           |   51 +++
 csrc/worker/decoder.cpp          |   10 
 csrc/wrapper.hpp                 |   87 +++---
 csrc/ffmpeg/format/FormatOut.cpp |   22 
 csrc/worker/rec.hpp              |   35 +-
 csrc/worker/stream.hpp           |    4 
 csrc/worker/decoder.hpp          |    0 
 csrc/cffmpeg.cpp                 |    6 
 14 files changed, 403 insertions(+), 369 deletions(-)

diff --git a/csrc/buz/recorder.cpp b/csrc/buz/recorder.cpp
index a7086fe..af5a1fc 100644
--- a/csrc/buz/recorder.cpp
+++ b/csrc/buz/recorder.cpp
@@ -25,11 +25,11 @@
         ,maxduration(30 * 25)
         ,minduration(10 * 25)
         ,end_frame(minduration)
-        ,cur_frame(-1)
+        ,cur_frame(0)
         ,stop_recorder_(false)
         ,id_(id)
-        ,id_frame_(0)
-        ,file_frame_index_(-1)
+        ,id_frame_(-1)
+        ,id_frame_in_file_(-1)
         ,file_path_("")
         ,func_rec_info_(nullptr)
         ,thrd_(nullptr)
@@ -43,13 +43,13 @@
             try
             {
                 if (thrd_){
-                    {
-                        std::unique_lock<std::mutex> locker(mutex_pkt_);
+                    if (!stop_recorder_.load()){
                         stop_recorder_.store(true);
                         cv_.notify_one();
                     }
+
                     thrd_->join();
-                    logIt("REC THREAD JOINED, QUIT!!!");
+                    // logIt("REC THREAD JOINED, QUIT!!!, %s", id_.c_str());
                 }
             }
             catch(const std::exception& e)
@@ -71,121 +71,95 @@
 
             out_ = new FormatOut(in_->getStream(), "mp4");
 
-            return 0;
-        }
-
-        void Recorder::start_writer(){
-            if (cur_frame == 0) {
-                
-                sole::uuid u4 = sole::uuid4();
-                file_path_ = dir_ + "/" + u4.base62() + ".mp4";
-                out_->JustWriter(in_->getStream(), file_path_.c_str());
-                logIt("START RECORD %s", file_path_.c_str());
+            file_path_ = dir_ + "/" + sole::uuid4().base62() + ".mp4";
+            auto ret = out_->JustWriter(in_->getStream(), file_path_.c_str());
+            if (ret){
+                return 0;
             }
+
+            return -1;
         }
 
-        int Recorder::write_correctly(const avpacket &pkt){
+        int Recorder::write_correctly(const CPacket &pkt){
             //reader failed, break stream
             if(pkt.id == -1 && !pkt.data){
-                end_writer();
+                return -1;
+            }
+
+            if (cur_frame == end_frame){
                 return 1;
             }
-            // writer error, reinit writer
+
             int64_t cur = cur_frame++;
             AVPacket &op = pkt.data->getAVPacket();
             AVPacket np(op);
             av_copy_packet(&np, &op);
-            if(!out_->writeFrame(np, cur)){
-                av_packet_unref(&np);
-                end_writer();
-                return -1;
-            }
+            auto ret = out_->writeFrame(np, cur);
             av_packet_unref(&np);
+            if (!ret) return -1;
+            
             if(pkt.id == id_frame_){
-                file_frame_index_ = cur_frame-1;
+                id_frame_in_file_ = cur_frame-1;
             }
+            
             // logIt("WRITE FRAME ID: %d, RECORD ID: %d", pkt.id, id_frame_);
             return 0;
         }
 
         void Recorder::end_writer(){
-            if(cur_frame == -1) return;
+
             out_->endWriter();
-            logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
-                 file_frame_index_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
-
-            //reinit cur_frame clear list pkt
-            {
-                std::lock_guard<std::mutex> locker(mutex_pkt_);
-                cur_frame = -1;
-                end_frame = minduration;
-                list_pkt_.clear();
-            }
-
-            //callback to frame index and path
-            if(func_rec_info_){
-                func_rec_info_(id_, file_frame_index_, file_path_);
-            }
-        }
-
-        void Recorder::run_thread(){
-            bool reinit_writer = false;
-            while(!stop_recorder_.load()){
-                if (reinit_writer) {
-                    while(!stop_recorder_.load()){
-                        if(init_writer() == 0)
-                            break;
-                        usleep(300000);
-                    }
-                    if(stop_recorder_.load()) break;
-                }
-
-                std::list<avpacket> pkts;
-                {
-                    std::unique_lock<std::mutex> locker(mutex_pkt_);
-                    auto status = cv_.wait_for(locker, std::chrono::seconds(10), [&]{
-                        return !list_pkt_.empty() || stop_recorder_.load();
-                    });
-
-                    if (!status){
-                        end_writer();
-                        error_occured_ = true;
-                        break;
-                    }
-                    if(stop_recorder_.load()){
-                        end_writer();
-                        break;
-                    }
-                    if(cur_frame == -1){
-                        continue;
-                    }
-                    list_pkt_.swap(pkts);
-                }
-
-                if (cur_frame == 0) {
-                    start_writer();
-                }
-
-                for(auto &i : pkts){
-                    if (cur_frame < end_frame){
-                        if(write_correctly(i) != 0){
-                            stop_recorder_.store(true);
-                            break;
-                        }
-                    }else{
-                        end_writer();
-                        stop_recorder_.store(true);
-                        break;
-                    }
-                }
-                
-            }
-
             if (out_){
                 delete out_;
                 out_ = NULL;
             }
-            // stop_recorder_.store(false);
+            {
+                std::lock_guard<std::mutex> l(mutex_pkt_);
+                list_pkt_.clear();
+            }
+            // logIt("INDEX %d, REAL-FRAME-ID %d, FILE %s, CURFrame %d, ENDFrame %d\n",
+            //      id_frame_in_file_, id_frame_, file_path_.c_str(), cur_frame, end_frame);
+
+            //callback to frame index and path
+            if(func_rec_info_){
+                func_rec_info_(id_,id_frame_in_file_, file_path_);
+            }
+
+        }
+
+        void Recorder::run_thread(){
+
+            while(!stop_recorder_.load()){
+
+                std::list<CPacket> pkts;
+                {
+                    std::unique_lock<std::mutex> locker(mutex_pkt_);
+                    auto status = cv_.wait_for(locker, std::chrono::seconds(3), [&]{
+                        return !list_pkt_.empty() || stop_recorder_.load();
+                    });
+
+                    if (!status || stop_recorder_.load()){
+                        error_occured_ = !status;
+                        break;
+                    }
+                    list_pkt_.swap(pkts);
+                }
+                
+                int ret = 0;
+                for(auto &i : pkts){
+                    ret = write_correctly(i);
+                    if (ret != 0){
+                        break;
+                    }
+                }
+
+                if (ret != 0){
+                    break;
+                }
+            }
+
+            stop_recorder_.store(true);
+            end_writer();
         }
 
         int Recorder::Run(const char* output, const int mind, const int maxd){
@@ -204,7 +178,7 @@
                 end_frame = minduration;
             }
 
-            logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);    
+            // logIt("minduration %d maxduration %d curduration %d", minduration, maxduration, end_frame);    
 
             thrd_.reset(new std::thread([&]{
                 run_thread();
@@ -215,38 +189,37 @@
         }
 
         int Recorder::FireRecorder(const int64_t &id){
-            if(cur_frame == -1){
+            if (stop_recorder_.load()) return -1;
+
+            if(id_frame_ == -1){
                 id_frame_ = id;
-                logIt("FIRST FIRE RECORD ID: %lld", id);
-                {
-                    std::lock_guard<std::mutex> locker(mutex_pkt_);
-                    cur_frame = 0;
-                    if (list_pkt_.size() > end_frame){
-                        end_frame = list_pkt_.size() + minduration/2;
-                        if (end_frame > maxduration)
-                            end_frame = maxduration;
-                    }
+
+                std::lock_guard<std::mutex> locker(mutex_pkt_);
+                if (list_pkt_.size() > end_frame){
+                    end_frame = list_pkt_.size() + minduration/2;
+                    if (end_frame > maxduration)
+                        end_frame = maxduration;
                 }
-            }else if(end_frame - cur_frame > minduration/2 && end_frame < maxduration){
+
+                // logIt("FIRST FIRE RECORD ID: %lld, cur_frame: %d, end_frame: %d", id, cur_frame, end_frame);
+
+            }else if(cur_frame > minduration/2 && end_frame < maxduration){
                 end_frame = end_frame + minduration / 2;
                 if(end_frame > maxduration){
                     end_frame = maxduration;
                 }
+                // logIt("PROLONG REC, cur_frame: %d, end_frame: %d", cur_frame, end_frame);
             }
             // logIt("FIRE REC FRAME ID: %lld", id);
             return 0;
         }
 
-        int Recorder::CachePacket(const avpacket &pkt){
+        int Recorder::PushPacket(const CPacket &pkt){
+            if (stop_recorder_.load()) return 0;
 
             std::lock_guard<std::mutex> locker(mutex_pkt_);
 
-            if(cur_frame == -1){
-                //error occur, stream break
-                if(pkt.id == -1 && pkt.data == nullptr){
-                    list_pkt_.clear();
-                    return -1;
-                }
+            if(id_frame_ == -1){
                 //wait I 
                 if (list_pkt_.empty()) {
                     AVPacket &avpkt = pkt.data->getAVPacket();
@@ -258,17 +231,43 @@
                 maybe_dump_gop();
 
                 list_pkt_.push_back(pkt);
+                // cv_.notify_one();
+
             }else{
                 list_pkt_.push_back(pkt);
                 cv_.notify_one();
             }
 
-            return 0;
+            return list_pkt_.size();
+        }
+
+        int Recorder::PushPackets(std::list<CPacket> &lst){
+            
+            if (stop_recorder_.load()) return 0;
+
+            std::lock_guard<std::mutex> locker(mutex_pkt_);
+            bool i = false;
+            for (auto &p : lst){
+                if (!i){
+                    AVPacket &avpkt = p.data->getAVPacket();
+                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+                        continue;
+                    }
+                    i = true;
+                }
+                
+                list_pkt_.push_back(p);
+            }
+            maybe_dump_gop();
+            cv_.notify_one();
+
+            // logIt("CACHE PACKET : %d", list_pkt_.size());
+            return list_pkt_.size();
         }
 
         void Recorder::maybe_dump_gop(){
             //瓒呰繃min/2,涓㈠純gop
-            while (list_pkt_.size() > maxduration) {
+            while (list_pkt_.size() > minduration) {
                 list_pkt_.pop_front();
                 while(!list_pkt_.empty()){
                     auto &cache = list_pkt_.front();
diff --git a/csrc/buz/recorder.hpp b/csrc/buz/recorder.hpp
index ba9cea4..8c3b550 100644
--- a/csrc/buz/recorder.hpp
+++ b/csrc/buz/recorder.hpp
@@ -22,10 +22,12 @@
 
 namespace cffmpeg_wrap{
     namespace buz{
-        struct avpacket{
+        // 缂撳瓨鐨勮棰戝抚,绛夊緟fire瑙﹀彂寮�濮嬪綍鍍�
+        typedef struct _cache_pkt{
             std::shared_ptr<ffwrapper::CodedData> data;
             int64_t id;
-        };
+        }CPacket;
+
 
         class Recorder{
             public:
@@ -34,7 +36,8 @@
 
             public: 
                 int Run(const char* output, const int mind, const int maxd);
-                int CachePacket(const avpacket &pkt);
+                int PushPacket(const CPacket &pkt);
+                int PushPackets(std::list<CPacket> &lst);
                 int FireRecorder(const int64_t &id);
 
                 void SetCallback(FUNC_REC_INFO cb){
@@ -42,12 +45,12 @@
                 }
 
                 const bool ErrorOcurred(){return error_occured_;}
+                const std::string& RecID()const{return id_;}
             private:
                 void run_thread();
 
                 int init_writer();
-                void start_writer();
-                int write_correctly(const avpacket &pkt);
+                int write_correctly(const CPacket &pkt);
                 void end_writer();
 
                 void maybe_dump_gop();
@@ -60,7 +63,7 @@
                 int     end_frame;
                 int     cur_frame;
 
-                std::list<avpacket>     list_pkt_;
+                std::list<CPacket>     list_pkt_;
 
                 std::atomic_bool        stop_recorder_;
                 std::mutex              mutex_pkt_;
@@ -72,7 +75,7 @@
                 std::string             id_;
 
                 int64_t                 id_frame_;
-                int                     file_frame_index_;
+                int                     id_frame_in_file_;
                 std::string             file_path_;
                 FUNC_REC_INFO           func_rec_info_;
 
diff --git a/csrc/cffmpeg.cpp b/csrc/cffmpeg.cpp
index bb5a79f..3a97159 100644
--- a/csrc/cffmpeg.cpp
+++ b/csrc/cffmpeg.cpp
@@ -38,12 +38,12 @@
 
 void c_ffmpeg_run_gb28181(const cffmpeg h){
     Wrapper *s = (Wrapper*)h;
-    s->UseGB28181();
+    s->GB28181();
 }
 
 void c_ffmepg_use_cpu(const cffmpeg h){
     Wrapper *s = (Wrapper*)h;
-    s->UseCPU();
+    s->CPUDec();
 }
 
 
@@ -64,7 +64,7 @@
     std::string p(""), id("");
     s->GetInfoRecorder(id, i, p);
 
-    // printf("cffmpeg get info : index : %d, file : %s\n", i, p.c_str());
+    // printf("cffmpeg get info : index : %d, file : %s, recid: %s\n", i, p.c_str(), id.c_str());
 
     *index = i;
     
diff --git a/csrc/ffmpeg/format/FormatIn.cpp b/csrc/ffmpeg/format/FormatIn.cpp
index 4df2c0f..3e9dd51 100644
--- a/csrc/ffmpeg/format/FormatIn.cpp
+++ b/csrc/ffmpeg/format/FormatIn.cpp
@@ -106,11 +106,10 @@
         }
 
         ret = avformat_open_input(&ctx_, "", NULL, options);
-        if(ret < 0){
-            logIt("open %s failed:%s",filename,
-                  getAVErrorDesc(ret).c_str());
-
-        }
+        // if(ret < 0){
+            // logIt("open %s failed:%s",filename,
+            //       getAVErrorDesc(ret).c_str());
+        // }
 
 		return ret;
 	}
@@ -119,11 +118,10 @@
 	int FormatIn::open(const char *filename, AVDictionary **options){
 
 		const int ret = avformat_open_input(&ctx_, filename, NULL, options);
-		if(ret < 0){
-			logIt("open %s failed:%s",filename,
-					getAVErrorDesc(ret).c_str()); 
-
-		}
+		// if(ret < 0){
+		// 	logIt("open %s failed:%s",filename,
+		// 			getAVErrorDesc(ret).c_str()); 
+		// }
 
 		return ret;
 	}
@@ -266,8 +264,8 @@
 		while (!founded){
 			const int ret = av_read_frame(ctx_, &pkt_out);
 			if(ret < 0){
-				logIt("read frame from %s failed:%s",
-						ctx_->filename,getAVErrorDesc(ret).c_str()); 
+				// logIt("read frame from %s failed:%s",
+				// 		ctx_->filename,getAVErrorDesc(ret).c_str()); 
 	
 				return false;
 			}
diff --git a/csrc/ffmpeg/format/FormatOut.cpp b/csrc/ffmpeg/format/FormatOut.cpp
index 804c8ed..a6e8c3c 100644
--- a/csrc/ffmpeg/format/FormatOut.cpp
+++ b/csrc/ffmpeg/format/FormatOut.cpp
@@ -404,34 +404,32 @@
         pkt.dts = pkt.pts;
         pkt.duration = av_rescale_q(calc_duration, time_base_q, time_base); //(double)(calc_duration)*(double)(av_q2d(time_base_q)) / (double)(av_q2d(time_base));
         
-        // if (pkt.duration < 0 || time_base.den != 90000){
-            // logIt("CALCULATE DURATION : %lld, fame count : %lld, TIMEBASE: %d", calc_duration,time_stamp, time_base.den);
-        // }
-        
+        // logIt("FRAME ID: %lld, PTS : %lld, DTS : %lld", frame_cnt, pkt.pts, pkt.dts);        
     }
 
     bool FormatOut::writeFrame(AVPacket &pkt, const int64_t &frame_cnt,
                               bool interleaved/* = true*/){
 
         adjustPTS(pkt, frame_cnt);
-        return writeFrame2(pkt, interleaved);
+        auto ret = writeFrame2(pkt, interleaved);
+        if (!ret){
+            logIt("write to file failed, pkt.pts: %lld, dts: %lld, frame count: %d",
+                    pkt.pts, pkt.dts, frame_cnt);
+        }
+        return ret;
     }
 
     bool FormatOut::writeFrame2(AVPacket &pkt, bool interleaved){
         
         int ret = 0;
-        if(interleaved)
+        if(interleaved){
             ret = av_interleaved_write_frame(ctx_, &pkt);
-        else
-        {
+        }else{
             // returns 1 if flushed and there is no more data to flush
             ret = av_write_frame(ctx_, &pkt);
         }
     
-        if(ret < 0)
-        {
-            logIt("write packet to file failed:%s",
-                    getAVErrorDesc(ret).c_str()); 
+        if(ret < 0){
             return false;
         }
 
diff --git a/csrc/stream.cpp b/csrc/stream.cpp
deleted file mode 100644
index 42b8d73..0000000
--- a/csrc/stream.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "stream.hpp"
-
-#include "ffmpeg/data/CodedData.hpp"
-
-namespace cffmpeg_wrap{
-    stream::stream(){
-
-    }
-    stream::~stream(){
-
-    }
-
-    int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){
-        if (data){
-            std::lock_guard<std::mutex> locker(mutex_avpkt_);
-            list_avpkt_.push_back(data);
-            return list_avpkt_.size();
-        }
-        return 0;
-    }
-
-    void stream::GetPacket(unsigned char **pktData, int *size, int *key){
-        std::lock_guard<std::mutex> l(mutex_avpkt_);
-        if(list_avpkt_.empty()){
-            return;
-        }
-        auto data = list_avpkt_.front();
-        auto pkt = data->getAVPacket();
-        *key = pkt.flags & AV_PKT_FLAG_KEY;
-        *size = pkt.size;
-        *pktData = (unsigned char *)malloc(*size);
-        memcpy(*pktData, pkt.data, pkt.size);
-
-        list_avpkt_.pop_front();
-    }
-}
\ No newline at end of file
diff --git a/csrc/decoder.cpp b/csrc/worker/decoder.cpp
similarity index 93%
rename from csrc/decoder.cpp
rename to csrc/worker/decoder.cpp
index 381642d..0a9fb46 100644
--- a/csrc/decoder.cpp
+++ b/csrc/worker/decoder.cpp
@@ -1,10 +1,10 @@
 #include "decoder.hpp"
 
-#include "ffmpeg/bridge/cvbridge.hpp"
-#include "ffmpeg/format/FormatIn.hpp"
-#include "ffmpeg/data/CodedData.hpp"
-#include "ffmpeg/data/FrameData.hpp"
-#include "ffmpeg/log/log.hpp"
+#include "../ffmpeg/bridge/cvbridge.hpp"
+#include "../ffmpeg/format/FormatIn.hpp"
+#include "../ffmpeg/data/CodedData.hpp"
+#include "../ffmpeg/data/FrameData.hpp"
+#include "../ffmpeg/log/log.hpp"
 
 extern "C"{
 #include <libavformat/avformat.h>
diff --git a/csrc/decoder.hpp b/csrc/worker/decoder.hpp
similarity index 100%
rename from csrc/decoder.hpp
rename to csrc/worker/decoder.hpp
diff --git a/csrc/rec.cpp b/csrc/worker/rec.cpp
similarity index 66%
rename from csrc/rec.cpp
rename to csrc/worker/rec.cpp
index 7327977..4307d6e 100644
--- a/csrc/rec.cpp
+++ b/csrc/worker/rec.cpp
@@ -1,12 +1,12 @@
 #include "rec.hpp"
 
 #include <unistd.h>
+#include <sys/time.h>
 
-#include "ffmpeg/format/FormatIn.hpp"
-#include "ffmpeg/data/CodedData.hpp"
-#include "buz/recorder.hpp"
-#include "ffmpeg/log/log.hpp"
-#include "common/callback.hpp"
+#include "../ffmpeg/format/FormatIn.hpp"
+#include "../ffmpeg/data/CodedData.hpp"
+#include "../ffmpeg/log/log.hpp"
+#include "../common/callback.hpp"
 
 using namespace logif;
 using namespace ffwrapper;
@@ -14,48 +14,15 @@
 
 namespace cffmpeg_wrap
 {
-    rec::rec(ffwrapper::FormatIn *in)
-    :recRef_(in)
+    rec::rec()
+    :recRef_(NULL)
     ,minduration_(250)
     ,maxduration_(750)
     {}
 
     rec::~rec()
     {
-        {
-            std::lock_guard<std::mutex> l(mtx_rec_);
-            map_rec_.clear();
-        }
-        
-        {
-            std::lock_guard<std::mutex> l(mtx_pkt_);
-            list_pkt_.clear();
-        }
-
-    }
-
-    std::unique_ptr<Recorder> rec::newRec(std::string id, std::string dir, const int mind, const int maxd){
-        if(!recRef_){
-            logIt("Init wrapper first");
-            return nullptr;
-        }
-
-        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
-
-        rec->SetCallback([&](std::string &id, int &index, std::string &path){
-            setRecInfo(id, index, path);
-        });
-
-        int trycnt = 0;
-        while(trycnt < 100){
-            const int ret = rec->Run(dir.c_str(), mind, maxd);
-            if(ret == 0) break;
-            usleep(200000);
-        }
-        if (trycnt < 100){
-            return rec;
-        }
-        return nullptr;
+        clear();
     }
 
     void rec::setRecInfo(std::string &id, int &index, std::string &path){
@@ -71,7 +38,33 @@
         info.fPath = path;
         info.recID = id;
         list_recInfo_.emplace_back(info);
-        logIt("LIST REC FILES COUNT : %d", list_recInfo_.size());
+    }
+
+    std::unique_ptr<buz::Recorder> rec::startRec(std::string id, std::string dir, const int mind, const int maxd){
+        if(!recRef_){
+            logIt("Init wrapper first");
+            return nullptr;
+        }
+
+        std::unique_ptr<Recorder> rec(new Recorder(recRef_, id.c_str()));
+
+        rec->SetCallback([&](std::string &id, int &index, std::string &path){
+            setRecInfo(id, index, path);
+        });
+
+        int trycnt = 0;
+        while(trycnt < 100){
+            auto ret = rec->Run(dir.c_str(), mind, maxd);
+            if(ret == 0) break;
+            usleep(200000);
+        }
+        if (trycnt < 100){
+            std::lock_guard<std::mutex> locker(mtx_pkt_);
+            rec->PushPackets(list_pkt_);
+            return rec;
+        }
+
+        return nullptr;
     }
 
     void rec::GetRecInfo(std::string &recID, int &index, std::string &path){
@@ -79,27 +72,35 @@
         // 鑾峰彇淇℃伅
         {
             std::lock_guard<std::mutex> l(mtx_recInfo_);
-            if(list_recInfo_.empty()){
-                index = -1;
-                path = "";
-                return;
+            if(!list_recInfo_.empty()){
+                auto info = list_recInfo_.front();
+                recID = info.recID;
+                index = info.frmIdx;
+                path = info.fPath;
+                list_recInfo_.pop_front();
             }
-            auto info = list_recInfo_.front();
-            recID = info.recID;
-            index = info.frmIdx;
-            path = info.fPath;
-            list_recInfo_.pop_front();
+           
         }
 
         // 鍒犻櫎rec瀹炰緥
         {
             std::lock_guard<std::mutex> l(mtx_rec_);
-            if (map_rec_.find(recID) != map_rec_.end())
+            if (map_rec_.empty()){
+                return;
+            }
+
+            if (map_rec_.find(recID) != map_rec_.end()){
                 map_rec_.erase(recID);
+                return;
+            }
 
             for (auto iter = map_rec_.begin(); iter != map_rec_.end();){
-                if (iter->second.rec && iter->second.rec->ErrorOcurred()){
+                if (iter->second && iter->second->ErrorOcurred()){
+                    recID = iter->first;
+                    index = -1;
+                    path = "";
                     iter == map_rec_.erase(iter);
+                    break;
                 }else{
                     iter++;
                 }
@@ -107,54 +108,73 @@
         }
     }
 
+    void rec::clear(){
+        {
+            std::lock_guard<std::mutex> l(mtx_rec_);
+            map_rec_.clear();
+        }
+        
+        {
+            std::lock_guard<std::mutex> l(mtx_pkt_);
+            list_pkt_.clear();
+        }
+    }
+
+    void rec::Load(ffwrapper::FormatIn *in){
+        recRef_ = in;
+    }
+
+    void rec::Unload(){
+        recRef_ = NULL;
+        clear();
+    }
+
+    const bool rec::Loaded() const{
+        return recRef_ != NULL;
+    }
+    
     void rec::NewRec(const char* id, const char *output, const int mindur, const int maxdur){
         std::string rid(id);
         std::string dir(output);
         
+        minduration_ = mindur * 25;
+        maxduration_ = maxdur * 25;
+
         {
             std::lock_guard<std::mutex> l(mtx_rec_);
             if (map_rec_.find(rid) != map_rec_.end()){
                 map_rec_.erase(rid);
             }
-            map_rec_[rid] = {rid, dir, mindur, maxdur, nullptr};
+            map_rec_[rid] = startRec(rid, dir, mindur, maxdur);
         }
         
-        minduration_ = mindur * 25;
-        maxduration_ = maxdur * 25;
     }
 
-    void rec::FireRecSignal(const char* sid,const int64_t &id){
+    void rec::FireRecSignal(const char* sid, const int64_t &id){
+        
+        std::lock_guard<std::mutex> l(mtx_rec_);
+        
         auto iter = map_rec_.find(sid);
         if (iter != map_rec_.end()){
-            if(iter->second.rec){
-                iter->second.rec->FireRecorder(id);
+            if(iter->second){
+                iter->second->FireRecorder(id);
             }
         }
+        
+        // logIt("recorders count: %d", map_rec_.size());
     }
 
     void rec::SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
         if (!data) return;
 
-        cachePacket(data, id);
-
         std::lock_guard<std::mutex> l(mtx_rec_);
         for(auto &i : map_rec_){
-            if (!i.second.rec){
-                i.second.rec = newRec(i.second.rid, i.second.dir, i.second.min, i.second.max);
-                if (i.second.rec){
-                    //姝ゅ嚱鏁拌繕鏈��鍑�,涓嶉渶瑕佽繖涓攣
-                    // std::lock_guard<std::mutex> locker(mtx_pkt_);
-                    for(auto &k : list_pkt_){
-                        i.second.rec->CachePacket({k.data, k.id});
-                    }
-                    // 鏂扮殑鏁版嵁缂撳瓨
-                    i.second.rec->CachePacket({data, id});
-                    logIt("START REC %d FRAMES", list_pkt_.size());
-                }
-            }else if (i.second.rec){
-                i.second.rec->CachePacket({data, id});
+            if (i.second){
+                i.second->PushPacket({data, id});
             }
         }
+
+        cachePacket(data, id);
     }
 
     void rec::cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
@@ -173,7 +193,8 @@
 
     int rec::shrinkCache(){
         //瓒呰繃鏈�澶х紦瀛�,涓㈠純gop
-        while (list_pkt_.size() > minduration_) {
+        //缂撳瓨鏈�灏忛暱搴︾殑,鐢ㄤ簬璁板綍
+        while (list_pkt_.size() > minduration_/2) {
             list_pkt_.pop_front();
             while(!list_pkt_.empty()){
                 auto &cache = list_pkt_.front();
diff --git a/csrc/rec.hpp b/csrc/worker/rec.hpp
similarity index 73%
rename from csrc/rec.hpp
rename to csrc/worker/rec.hpp
index a356ba0..c489676 100644
--- a/csrc/rec.hpp
+++ b/csrc/worker/rec.hpp
@@ -7,6 +7,8 @@
 #include <list>
 #include <mutex>
 
+#include "../buz/recorder.hpp"
+
 namespace ffwrapper
 {
     class FormatIn;
@@ -16,12 +18,6 @@
 
 namespace cffmpeg_wrap
 {
-    namespace buz{
-        class Recorder;
-        struct avpacket;
-    }
-
-
     class rec
     {
     private:
@@ -30,14 +26,7 @@
         int     minduration_;
 
         // 褰曞儚鐨勫疄渚�,瀵瑰簲浠诲姟
-        typedef struct _fn_rec{
-            std::string rid;        //id瀵瑰簲浠诲姟id
-            std::string dir;
-            int min;
-            int max;
-            std::unique_ptr<buz::Recorder> rec;    
-        }FnRec;
-        std::unordered_map<std::string, FnRec> map_rec_;
+        std::unordered_map<std::string, std::unique_ptr<buz::Recorder> > map_rec_;
         // 澶氱嚎绋嬫坊鍔犱换鍔″疄渚�,鍦ㄨ娴佺嚎绋嬩娇鐢ㄥ綍鍍�,浣嗘槸娣诲姞鍦ㄥ彟涓�涓嚎绋�
         std::mutex mtx_rec_;
 
@@ -52,26 +41,28 @@
         std::mutex mtx_recInfo_;
 
         // 缂撳瓨鐨勮棰戝抚,绛夊緟firerecsignal瑙﹀彂寮�濮嬪綍鍍�
-        typedef struct _cache_pkt{
-            std::shared_ptr<ffwrapper::CodedData> data;
-            int64_t id;
-        }CPacket;
-        std::list<CPacket> list_pkt_;
+        std::list<buz::CPacket> list_pkt_;
         // 澶氱嚎绋�,鐢熶骇鑰呯嚎绋媟eader push pkt,娑堣垂鑰�,褰曞儚绾跨▼pop
         std::mutex mtx_pkt_;
 
     private: 
-        // 鍒涘缓褰曞儚瀹炰緥
-        std::unique_ptr<buz::Recorder> newRec(std::string id, std::string dir, const int mind, const int maxd);
         // 褰曞儚瀹炰緥鐨勫洖璋冨嚱鏁�,褰曞儚瀹屾垚鍚庤缃綍鍍忔枃浠惰矾寰�,id鍜屽抚id
         void setRecInfo(std::string &id, int &index, std::string &path);
         // 缂撳瓨瑙嗛鍖�
         void cachePacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
         // 涓㈠純缂撳瓨
         int shrinkCache();
+        // 鍒涘缓褰曞儚瀹炰緥寮�濮嬪綍鍍�
+        std::unique_ptr<buz::Recorder> startRec(std::string id, std::string dir, const int mind, const int maxd);
+        // 娓呴櫎缂撳瓨,鏂嚎閲嶈繛鏃堕渶瑕�
+        void clear();
     public:
         void NewRec(const char* id, const char *output, const int mindur, const int maxdur);
 
+        // 鍑嗗濂藉綍鍍�
+        void Load(ffwrapper::FormatIn *in);
+        void Unload();
+        const bool Loaded() const;
         // 缂撳瓨褰曞儚鐨勮棰戝寘,绛夊緟瑙﹀彂褰曞儚,鎴栫洿鎺ユ斁鍒板綍鍍忕紦瀛�
         void SetPacket(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
         // 瑙﹀彂褰曞儚
@@ -80,7 +71,7 @@
         void GetRecInfo(std::string &recID, int &index, std::string &path);
         
     public:
-        explicit rec(ffwrapper::FormatIn *in);
+        rec();
         ~rec();
     };
 } // namespace cffmpeg_wrap
diff --git a/csrc/worker/stream.cpp b/csrc/worker/stream.cpp
new file mode 100644
index 0000000..0f8dadc
--- /dev/null
+++ b/csrc/worker/stream.cpp
@@ -0,0 +1,51 @@
+#include "stream.hpp"
+
+#include "../ffmpeg/data/CodedData.hpp"
+
+namespace cffmpeg_wrap{
+    stream::stream(const int maxSize)
+    :max_size_(maxSize)
+    {}
+
+    stream::~stream(){
+        std::lock_guard<std::mutex> locker(mutex_avpkt_);
+        list_avpkt_.clear();
+    }
+
+    int stream::SetPacket(std::shared_ptr<ffwrapper::CodedData> data){
+        if (data){
+            std::lock_guard<std::mutex> locker(mutex_avpkt_);
+            list_avpkt_.push_back(data);
+            
+            while(list_avpkt_.size() > max_size_){
+                list_avpkt_.pop_front();
+                while(!list_avpkt_.empty()){
+                    auto &cache = list_avpkt_.front();
+                    AVPacket &avpkt = cache->getAVPacket();
+                    if (!(avpkt.flags & AV_PKT_FLAG_KEY)){
+                        list_avpkt_.pop_front();
+                    }else{
+                        break;
+                    }
+                }
+            }
+            return list_avpkt_.size();
+        }
+        return 0;
+    }
+
+    void stream::GetPacket(unsigned char **pktData, int *size, int *key){
+        std::lock_guard<std::mutex> l(mutex_avpkt_);
+        if(list_avpkt_.empty()){
+            return;
+        }
+        auto data = list_avpkt_.front();
+        auto pkt = data->getAVPacket();
+        *key = pkt.flags & AV_PKT_FLAG_KEY;
+        *size = pkt.size;
+        *pktData = (unsigned char *)malloc(*size);
+        memcpy(*pktData, pkt.data, pkt.size);
+
+        list_avpkt_.pop_front();
+    }
+}
\ No newline at end of file
diff --git a/csrc/stream.hpp b/csrc/worker/stream.hpp
similarity index 87%
rename from csrc/stream.hpp
rename to csrc/worker/stream.hpp
index 5cbc44a..6c87ff6 100644
--- a/csrc/stream.hpp
+++ b/csrc/worker/stream.hpp
@@ -15,9 +15,9 @@
     private:
         std::list<std::shared_ptr<ffwrapper::CodedData> > list_avpkt_;
         std::mutex mutex_avpkt_;
-
+        const int max_size_;
     public:
-        stream(/* args */);
+        explicit stream(const int maxSize);
         ~stream();
 
         int SetPacket(std::shared_ptr<ffwrapper::CodedData> data);
diff --git a/csrc/wrapper.cpp b/csrc/wrapper.cpp
index 9fc2e25..63bb661 100644
--- a/csrc/wrapper.cpp
+++ b/csrc/wrapper.cpp
@@ -21,12 +21,20 @@
 
 #include "buz/recorder.hpp"
 
-#include "stream.hpp"
-#include "decoder.hpp"
-#include "rec.hpp"
+#include "worker/stream.hpp"
+#include "worker/decoder.hpp"
+#include "worker/rec.hpp"
 
 using namespace logif;
 using namespace ffwrapper;
+
+#define DELETE_POINTER(p) \
+do \
+{ \
+if(NULL != p) \
+delete p; \
+p = NULL; \
+}while(0)
 
 namespace cffmpeg_wrap{
     using namespace buz;
@@ -38,12 +46,11 @@
     ,scale_f_(SWS_POINT)
     ,gb_(0)
     ,cpu_(0)
-    ,use_decoder_(false)
     ,thread_(nullptr)
     ,stop_stream_(false)
     ,stream_(nullptr)
     ,decoder_(nullptr)
-    ,rec_(nullptr)
+    ,rec_(new rec)
     {
         makeTheWorld();
     }
@@ -57,6 +64,7 @@
                 stop_stream_.store(true);
                 thread_->join();
             }
+            DELETE_POINTER(rec_);
         }
         catch(const std::exception& e)
         {
@@ -70,11 +78,11 @@
         scale_h_ = h;
     }
 
-    void Wrapper::UseGB28181(){
+    void Wrapper::GB28181(){
         gb_ = 1;
     }
 
-    void Wrapper::UseCPU(){
+    void Wrapper::CPUDec(){
         cpu_ = 1;
     }
 
@@ -123,19 +131,24 @@
         return 0;
     }
 
-    void Wrapper::init_stream(){
-        if (stream_) delete stream_;
-        stream_ = new stream;
+    void Wrapper::init_worker(ffwrapper::FormatIn *in){
+        if (rec_->Loaded() && stream_ && decoder_) return;
+        stream_ = new stream(3 * 25);
+        decoder_ = new decoder(in, scale_w_, scale_h_, scale_f_);
+        rec_->Load(in);
+        if(fn_rec_lazy_) fn_rec_lazy_(in);
+    }
+    
+    void Wrapper::run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id){
+        if (stream_) stream_->SetPacket(data);
+        if (decoder_) decoder_->SetFrame(data, id);
+        if (rec_->Loaded()) rec_->SetPacket(data, id);
     }
 
-    void Wrapper::init_decoder(ffwrapper::FormatIn *in){
-        if (decoder_) delete decoder_;
-        decoder_ = new decoder(in, scale_w_, scale_h_,scale_f_);
-    }
-
-    void Wrapper::init_rec(ffwrapper::FormatIn *in){
-        if (rec_) delete rec_;
-        rec_ = new rec(in);
+    void Wrapper::deinit_worker(){
+        DELETE_POINTER(stream_);
+        DELETE_POINTER(decoder_);
+        rec_->Unload();
     }
 
     void Wrapper::run_stream_thread(){
@@ -148,11 +161,8 @@
                 usleep(200000);
                 continue;
             }
-            init_stream();
-            if (use_decoder_){
-                init_decoder(in.get());
-            }
-            init_rec(in.get());
+            
+            init_worker(in.get());
 
             int64_t id = 0;
             while(!stop_stream_.load()){
@@ -161,24 +171,28 @@
                     logIt("read packet error");
                     break;
     	        }
-                if (stream_) stream_->SetPacket(data);
-                if (use_decoder_ && decoder_) decoder_->SetFrame(data, id);
-                if (rec_) rec_->SetPacket(data, id);
-
+                
+                run_worker(data, id);
                 id++;
             }
+
+            deinit_worker();
         }
     }
 
     void Wrapper::BuildRecorder(const char* id, const char *output, const int mindur, const int maxdur){
         
-        if (rec_){
+        if (rec_->Loaded()){
             rec_->NewRec(id, output, mindur, maxdur);
+        }else{
+            std::string rid(id), dir(output);
+            fn_rec_lazy_ = 
+            [=](ffwrapper::FormatIn *in){rec_->NewRec(rid.c_str(), dir.c_str(), mindur, maxdur);};
         }
     }
 
     int Wrapper::FireRecorder(const char* sid,const int64_t &id){
-        if (rec_){
+        if (rec_->Loaded()){
             rec_->FireRecSignal(sid, id);
         }
     }
@@ -190,7 +204,7 @@
     }
     ////////decoder
     void Wrapper::BuildDecoder(){
-        use_decoder_ = true;
+        // use_decoder_ = true;
     }
 
     void Wrapper::GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id){
diff --git a/csrc/wrapper.hpp b/csrc/wrapper.hpp
index d398ed4..044b4ba 100644
--- a/csrc/wrapper.hpp
+++ b/csrc/wrapper.hpp
@@ -12,7 +12,6 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
-#include <unordered_map>
 #include <memory>
 #include "common/callback.hpp"
 
@@ -31,53 +30,49 @@
     class rec;
 
     class Wrapper{
-        public:
-            Wrapper();
-            ~Wrapper ();
+    public:
+        Wrapper();
+        ~Wrapper ();
+    private: 
+        std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
 
-        private: 
-            std::unique_ptr<ffwrapper::FormatIn> init_reader(const char* input);
-            void init_stream();
-            void init_decoder(ffwrapper::FormatIn *in);
-            void init_rec(ffwrapper::FormatIn *in);
-            
-        public: 
-            int RunStream(const char* input);
-        private: 
-            void run_stream_thread();
+        void init_worker(ffwrapper::FormatIn *in);
+        void run_worker(std::shared_ptr<ffwrapper::CodedData> data, int64_t &id);
+        void deinit_worker();
+    public: 
+        int RunStream(const char* input);
+    private: 
+        void run_stream_thread();
+    public: //recorder
+        void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd);
+        int FireRecorder(const char* sid,const int64_t &id);
+        void GetInfoRecorder(std::string &recID, int &index, std::string &path);
+        void ScalePicture(const int w, const int h, const int flags);
+        void GB28181();
+        void CPUDec();
+    public: //decoder
+        void BuildDecoder();
+        void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id);
+    public: // push stream
+        void GetPacket(unsigned char **pktData, int *size, int *key);
+    private:
+        // stream 鍙傛暟
+        std::string input_url_;
+        int scale_w_, scale_h_, scale_f_;
 
-        public: //recorder
-            void BuildRecorder(const char* id,const char *dir, const int mind, const int maxd);
-            int FireRecorder(const char* sid,const int64_t &id);
-            void GetInfoRecorder(std::string &recID, int &index, std::string &path);
-
-            void ScalePicture(const int w, const int h, const int flags);
-            void UseGB28181();
-            void UseCPU();
-        public: //decoder
-            void BuildDecoder();
-            void GetPicDecoder(unsigned char **data, int *w, int *h, int64_t *id);
-            void GetPacket(unsigned char **pktData, int *size, int *key);
-
-        private:
-            // stream 鍙傛暟
-            std::string input_url_;
-            int scale_w_, scale_h_, scale_f_;
-            int gb_, cpu_;
-            bool use_decoder_;
-
-            // decoder 鍙傛暟
-            std::unique_ptr<std::thread> thread_;
-            std::atomic_bool    stop_stream_;
-
-            // 涓氬姟绫�
-            // 鎺ㄦ祦绫�
-            stream* stream_;
-            // 瑙g爜绫�
-            decoder* decoder_;
-            // 褰曞儚绫�
-            rec* rec_;
-
+        int gb_, cpu_;
+        // decoder 鍙傛暟
+        std::unique_ptr<std::thread> thread_;
+        std::atomic_bool    stop_stream_;
+        // 涓氬姟绫�
+        // 鎺ㄦ祦绫�
+        stream* stream_;
+        // 瑙g爜绫�
+        decoder* decoder_;
+        // 褰曞儚绫�,涓�鐩村瓨鍦�
+        rec* rec_;
+        // 褰曞儚璇锋眰缂撳瓨,绛夊緟runstream鍚庢坊鍔�
+        std::function<void(ffwrapper::FormatIn*)> fn_rec_lazy_;
     };
 
     uint8_t *DecodeJPEG(const char *file, int *w, int *h);

--
Gitblit v1.8.0