#include "PL_RTSPClient.h"
#include "MaterialBuffer.h"
#include "logger.h"

// Standard headers for the names this file uses directly
// (pthread_*, fixed-width integers, snprintf, strerror, std::endl, std::string).
#include <pthread.h>

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>

/// Stream parameters handed to rtsp_client_set_param_callback().
struct RtspClientParam
{
    std::string sdp;
    std::string fmtp;
    uint16_t width;
    uint16_t height;
    uint32_t fps;
    std::string codecName;
    uint32_t bandwidth;
};

// Callbacks defined at the bottom of this file. They are declared before the
// testRTSPClient.hpp include below, which is presumably where they are invoked
// from -- TODO confirm against that header.
void rtsp_client_set_param_callback(void* arg, RtspClientParam& param);
void rtsp_client_frame_callback(void* arg, uint8_t* buffer, size_t buffSize, timeval presentationTime);
void rtsp_client_continue_callback(void* arg);

// depends: struct PL_RTSPClient_Config;
#include "live555/testProgs/testRTSPClient.hpp"

/// Private state of a PL_RTSPClient.
///
/// The two mutexes are used as a hand-off between gain() and the callbacks:
///   - frame_mutex    is locked in init() and gain(), unlocked in
///                    rtsp_client_frame_callback(), kill() and finit();
///   - continue_mutex is locked in init() and rtsp_client_continue_callback(),
///                    unlocked in gain() and finit().
///
/// NOTE(review): if the callbacks run on the live555 thread (not visible from
/// this file), the mutexes are unlocked by a thread that did not lock them,
/// which POSIX leaves undefined for default mutexes. A semaphore or condition
/// variable would express this hand-off safely -- confirm before changing.
struct RTSPClient_Internal
{
    PL_RTSPClient_Config rtspConfig;
    pthread_t live_daemon_thid;
    char eventLoopWatchVariable;   // set non-zero by finit() to stop doEventLoop()
    bool live_daemon_running;
    pthread_mutex_t* frame_mutex;
    pthread_mutex_t* continue_mutex;

    MB_Frame lastFrame;
    RtspClientParam lastParam;

    volatile bool killed;

    RTSPClient_Internal() :
        rtspConfig(), live_daemon_thid(0),
        eventLoopWatchVariable(0), live_daemon_running(false),
        frame_mutex(create_mutex()), continue_mutex(create_mutex()),
        lastFrame(), lastParam(), killed(false)
    {
    }

    ~RTSPClient_Internal()
    {
        destroy_mutex(frame_mutex);
        destroy_mutex(continue_mutex);
    }

    // The struct owns its mutexes through raw pointers; a copy would lead to a
    // double destroy/delete, so copying is disabled.
    RTSPClient_Internal(const RTSPClient_Internal&) = delete;
    RTSPClient_Internal& operator=(const RTSPClient_Internal&) = delete;

    /// Returns every member except `killed` to its freshly-constructed state
    /// and replaces both mutexes with new, unlocked ones.
    ///
    /// NOTE(review): `killed` is not cleared here, so once kill() has been
    /// called gain() keeps returning false even after finit()/init(). Left
    /// unchanged because callers may rely on it -- confirm the intent.
    void reset()
    {
        // Value-initialized temporaries: assigning from a default-initialized
        // local would copy indeterminate width/height/fps/bandwidth values.
        rtspConfig = PL_RTSPClient_Config();
        live_daemon_thid = 0;
        eventLoopWatchVariable = 0;
        live_daemon_running = false;

        destroy_mutex(frame_mutex);
        frame_mutex = create_mutex();

        destroy_mutex(continue_mutex);
        continue_mutex = create_mutex();

        lastFrame = MB_Frame();
        lastParam = RtspClientParam();
    }

private:
    /// Allocates a mutex and initializes it with default attributes.
    static pthread_mutex_t* create_mutex()
    {
        pthread_mutex_t* mutex = new pthread_mutex_t;
        pthread_mutex_init(mutex, nullptr);
        return mutex;
    }

    /// Destroys and frees `mutex` (if any) and nulls the pointer.
    static void destroy_mutex(pthread_mutex_t*& mutex)
    {
        if (mutex == nullptr)
            return;

        pthread_mutex_destroy(mutex);
        delete mutex;
        mutex = nullptr;
    }
};

/// Thread entry point started by PL_RTSPClient::init().
///
/// Creates a scheduler and usage environment, opens the configured URL and
/// runs the event loop until eventLoopWatchVariable becomes non-zero.
///
/// @param arg  the RTSPClient_Internal of the owning PL_RTSPClient.
/// @return always nullptr.
static void* live_daemon_thd(void* arg)
{
    RTSPClient_Internal* in = static_cast<RTSPClient_Internal*>(arg);

    TaskScheduler* scheduler = BasicTaskScheduler::createNew();
    UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

    usage(*env, in->rtspConfig.progName.c_str());

    openURL(*env, in->rtspConfig);

    in->live_daemon_running = true;
    env->taskScheduler().doEventLoop(&(in->eventLoopWatchVariable));
    in->live_daemon_running = false;

    // A void* thread function must return a value; flowing off the end is
    // undefined behaviour.
    return nullptr;
}

