QiaoJiaSystem/DataManagerServer/vss/dao/BaseDao.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/FFmpegDecoderJPG.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/FFmpegDecoderJPG.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/GB28181Server.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/GB28181Server.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/SpinLock.hpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/VideoCaptureElementWithRtp.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/GB28181DecoderModel/VideoCaptureElementWithRtp.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/VideoToImageMulth/RtspAnalysManager.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/VideoToImageMulth/RtspAnalysManager.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
syncDBTool/ErlangDbTool.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
QiaoJiaSystem/DataManagerServer/vss/dao/BaseDao.h
@@ -16,9 +16,9 @@ public: static QMutex m_mutexVisit; private: static mysqlpp::Connection* conn; static mysqlpp::Connection *conn; public: static mysqlpp::SimpleResult add(std::map<std::string, std::string>& columns, string tableName) { static mysqlpp::SimpleResult add(std::map<std::string, std::string> &columns, string tableName) { initConnection(); mysqlpp::SimpleResult simpleResult; try { @@ -32,17 +32,20 @@ } return simpleResult; } static bool del(string tableName, std::map<std::string, std::string>& whereColumns) { static bool del(string tableName, std::map<std::string, std::string> &whereColumns) { initConnection(); string sql = "DELETE FROM " + tableName + " where 1=1 " + getWhereColumnNameValuePair(whereColumns); mysqlpp::Query query = conn->query(sql); bool ret = query.exec(); if (!ret) { cout << "error " <<query.error() << endl; cout << "error " << query.error() << endl; } return ret; } static bool update(std::map<std::string, std::string>& columns, string tableName, std::map<std::string, std::string>& whereColumns) { static bool update(std::map<std::string, std::string> &columns, string tableName, std::map<std::string, std::string> &whereColumns) { string updateSql = getUpdateSql(columns, tableName, whereColumns); initConnection(); mysqlpp::Query query = conn->query(updateSql); @@ -62,7 +65,7 @@ map<string, string> rowData; mysqlpp::Row row = *it; auto field_list = row.field_list(); const mysqlpp::FieldNames* fieldNames = field_list.list; const mysqlpp::FieldNames *fieldNames = field_list.list; for (int i = 0; i < fieldNames->size(); i++) { string columnValue; row[i].to_string(columnValue); @@ -77,15 +80,17 @@ return rowDataList; } static Json::Value findJsonArray(string sql, std::map<std::string, std::string>& whereColumns) { static Json::Value findJsonArray(string sql, std::map<std::string, std::string> &whereColumns) { sql = sql + getWhereColumnNameValuePair(whereColumns); initConnection(); // conn->query("SET character_set_server = utf8;"); mysqlpp::Query query = conn->query(sql); std::cout << sql << std::endl; Json::Value rowList; if (auto res = query.store()) { for (auto it = res.begin(); it != res.end(); ++it) { Json::Value row; const mysqlpp::FieldNames* fieldNames = it->field_list().list; const mysqlpp::FieldNames *fieldNames = it->field_list().list; for (int i = 0; i < fieldNames->size(); i++) { string columnValue; (*it)[i].to_string(columnValue); @@ -93,12 +98,15 @@ columnValue = ""; } string columnName = fieldNames[0].at(i); // if (columnName == "Nickname") { // columnValue = columnValue;//.substr(0, 20); // } row[columnName] = columnValue; } rowList.append(row); } } else { cout << "error " <<query.error() << endl; cout << "error " << query.error() << endl; cout << "query failed" << endl; } return rowList; @@ -109,12 +117,13 @@ mysqlpp::Query query = conn->query(sql); bool ret = query.exec(); if (!ret) { cout << "error " <<query.error() << endl; cout << "error " << query.error() << endl; } return ret; } static string getInsertSql(std::map<std::string, std::string>& columns, string tableName) { string insertSql = "INSERT INTO "+tableName+" (" static string getInsertSql(std::map<std::string, std::string> &columns, string tableName) { string insertSql = "INSERT INTO " + tableName + " (" + getColumnNames(columns) + " ) values ( " + getColumnValues(columns) + @@ -122,17 +131,20 @@ cout << "insertSql " << insertSql << endl; return insertSql; } static string getUpdateSql(std::map<std::string, std::string>& columns, string tableName, std::map<std::string, std::string>& whereColumns) { string updateSql = "update "+tableName+" set " static string getUpdateSql(std::map<std::string, std::string> &columns, string tableName, std::map<std::string, std::string> &whereColumns) { string updateSql = "update " + tableName + " set " + getColumnNameValuePair(columns) + " where 1=1 "+ getWhereColumnNameValuePair(whereColumns) ; " where 1=1 " + getWhereColumnNameValuePair(whereColumns); cout << "updateSql " << updateSql << endl; return updateSql; } static void doConnect() { cout << "db_host=" << appConfig.getStringProperty("db_host").c_str() << endl; conn->set_option(new mysqlpp::SetCharsetNameOption("utf8")); if (conn->connect( appConfig.getStringProperty("database").c_str(), appConfig.getStringProperty("db_host").c_str(), @@ -148,22 +160,26 @@ // 3306 // )) { cout << "connect success" << endl; mysqlpp::Query query = conn->query("SET NAMES UTF8-"); } else { cout << "connect failed" << endl; } } static void initConnection() { static bool inited = false; if (!inited) { inited = true; conn = new mysqlpp::Connection(false); doConnect(); } if (!conn->connected() || !conn->ping()) { doConnect(); } } static string getColumnNames(std::map<std::string, std::string>& columns) { static string getColumnNames(std::map<std::string, std::string> &columns) { string columnNames; auto size = columns.size(); int i = 0; @@ -176,12 +192,13 @@ } return columnNames; } static string getColumnValues(std::map<std::string, std::string>& columns) { static string getColumnValues(std::map<std::string, std::string> &columns) { string columnValues; auto size = columns.size(); int i = 0; for (auto column : columns) { columnValues.append("'"+column.second+"'"); columnValues.append("'" + column.second + "'"); if (i != columns.size() - 1) { columnValues.append(","); } @@ -189,14 +206,15 @@ } return columnValues; } static string getColumnNameValuePair(std::map<std::string, std::string>& columns) { static string getColumnNameValuePair(std::map<std::string, std::string> &columns) { string columnNameValuePair; auto size = columns.size(); int i = 0; for (auto column : columns) { columnNameValuePair.append(column.first); columnNameValuePair.append("="); columnNameValuePair.append("'"+column.second+"'"); columnNameValuePair.append("'" + column.second + "'"); if (i != size - 1) { columnNameValuePair.append(","); @@ -205,7 +223,8 @@ } return columnNameValuePair; } static string getWhereColumnNameValuePair(std::map<std::string, std::string>& columns) { static string getWhereColumnNameValuePair(std::map<std::string, std::string> &columns) { string columnNameValuePair; auto size = columns.size(); int i = 0; @@ -213,7 +232,7 @@ columnNameValuePair.append(" and "); columnNameValuePair.append(column.first); columnNameValuePair.append("="); columnNameValuePair.append("'"+column.second+"' "); columnNameValuePair.append("'" + column.second + "' "); i++; } QiaoJiaSystem/GB28181DecoderModel/FFmpegDecoderJPG.cpp
@@ -32,7 +32,8 @@ dst.data, dst.linesize); sws_freeContext(convert_ctx); DBG("m.size is " << m.size()); // DBG("m.size is " << m.size()); // LOG_IF(); return m; } @@ -44,7 +45,9 @@ } BASICGB28181::FFmpegDecoderJPG::~FFmpegDecoderJPG() { //清空队列 while (m_rtpQueue.count_queue()) { //#todo delete frameBuffInfo* m_rtpQueue.popNotWait(); } @@ -74,7 +77,7 @@ } #ifdef TestCode DBG(" m_rtpQueue.push before "); DBG(" m_rtpQueue.push befores "); #endif m_rtpQueue.push(info); #ifdef TestCode @@ -94,10 +97,11 @@ do { // DBG(" m_rtpQueue.pop before "); //从缓存中获取buffinfo frameBuffInfo *buffinfo = fFmpegDecoderJPG->m_rtpQueue.pop(); // DBG(" m_rtpQueue.pop after "); diff = len - buffinfo->buffLen; // printf("bufsize is :%ld,len is :%ld, datalen:%d \n", bufsize, len, buffinfo->buffLen); //帧长大于bufsize if (diff < 0) { // DBG("/帧长大于bufsize" << diff); @@ -128,7 +132,7 @@ delete[] buffinfo->buff; delete buffinfo; } while (diff > 0); //#todo 触发信号 // DBG("emitSigal(\"read_dataOk\") begin"); // gSignalLock.emitSigal("read_dataOk"); fFmpegDecoderJPG->m_readData = true; @@ -149,12 +153,14 @@ // avformat_network_init(); p_this->ic = avformat_alloc_context(); //创建ffmpeg使用的内存空间 p_this->iobuffer = (unsigned char *) av_malloc(p_this->m_buf_size); if (!p_this->iobuffer) { ERR("av_malloc: err======" << p_this->m_camIdx); p_this->m_running = false; continue; } // 使用回调函数和内存块创建 AVIOContext p_this->avio = avio_alloc_context(p_this->iobuffer, p_this->m_buf_size, 0, p_this, p_this->read_data, NULL, NULL); if (!p_this->avio) { @@ -191,6 +197,12 @@ vi = i; break; } } if(vi == -1)//无视频包 { ERR("no video packet!!"); p_this->m_running = false; continue; } p_this->stream = p_this->ic->streams[vi]; p_this->video_st = p_this->stream; @@ -265,7 +277,7 @@ // BASICGB28181::avframe_to_cvmat(frame).copyTo(p_this->m_image); p_this->m_image = std::move(BASICGB28181::avframe_to_cvmat(p_this->frame)); // 将i帧保存为快照 if (p_this->m_SnapshotNotSaveRet && (p_this->pkt.flags & AV_PKT_FLAG_KEY)) { try { std::string strNewName = "./"; @@ -302,14 +314,15 @@ #ifdef TestCode DBG("emitSigal(\"DecoderImageOK\") begin"); #endif //触发信号 //触发上层信号 gSignalLock.emitSigal(p_this->m_camIdx + "DecoderImageOK"); //#ifdef TestCode // DBG("emitSigal(\"DecoderImageOK\") after"); //#endif DBG("emitSigal(\"DecoderImageOK\") after"); DBG("p_this->m_camIdx is " << p_this->m_camIdx << " queue size is " << p_this->m_rtpQueue.count_queue()); // DBG("emitSigal(\"DecoderImageOK\") after"); // DBG("p_this->m_camIdx is " << p_this->m_camIdx << " queue size is " << p_this->m_rtpQueue.count_queue()); #ifdef TestCode { @@ -346,6 +359,7 @@ } m_camIdx = camIdx; DBG("BareFlowDecoderThd camIdx : " << camIdx); // 启动解码线程 std::thread t_BareFlowDecoder(BareFlowDecoderThd, this); t_BareFlowDecoder.detach(); ); @@ -438,7 +452,7 @@ delete iter->m_packet.data; iter = m_packetsVec.erase(iter); while (!(iter->m_packet.flags & AV_PKT_FLAG_KEY)) { INFO("DropFrame: " << iter->m_frameId); // INFO("DropFrame: " << iter->m_frameId); delete iter->m_packet.data; iter = m_packetsVec.erase(iter); } QiaoJiaSystem/GB28181DecoderModel/FFmpegDecoderJPG.h
@@ -37,6 +37,7 @@ } #include <mutex> #include <basic/pipe/TimerElement.h> namespace BASICGB28181 { @@ -48,6 +49,11 @@ static void initFFmpeg(); /*** * AVframe转cvmat * @param frame * @return */ static cv::Mat avframe_to_cvmat(AVFrame *frame); typedef struct _buffInfo { @@ -56,6 +62,7 @@ std::string camIdx; } frameBuffInfo; //************录像模块使用的结构体********************// struct FrameIdPackage_s_t { int64_t m_frameId; AVPacket m_packet; @@ -82,6 +89,7 @@ RECORDING_VIDEO, STOP_RECORD, }; //********************************// // std::map<std::string, MyQueue<frameBuffInfo *> > MapMyQueue; static std::mutex g_mutex; QiaoJiaSystem/GB28181DecoderModel/GB28181Server.cpp
@@ -52,6 +52,7 @@ bool iRet = C_InitSDK(&GBServerParam, &MysqlConnParam, NULL, enventcallback); DBG("iRet is " << iRet); // 等待设备注册 sleep(90); return iRet; QiaoJiaSystem/GB28181DecoderModel/GB28181Server.h
@@ -17,6 +17,7 @@ #include <stdlib.h> #include <time.h> #include <string.h> #include <VideoToImageMulth/RtspAnalysManager.h> #include "28181SDK.h" #include "SpinLock.hpp" @@ -80,10 +81,20 @@ //打印事件回调信息 static void enventcallback(int eventtype, int eventparam, int datalen, char *data) { printf("eventtype:%d, eventparam:%d, datalen:%d, data:%s\n", eventtype, eventparam, datalen, data); if (eventtype == 2) { if (eventtype == EVENT_DEVICE_CATALOG) { // GB28181Server::bGetLoaclRes = true; } else if (eventtype == 1 && eventparam == 1) { } else if (eventtype == EVENT_REGISTER_STATUS && eventparam == 1) { C_GetResource(NULL); } else if (eventtype == EVENT_VIDEO_EXCEPTION){ if(gRtspAnalysManagerCamera) { auto cameraHandlePtr = (RtspAnalysManager *) gRtspAnalysManagerCamera; ERR("gRtspAnalysManagerCamera ADDR:" << gRtspAnalysManagerCamera); string camID(data, datalen); ERR("EVENT_VIDEO_EXCEPTION reopen camID:" << camID); cameraHandlePtr->addCamera(camID, camID); }else{ ERR("gRtspAnalysManagerCamera is nullptr"); } } } QiaoJiaSystem/GB28181DecoderModel/SpinLock.hpp
@@ -6,7 +6,9 @@ #define GB28181SDK_SPINLOCK_H #include <atomic> /*** * 自旋锁,可能会有没锁上的bug */ class SpinLock { public: SpinLock() : m_lock(ATOMIC_FLAG_INIT) {} QiaoJiaSystem/GB28181DecoderModel/VideoCaptureElementWithRtp.cpp
@@ -35,17 +35,21 @@ bool BASICGB28181::VideoCaptureElementWithRtp::startRtpStream(int streamTransType) { //等待下层ffmpeg将rtp包解码成为图片后触发信号,然后触发当前类的submit std::thread waitSignalAndEmit([&](BASICGB28181::VideoCaptureElementWithRtp *p_this) { p_this->m_waitSignal = true; //循环,由成员变量来维护这个线程的运行状态 while (p_this->m_waitSignal) { //#TODO wait test #ifdef TestCode DBG("waitSignal(\"DecoderImageOK\") begin"); #endif //等待信号触发 gSignalLock.waitSignal(p_this->m_chanPubID + "DecoderImageOK"); #ifdef TestCode DBG("waitSignal(\"DecoderImageOK\") after"); #endif /****录像模块代码*****/ p_this->m_picCount++; //几张选一张放入Redis if (p_this->m_picCount % m_nPicsPickOne != 0) { @@ -54,6 +58,7 @@ p_this->m_picCount.store(0); } // 从ffmpeg解码类中获取图片 p_this->m_fFmpegDecoderJPG.getImage().copyTo(p_this->m_image); { cv::Mat copyMat; @@ -61,6 +66,8 @@ p_this->m_image.copyTo(copyMat); m_pManager->SaveImageToRedis(p_this->m_chanPubID, imageName, copyMat); } /*********/ p_this->submit(); } INFO("waitSignalAndEmit is exit..."); @@ -69,6 +76,7 @@ TryCath( //--------------国标设备或则国标下级平台必须支持GB28181-2016---------------------------------------------- //解码线程,发起点播请求,启动ffmpeg解码模块 std::thread videoCaptureElementThd([&](VideoCaptureElementWithRtp *p_this, int streamType) { DBG("videoCaptureElementThd start..."); StreamTransType_E etype; @@ -93,28 +101,52 @@ } DBG("C_RealVideoStart start... m_chanPubID is " << p_this->m_chanPubID << " etype is " << etype << " m_userdata is " << m_userdata); //开始点播视频 long lrealhandle = C_RealVideoStart(const_cast<char *>(p_this->m_chanPubID.c_str()), etype, p_this->streamcallback, m_userdata); if (lrealhandle != -1) { //点播成功 DBG(p_this->m_chanPubID << " C_RealVideoStart ok ... type is " << etype); p_this->m_running = true; //启动ffmpeg解码模块 p_this->m_fFmpegDecoderJPG.startThd(p_this->m_chanPubID, p_this->m_fps, p_this->m_gpuIdx); usleep(1000000); //阻塞线程,等待外部触发关闭点播 while (p_this->m_running) { // if(p_this->m_fFmpegDecoderJPG.getRunning()) { if(p_this->m_fFmpegDecoderJPG.getRunning()) { usleep(300000); // }else{ // p_this->m_running = false; // break; // } } else { // 根据reopenTime判断是否需要重启 if (reopenTime < 0) { p_this->m_running = false; stop(); INFO("grabFrame faild, element stopping"); } else { //todo 业务死锁 usleep((6 - reopenTime--) * 1000000); INFO("grabFrame faild, try reopen video: "); //关闭ffmpeg解码模块 p_this->m_fFmpegDecoderJPG.stopThd(); //启动ffmpeg解码模块 p_this->m_fFmpegDecoderJPG.startThd(p_this->m_chanPubID, p_this->m_fps, p_this->m_gpuIdx); continue; } } } DBG("videoCaptureElementThd stop ..."); //停止点播 C_RealVideoStop(lrealhandle); //将waitSignalAndEmit 线程退出 p_this->m_waitSignal = false; DBG("videoCaptureElementThd stop ok..."); } else { //点播失败 p_this->m_waitSignal = false; p_this->m_running = false; //关闭ffmpeg解码模块 p_this->m_fFmpegDecoderJPG.stopThd(); ERR(p_this->m_chanPubID << " C_RealVideoStart is error lrealhandle is " << lrealhandle); } @@ -157,18 +189,21 @@ // fwrite(data, sizeof(char), datalen, fp11); } #endif //将底层组好的rtp包,存入ffmpeg中的缓存队列 CHKDBG(p_this->m_fFmpegDecoderJPG.pushInfo(data, datalen, p_this->m_chanPubID), true, "pushInfo is error !! handle is " << handle << " datatype is " << datatype << " frametype is " << frametype); } void BASICGB28181::VideoCaptureElementWithRtp::threadFunc() { if ((!m_running) || (!m_waitSignal)) { // 根据reopenTime判断是否需要重启 if (reopenTime < 0) { stop(); INFO("grabFrame faild, element stopping"); return; } else { //todo 业务死锁 usleep(reopenTime * 1000); INFO("grabFrame faild, try reopen video: "); startRtpStream(m_streamTransType); QiaoJiaSystem/GB28181DecoderModel/VideoCaptureElementWithRtp.h
@@ -17,6 +17,14 @@ class VideoCaptureElementWithRtp : public basic::PipeElement { public: /*** * VideoCaptureElementWithRtp 视频解码 * @param chanPubID 通道地(摄像机id) * @param fps * @param streamTransType 请求的视频流类型 * @param gpuIdx gpuindex * @param manager 上层类的指针 */ explicit VideoCaptureElementWithRtp(std::string &chanPubID, int fps, int streamTransType, int gpuIdx = -1, RtspAnalysManager *manager = nullptr); @@ -28,6 +36,11 @@ */ bool getRunning(); /*** * 获取图片 * @return */ cv::Mat getImage(); //保存视频接口,从RtspAnalysManager发起调用 @@ -40,7 +53,7 @@ std::string MakeDir(const std::string &timeStamp); private: int reopenTime{1000}; int reopenTime{10}; int m_gpuIdx; int m_fps; int m_streamTransType; @@ -74,7 +87,7 @@ /*** * rtp组包回调函数 * GB28181 SDK rtp组包回调函数 * @param handle * @param datatype * @param frametype QiaoJiaSystem/VideoToImageMulth/RtspAnalysManager.cpp
@@ -7,10 +7,11 @@ using std::string; void *gRtspAnalysManagerCamera = nullptr; RtspAnalysManager::RtspAnalysManager(LDBTool *_dbTool) : m_lDBTool(nullptr), m_maxCount(50), m_currentCount(0) { DBG("MYH DEBUG HERE"); m_lDBTool = new LDBTool; // INFO("MYH DEBUG HERE"); m_lDBTool = _dbTool; init(); //nsq set callback func @@ -134,7 +135,10 @@ //初始化函数 void RtspAnalysManager::init() { INFO("MYH DEBUG HERE") // INFO("MYH DEBUG HERE") gRtspAnalysManagerCamera = this; DBG("gRtspAnalysManagerCamera ADDR:" << gRtspAnalysManagerCamera); m_GB28181_Enable = appPref.getIntData("GB28181_Enable"); //#todo GB28181 @@ -267,7 +271,7 @@ //#todo end if (m_controllers_videoCapElem.find(index) == m_controllers_videoCapElem.end()) { INFO("MYH DEBUG HERE"); // INFO("MYH DEBUG HERE"); if (m_currentCount >= m_maxCount) { ERR("addCamera faild, camera's num is full!") return -1; @@ -369,7 +373,7 @@ * @return */ int RtspAnalysManager::removeAll() { INFO("MYH DEBUG HERE"); // INFO("MYH DEBUG HERE"); if (m_GB28181_Enable) { for (auto controller: m_controllers_videoCapElem) { @@ -390,7 +394,7 @@ } m_controllers.clear(); } INFO("MYH DEBUG HERE"); // INFO("MYH DEBUG HERE"); m_imgRedisCRwLock.wrlock(); for (auto controller: m_imgRedisControllers) { @@ -405,7 +409,7 @@ m_currentCount = 0; INFO("MYH DEBUG HERE"); // INFO("MYH DEBUG HERE"); return 0; } QiaoJiaSystem/VideoToImageMulth/RtspAnalysManager.h
@@ -21,7 +21,8 @@ //#include <VideoToImageMulth/rpc/RtspAnalysServer.h> //using BASICGB28181::VideoCaptureElementWithRtp; //外部使用的全局指针 extern void *gRtspAnalysManagerCamera; //用来实现recordVideo的RPC的接口类 class RtspAnalysManager : public ::RtspAnalys::RtspAnalysServer { syncDBTool/ErlangDbTool.cpp
@@ -13,6 +13,7 @@ #include <QtCore/QVariantList> #include <basic/util/app/AppPreference.hpp> using ErlangTool::map_DevDataCache; using ErlangTool::map_TabDataCache; using ErlangTool::map_BwDataCache;