#include #include #include #include #include #include #include "librtsp.h" #include #include using namespace std; template class MyQueue { public: MyQueue():mtx(PTHREAD_MUTEX_INITIALIZER){ pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&cond, &attr); pthread_condattr_destroy(&attr); } ~MyQueue() { pthread_cond_destroy(&cond); } public: void push(T v) { pthread_mutex_lock(&mtx); q.push_back(v); pthread_cond_signal(&cond); pthread_mutex_unlock(&mtx); } //向队列的前面插入元素 void push_front_one(T v) { pthread_mutex_lock(&mtx); q.push_front(v); pthread_cond_signal(&cond); pthread_mutex_unlock(&mtx); } T pop() { struct timespec to; clock_gettime(CLOCK_MONOTONIC, &to); static uint64_t waitMS = 620; // wait uint64_t sec = waitMS / 1000; uint64_t nsec = (waitMS % 1000) * 1e6; to.tv_sec = to.tv_sec + sec; nsec += to.tv_nsec; sec = nsec / 1000000000; nsec = nsec % 1000000000; to.tv_sec += sec; to.tv_nsec = nsec; // printf("======>>wait stream data\n"); pthread_mutex_lock(&mtx); while(q.empty()){ if(pthread_cond_timedwait(&cond, &mtx, &to) == ETIMEDOUT){ printf("======>>timeout quit\n"); break; } } // printf("======>>queue size %lu\n", q.size()); if (q.empty()) { pthread_mutex_unlock(&mtx); return 0; } T value = q.front(); q.pop_front(); pthread_mutex_unlock(&mtx); return value; } T popNotWait() { pthread_mutex_lock(&mtx); T value = q.front(); q.pop_front(); pthread_mutex_unlock(&mtx); return value; } int count_queue() { return q.size(); } T clearAll(){ pthread_mutex_lock(&mtx); while (!q.empty()) q.pop_front(); pthread_mutex_unlock(&mtx); } template void clearAll(F&& fn){ pthread_mutex_lock(&mtx); while (!q.empty()){ T value = q.front(); fn(value); q.pop_front(); } pthread_mutex_unlock(&mtx); } private: deque q; pthread_mutex_t mtx; pthread_cond_t cond; }; typedef struct _buffInfo { unsigned char *buff; int buffLen; } frameBuffInfo; typedef enum { E_VIDEO_STREAM_NONE = -1, E_VIDEO_STREAM_H264 = 0, E_VIDEO_STREAM_MPEG2 = 1, // MPEG4 E_VIDEO_STREAM_MPEG4 = 2, // MPEG4 E_VIDEO_STREAM_SVAC = 3, // SVAC E_VIDEO_STREAM_3GP = 4, // 3GP E_VIDEO_STREAM_H265 = 5, //H265 }VideoStreamType_E; class GB28181API{ public: static int capturePic(void *opaque, char *buf, int *bufsize, const int tt) { GB28181API *_this = (GB28181API *) opaque; int len = 0; *bufsize = 0; int ttt = 0; do { if (ttt > tt) return 0; ttt++; //从缓存中获取buffinfo if (_this->m_rtpQueue.count_queue() == 0) { // printf(" count_queue == 0 \n"); usleep(200000); continue; } frameBuffInfo *buffinfo = _this->m_rtpQueue.pop(); if (buffinfo == nullptr) { printf(" buffinfo == nullptr \n"); return 0; } //////////////////////////////////////////////////////// FILE* fpJpg = NULL; char fileJpgName[32] = "./tmpCaptureJpg.jpg"; char fileIFrameName[32] = "./tmpCaptureX264IFrame"; char cmd[512] = {0}; for(int i = 0; i < 10 * 25; i++){ if (!buffinfo){ buffinfo = _this->m_rtpQueue.pop(); } if (!buffinfo) continue; auto fpIframe = fopen(fileIFrameName, "wb+"); fwrite(buffinfo->buff, buffinfo->buffLen, 1, fpIframe); fflush(fpIframe); fclose(fpIframe); memset(cmd, 0, 512); sprintf(cmd, "ffmpeg -i %s -y -f image2 -ss 00:00:00 -vframes 1 %s >/dev/null", fileIFrameName, fileJpgName); int rr = system(cmd); delete[] buffinfo->buff; delete buffinfo; buffinfo = nullptr; fpJpg = fopen(fileJpgName, "rb"); if (fpJpg) { break; } } /////////////////////////////////////////////////////////// fseek(fpJpg, 0, SEEK_END); len = ftell(fpJpg); fseek(fpJpg, 0, SEEK_SET); *bufsize = fread(buf, sizeof(char), len, fpJpg); fclose(fpJpg); memset(cmd, 0, 128); sprintf(cmd, "rm %s %s >/dev/null", fileIFrameName, fileJpgName); system(cmd); } while (*bufsize == 0); return *bufsize; } GB28181API(/*string rtspUrl*/){ // handle = addCamera(rtspUrl); } ~GB28181API(){ printf("GB28181API end!\n"); // m_rtpQueue.clearAll(); m_rtpQueue.clearAll([](frameBuffInfo *info){ if (info){ delete[] info->buff; delete info; } }); deleteCamera(); } static const int keep_queue_count = 126; bool pushInfo(unsigned char *data, int datalen) { while(m_rtpQueue.count_queue() > keep_queue_count){ auto p = m_rtpQueue.popNotWait(); if (p){ delete[] p->buff; delete p; } } frameBuffInfo *info = new frameBuffInfo(); info->buff = new unsigned char[datalen]; info->buffLen = datalen; memcpy(info->buff, data, datalen); //printf(" m_rtpQueue.push befores "); m_rtpQueue.push(info); //printf(" m_rtpQueue.push after "); return true; } static int readData(void *opaque, unsigned char *buf, int bufsize) { GB28181API *_this = (GB28181API *)opaque; int len = bufsize; int diff = 0; do { // printf(" m_rtpQueue.pop before \n"); //从缓存中获取buffinfo frameBuffInfo *buffinfo = _this->m_rtpQueue.pop(); // printf(" m_rtpQueue.pop after \n"); if(buffinfo != nullptr){ diff = len - buffinfo->buffLen; }else{ return 0; } //帧长大于bufsize if (diff < 0) { // printf("/帧长大于bufsize:%d\n", diff); memcpy(buf + bufsize - len, buffinfo->buff, len); frameBuffInfo *info = new frameBuffInfo(); info->buffLen = buffinfo->buffLen - len; info->buff = new unsigned char[buffinfo->buffLen - len]{}; memcpy(info->buff, buffinfo->buff + len, buffinfo->buffLen - len); while(_this->m_rtpQueue.count_queue() > keep_queue_count){ auto p = _this->m_rtpQueue.popNotWait(); if (p){ delete[] p->buff; delete p; } } // printf("/帧长大于info->buffLen:%d\n", info->buffLen); _this->m_rtpQueue.push_front_one(info); // printf("/帧长大于info->buffLen\n"); } else if (diff == 0) { // printf("/帧长等于bufsize:%d\n", diff); memcpy(buf + bufsize - len, buffinfo->buff, buffinfo->buffLen); } else if (diff > 0) { // printf("/帧长小于bufsize:%d\n", diff); memcpy(buf + bufsize - len, buffinfo->buff, buffinfo->buffLen); len = len - buffinfo->buffLen; //还需要填充的大小 memset(buf + bufsize - len, 0, len); //不等待填充,直接进行解码 diff = 0; } delete[] buffinfo->buff; delete buffinfo; // printf("/帧长大于info->buffLen1\n"); } while (diff > 0); return bufsize; } static void streamCallBack(int datatype, int frametype, unsigned char *data, unsigned int datalen, long userdata) { GB28181API *_this = (GB28181API *)userdata; static bool startFlag = false; if(frametype == GB_VIDEO_FRAME_I){ startFlag = true; } // printf("streamCallBack recv data len %d frametype %d\n", datalen, startFlag); if (_this->datatype_ < 0) _this->datatype_ = datatype; if((data != NULL) && (startFlag == true)){ _this->pushInfo(data, datalen); } } long addCamera(string &rtsp){ int count = 0; while (handle < 0 && count <= 3) { count ++; handle = RTSPSTREAM_Open(rtsp.c_str(), streamCallBack, (long) this); printf("RTSPSTREAM_Open, handle:%ld \n", handle); usleep(20000); } return handle; } void deleteCamera(){ printf("RTSPSTREAM_Close\n"); if(handle > -1){ RTSPSTREAM_Close(handle); } handle = -1; } const int getDataType(){return datatype_;} private: int datatype_ = -1; MyQueue m_rtpQueue; long handle = -1; };