/// Factory: returns a newly allocated PL_RTSPClient.
PipeLineElem* create_PL_RTSPClient()
{
    return new PL_RTSPClient;
}

PL_RTSPClient::PL_RTSPClient() : internal(new RTSPClient_Internal)
{
}

PL_RTSPClient::~PL_RTSPClient()
{
    delete (RTSPClient_Internal*)internal;
    internal = nullptr;
}

/// Resets the element, stores a copy of the configuration, locks both
/// hand-off mutexes and starts the live555 thread.
///
/// @param args  pointer to a PL_RTSPClient_Config; must not be null.
/// @return true when the thread was started; false on a null argument or
///         when a mutex lock / pthread_create fails (the error is logged).
bool PL_RTSPClient::init(void* args)
{
    if (args == nullptr)
        return false;

    const PL_RTSPClient_Config* config = static_cast<const PL_RTSPClient_Config*>(args);

    RTSPClient_Internal* in = (RTSPClient_Internal*)internal;
    in->reset();
    in->rtspConfig = *config;
    in->rtspConfig.args = this;   // passed back as `arg` to the callbacks below

    // Both mutexes start out locked so that gain() blocks until the first
    // frame callback unlocks frame_mutex.
    int ret = pthread_mutex_lock(in->frame_mutex);
    if (ret != 0)
    {
        LOGP(ERROR, "pthread_mutex_lock frame_mutex: %s\n", strerror(ret));
        return false;
    }

    ret = pthread_mutex_lock(in->continue_mutex);
    if (ret != 0)
    {
        LOGP(ERROR, "pthread_mutex_lock continue_mutex: %s\n", strerror(ret));
        return false;
    }

    ret = pthread_create(&(in->live_daemon_thid), nullptr, live_daemon_thd, in);
    if (ret != 0)
    {
        LOGP(ERROR, "pthread_create: %s\n", strerror(ret));
        return false;
    }

    return true;
}

/// Asks the event loop to stop, releases both mutexes, joins the live555
/// thread and resets the internal state.
void PL_RTSPClient::finit()
{
    RTSPClient_Internal* in = (RTSPClient_Internal*)internal;

    in->eventLoopWatchVariable = 1;

    // Release anything that may be blocked on the hand-off mutexes before joining.
    pthread_mutex_unlock(in->continue_mutex);
    pthread_mutex_unlock(in->frame_mutex);

    pthread_join(in->live_daemon_thid, nullptr);
    in->reset();
}

/// This element consumes nothing; `pm` is ignored.
///
/// @return whether the live555 event loop is currently running.
bool PL_RTSPClient::pay(const PipeMaterial& pm)
{
    RTSPClient_Internal* in = (RTSPClient_Internal*)internal;
    return in->live_daemon_running;
}

/// Releases continue_mutex, then blocks on frame_mutex until it is unlocked
/// (by the frame callback, kill() or finit()) and publishes the last frame.
///
/// @param pm  filled with type PMT_FRAME, a pointer to the internal lastFrame
///            (not a copy), buffSize 0 and `this` as former.
/// @return true when `pm` was filled; false when a mutex call fails or the
///         element has been killed.
bool PL_RTSPClient::gain(PipeMaterial& pm)
{
    RTSPClient_Internal* in = (RTSPClient_Internal*)internal;

    int ret = pthread_mutex_unlock(in->continue_mutex);
    if (ret != 0)
    {
        LOGP(ERROR, "pthread_mutex_unlock continue_mutex: %s\n", strerror(ret));
        return false;
    }

    if (in->killed)
    {
        LOGP(WARN, "killed 1");
        return false;
    }

    ret = pthread_mutex_lock(in->frame_mutex);
    if (ret != 0)
    {
        LOGP(ERROR, "pthread_mutex_lock: %s\n", strerror(ret));
        return false;
    }

    // Checked again: kill() unlocks frame_mutex to wake this call up.
    if (in->killed)
    {
        LOGP(WARN, "killed 2");
        return false;
    }

    pm.type = PipeMaterial::PMT_FRAME;
    pm.buffer = &(in->lastFrame);
    pm.buffSize = 0;
    pm.former = this;

    // Debug aid (disabled): dump received H.264 NALUs to a file.
    // if(nullptr!=pm.buffer)
    // {
    //     MB_Frame* frame = (MB_Frame*)pm.buffer;
    //     if (frame->type == MB_Frame::MBFT_H264_NALU)
    //     {
    //         static FILE *pFile = fopen("/data/bb1.264", "wb");
    //         fwrite(frame->buffer, sizeof(char), frame->buffSize, pFile);
    //         fflush(pFile);
    //     }
    // }

    return true;
}

/// Marks the element as killed and unlocks frame_mutex so that a gain() call
/// blocked on it returns false.
void PL_RTSPClient::kill()
{
    RTSPClient_Internal* in = (RTSPClient_Internal*)internal;
    in->killed = true;
    pthread_mutex_unlock(in->frame_mutex);
}

/// Stores the stream parameters and, when the element has a manager, publishes
/// them as string parameters.
///
/// When `param.fmtp` contains a comma, the text before the first comma is
/// published as PLGP_DEC_SPS_B64 and the text after it as PLGP_DEC_PPS_B64.
///
/// @param arg    the PL_RTSPClient registered in init(); ignored when null.
/// @param param  parameters to store and publish.
void rtsp_client_set_param_callback(void* arg, RtspClientParam& param)
{
    if (arg == nullptr)
        return;

    PL_RTSPClient* client = static_cast<PL_RTSPClient*>(arg);
    RTSPClient_Internal* in = (RTSPClient_Internal*)(client->internal);
    in->lastParam = param;

    if (client->manager == nullptr)
        return;

    client->manager->set_param(PLGP_RTSP_SDP, param.sdp);
    client->manager->set_param(PLGP_RTSP_FMTP, param.fmtp);

    // Bounded formatting: snprintf cannot write past the end of `tmp`.
    char tmp[50];

    snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned>(param.width));
    client->manager->set_param(PLGP_RTSP_WIDTH, std::string(tmp));

    snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned>(param.height));
    client->manager->set_param(PLGP_RTSP_HEIGHT, std::string(tmp));

    snprintf(tmp, sizeof(tmp), "%u", static_cast<unsigned>(param.fps));
    client->manager->set_param(PLGP_RTSP_FPS, std::string(tmp));

    // Split fmtp at the first comma into the base64 SPS and PPS parts.
    const size_t comma = param.fmtp.find_first_of(',');
    if (comma != std::string::npos)
    {
        const std::string base64_sps = param.fmtp.substr(0, comma);
        const std::string base64_pps = param.fmtp.substr(comma + 1);
        client->manager->set_param(PLGP_DEC_SPS_B64, base64_sps);
        client->manager->set_param(PLGP_DEC_PPS_B64, base64_pps);
    }

    // Debug aid (disabled): parse fmtp and dump the SPS/PPS bytes.
    /*
    std::string fmtp(client->manager->get_param(PLGP_RTSP_FMTP));
    if (fmtp.empty())
        return ;

    uint32_t numSPropRecords = 0;
    SPropRecord *p_record = parseSPropParameterSets(fmtp.c_str(), numSPropRecords);
    if (numSPropRecords < 2)
    {
        LOG_WARN << "numSPropRecords < 2" << std::endl;
        return ;
    }

    SPropRecord &sps = p_record[0];
    SPropRecord &pps = p_record[1];

    LOG_INFO << "sps.sPropLength" << sps.sPropLength << LOG_ENDL;
    for (int i = 0; i < sps.sPropLength; i++)
        LOGP(INFO, "0x%02X ", (int)sps.sPropBytes[i]);

    LOG_INFO << "pps.sPropLength" << pps.sPropLength << LOG_ENDL;
    for (int i = 0; i < pps.sPropLength; i++)
        LOGP(INFO, "0x%02X ", (int)pps.sPropBytes[i]);
    */
}

/// Records the delivered frame as lastFrame and unlocks frame_mutex so that a
/// waiting gain() can return it.
///
/// Only the pointer is stored; the frame data is not copied, so `buffer` must
/// stay valid until the consumer is done with it.
///
/// @param arg               the PL_RTSPClient registered in init().
/// @param buffer            frame data; the call is ignored when null.
/// @param buffSize          size of `buffer`; the call is ignored when 0.
/// @param presentationTime  stored as the frame's pts.
void rtsp_client_frame_callback(void* arg, uint8_t* buffer, size_t buffSize, timeval presentationTime)
{
    if (arg == nullptr || buffer == nullptr || buffSize == 0)
        return;

    PL_RTSPClient* client = static_cast<PL_RTSPClient*>(arg);
    RTSPClient_Internal* in = (RTSPClient_Internal*)(client->internal);

    in->lastFrame.type = MB_Frame::MBFT_H264_NALU;
    in->lastFrame.buffer = buffer;
    in->lastFrame.buffSize = buffSize;
    in->lastFrame.width = in->lastParam.width; // TODO: known bug, width/height can be zero here
    in->lastFrame.height = in->lastParam.height;
    in->lastFrame.pts = presentationTime;

    int ret = pthread_mutex_unlock(in->frame_mutex);
    if (ret != 0)
    {
        LOG_ERROR << "pthread_mutex_unlock frame_mutex: " << strerror(ret) << std::endl;
    }
}

/// Blocks on continue_mutex until gain() (or finit()) unlocks it.
///
/// @param arg  the PL_RTSPClient registered in init(); ignored when null.
void rtsp_client_continue_callback(void* arg)
{
    if (arg == nullptr)
        return;

    PL_RTSPClient* client = static_cast<PL_RTSPClient*>(arg);
    RTSPClient_Internal* in = (RTSPClient_Internal*)(client->internal);

    int ret = pthread_mutex_lock(in->continue_mutex);
    if (ret != 0)
    {
        LOG_ERROR << "pthread_mutex_lock continue_mutex: " << strerror(ret) << std::endl;
    }
}