From 0b1de1fddd889cf2ebbe578bfad83862f5ebdf5a Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期一, 09 一月 2017 12:11:05 +0800 Subject: [PATCH] add libevent based daemon --- RtspFace/make.sh | 8 RtspFace/main_face_daemon.cpp | 280 +++++++++++++++++++++++++++++++++++ RtspFace/PipeLinePool.h | 33 ++++ RtspFace/PL_RTSPClient.cpp | 4 RtspFace/PipeLinePool.cpp | 123 +++++++++++++++ 5 files changed, 443 insertions(+), 5 deletions(-) diff --git a/RtspFace/PL_RTSPClient.cpp b/RtspFace/PL_RTSPClient.cpp index d169da2..759fc5d 100644 --- a/RtspFace/PL_RTSPClient.cpp +++ b/RtspFace/PL_RTSPClient.cpp @@ -231,7 +231,7 @@ int ret = pthread_mutex_unlock(in->frame_mutex); if(ret != 0) { - printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret)); + LOG_ERROR << "pthread_mutex_unlock frame_mutex: ", strerror(ret); } } @@ -246,6 +246,6 @@ int ret = pthread_mutex_lock(in->continue_mutex); if(ret != 0) { - printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret)); + printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret));//#todo } } diff --git a/RtspFace/PipeLinePool.cpp b/RtspFace/PipeLinePool.cpp new file mode 100644 index 0000000..3b01327 --- /dev/null +++ b/RtspFace/PipeLinePool.cpp @@ -0,0 +1,123 @@ +#include "PipeLinePool.h" +#include "logger.h" +#include <pthread.h> + +#define PLP_MUTEX_LOCK(mut,_ret) if (mut != nullptr) {\ + int ret = pthread_mutex_lock((pthread_mutex_t*)mut); \ + if(ret != 0) \ + { \ + LOG_ERROR << "pthread_mutex_lock " << #mut << ": " << ret; \ + return _ret; \ + } \ +} + +#define PLP_MUTEX_UNLOCK(mut,_ret) if (mut != nullptr) {\ + int ret = pthread_mutex_unlock((pthread_mutex_t*)mut); \ + if(ret != 0) \ + { \ + LOG_ERROR << "pthread_mutex_unlock " << #mut << ": " << ret; \ + return _ret; \ + } \ +} + +PipeLinePool::PipeLinePool(bool _multithread_safe) : + multithread_safe(_multithread_safe), tsafe_mutex(nullptr), pl_mutex(nullptr), + pipelines(), pipelines_free() +{ + if (multithread_safe) + { + tsafe_mutex = new pthread_mutex_t; + pthread_mutex_init((pthread_mutex_t*)tsafe_mutex, NULL); + + pl_mutex = new pthread_mutex_t; + pthread_mutex_init((pthread_mutex_t*)pl_mutex, NULL); + + PLP_MUTEX_LOCK(pl_mutex,); + } +} + +PipeLinePool::~PipeLinePool() +{ + if (multithread_safe) + { + PLP_MUTEX_UNLOCK(pl_mutex,); + } + + pthread_mutex_destroy((pthread_mutex_t*)tsafe_mutex); + delete (pthread_mutex_t*)tsafe_mutex; + tsafe_mutex = nullptr; + + pthread_mutex_destroy((pthread_mutex_t*)pl_mutex); + delete (pthread_mutex_t*)pl_mutex; + pl_mutex = nullptr; + + pipelines_free.clear(); + + for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter) + delete *iter; + + pipelines.clear(); +} + +void PipeLinePool::manage(PipeLine* pl) +{ + if (pl == nullptr) + return; + + PLP_MUTEX_LOCK(tsafe_mutex,); + + if (pipelines.find(pl) != pipelines.end()) + return; + + pipelines.insert(pl); + pipelines_free.insert(pl); + + PLP_MUTEX_UNLOCK(tsafe_mutex,); +} + +void PipeLinePool::unmanage(PipeLine* pl) +{ + PLP_MUTEX_LOCK(tsafe_mutex,); + + pipelines.erase(pl); + pipelines_free.erase(pl); + + PLP_MUTEX_UNLOCK(tsafe_mutex,); +} + +PipeLine* PipeLinePool::get_free() +{ + if (pipelines_free.empty()) + { + PLP_MUTEX_LOCK(pl_mutex, nullptr); + } + + PLP_MUTEX_LOCK(tsafe_mutex, nullptr); + + if (pipelines_free.empty()) + return nullptr; + + pl_set_t::iterator iter = pipelines_free.begin(); + PipeLine* pl = *iter; + pipelines_free.erase(iter); + + PLP_MUTEX_UNLOCK(tsafe_mutex, nullptr); + + return pl; +} + +void PipeLinePool::release(PipeLine* pl) +{ + if (pipelines.find(pl) == pipelines.end()) + return; + if (pipelines_free.find(pl) != pipelines.end()) + return; + + PLP_MUTEX_LOCK(tsafe_mutex,); + + pipelines_free.insert(pl); + + PLP_MUTEX_UNLOCK(tsafe_mutex,); + + PLP_MUTEX_UNLOCK(pl_mutex,); +} diff --git a/RtspFace/PipeLinePool.h b/RtspFace/PipeLinePool.h new file mode 100644 index 0000000..141639c --- /dev/null +++ b/RtspFace/PipeLinePool.h @@ -0,0 +1,33 @@ +#ifndef _PIPELINEPOOL_H_ +#define _PIPELINEPOOL_H_ + +#include "PipeLine.h" +#include <set> + +class PipeLinePool +{ +public: + PipeLinePool(bool _multithread_safe = false); + ~PipeLinePool(); + + void manage(PipeLine* pl); + void unmanage(PipeLine* pl); + + PipeLine* get_free(); + void release(PipeLine* pl); + + bool wait_free(); + bool notify_free(); + +private: + bool multithread_safe; + + void* tsafe_mutex; + void* pl_mutex; + + typedef std::set<PipeLine*> pl_set_t; + pl_set_t pipelines; + pl_set_t pipelines_free; +}; + +#endif diff --git a/RtspFace/main_face_daemon.cpp b/RtspFace/main_face_daemon.cpp new file mode 100644 index 0000000..51fa8e2 --- /dev/null +++ b/RtspFace/main_face_daemon.cpp @@ -0,0 +1,280 @@ +#include "PipeLine.h" +#include "PL_RTSPClient.h" +#include "PL_RTSPServer.h" +#include "PL_H264Decoder.h" +#include "PL_H264Encoder.h" +#include "PL_AVFrameYUV420.h" +#include "PL_AVFrameBGRA.h" +#include "PL_Queue.h" +#include "PL_Scale.h" +#include "PL_Fork.h" + +#include "PL_SensetimeFaceTrack.h" + +#include "PL_DlibFaceTrack.h" + +#include "logger.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <sys/time.h> + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include <err.h> + +#include <event.h> + +#define SERVER_PORT 5432 +#define REUSEADDR_ON 1 + +// A struct for client specific data, also includes pointer to create a list of clients. +struct EVClient +{ + // The clients socket. + int fd; + + // The bufferedevent for this client. + struct bufferevent *buf_ev; + + EVClient() : fd(-1), buf_ev(nullptr) + { } +}; + +// Set a socket to non-blocking mode. +int setnonblock(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFL); + if (flags < 0) + return flags; + flags |= O_NONBLOCK; + if (fcntl(fd, F_SETFL, flags) < 0) + return -1; + + return 0; +} + +// Called by libevent when there is data to read. +void buffered_on_read(struct bufferevent *bufev, void *arg) +{ + struct EVClient *client = (struct EVClient *)arg; + + // Write back the read buffer. It is important to note that + // bufferevent_write_buffer will drain the incoming data so it + // is effectively gone after we call it. + //LOG_DEBUG << (char*)bufev->input; + //bufferevent_write_buffer(bufev, bufev->input); + + char buff[100] = {'\0'}; + size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); + LOG_DEBUG << "readSize=" << readSize << "\t" << buff; +} + +// Called by libevent when the write buffer reaches 0. +// We only provide this because libevent expects it, but we don't use it. +void buffered_on_write(struct bufferevent *bufev, void *arg) +{ +} + +// Called by libevent when there is an error on the underlying socket descriptor. +void buffered_on_error(struct bufferevent *bufev, short what, void *arg) +{ + struct EVClient *client = (struct EVClient *)arg; + + if (what & EVBUFFER_EOF) + { + //Client disconnected, remove the read event and the free the client structure. + LOG_INFO << "Client disconnected."; + } + else + { + LOG_WARN << "Client socket error, disconnecting."; + } + bufferevent_free(client->buf_ev); + close(client->fd); + delete client; +} + +// This function will be called by libevent when there is a connection ready to be accepted. +void on_accept(int fd, short ev, void *arg) +{ + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); + if (client_fd < 0) + { + LOG_WARN << "accept failed"; + return; + } + + // Set the client socket to non-blocking mode. + if (setnonblock(client_fd) < 0) + LOG_WARN << "failed to set client socket non-blocking"; + + // We've accepted a new client, create a client object. + struct EVClient* client = new EVClient; + if (client == NULL) + { + LOG_ERROR << "malloc failed"; + } + client->fd = client_fd; + + // Create the buffered event. + client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); + + // We have to enable it before our callbacks will be called. + bufferevent_enable(client->buf_ev, EV_READ); + + LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr); +} + +int main(int argc, char **argv) +{ + initLogger(LV_DEBUG); + + // Initialize libevent. + event_init(); + + // Create our listening socket. + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + { + LOG_ERROR << "create socket failed"; + return EXIT_FAILURE; + } + + struct sockaddr_in listen_addr; + memset(&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = INADDR_ANY; + listen_addr.sin_port = htons(SERVER_PORT); + if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) + { + LOG_ERROR << "bind failed"; + return EXIT_FAILURE; + } + + if (listen(listen_fd, 5) < 0) + { + LOG_ERROR << "listen failed"; + return EXIT_FAILURE; + } + + // Set the socket to non-blocking, this is essential in event based programming with libevent. + int reuseaddr_on = REUSEADDR_ON; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); + if (setnonblock(listen_fd) < 0) + { + LOG_ERROR << "failed to set server socket to non-blocking"; + return EXIT_FAILURE; + } + + // We now have a listening socket, we create a read event to be notified when a client connects. + struct event ev_accept; + event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); + event_add(&ev_accept, NULL); + + // Start the event loop. + event_dispatch(); + + return EXIT_SUCCESS; +} + + + + + + +int _main(int argc, char** argv) +{ + initLogger(LV_DEBUG); + + PipeLine pipeLine; + + PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack); + + { + PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient"); + PL_RTSPClient_Config rtspConfig; + rtspConfig.progName = argv[0]; + rtspConfig.rtspURL = argv[1]; + rtspConfig.aux = true; // ffmpeg need aux, but live555 not + rtspConfig.verbosityLevel = 1; + rtspConfig.tunnelOverHTTPPortNum = 0; + rtspConfig.args = nullptr; + bool ret = rtspClient->init(&rtspConfig); + if (!ret) + { + LOG_ERROR << "rtspClient.init error"; + exit(EXIT_FAILURE); + } + } + + { + PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder"); + bool ret = h264Decoder->init(nullptr); + if (!ret) + { + LOG_ERROR << "PL_H264Decoder.init error"; + exit(EXIT_FAILURE); + } + } + + { + PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420"); + bool ret = avFrameYUV420->init(nullptr); + if (!ret) + { + LOG_ERROR << "PL_AVFrameYUV420.init error"; + exit(EXIT_FAILURE); + } + } + + { + PL_Scale_Config config; + config.toWidth = 800; + config.toHeight = 600; + PL_Scale* ple = (PL_Scale*)pipeLine.push_elem("PL_Scale"); + bool ret = ple->init(&config); + if (!ret) + { + LOG_ERROR << "PL_Scale.init error"; + exit(EXIT_FAILURE); + } + } + + PL_SensetimeFaceTrack* sensetimeFaceTrack; + { + SensetimeFaceTrackConfig config; + config.generate_face_feature = true; + sensetimeFaceTrack = (PL_SensetimeFaceTrack*)pipeLine.push_elem("PL_SensetimeFaceTrack"); + sensetimeFaceTrack->init(&config); + } + + while(true) + { + //LOG_ERROR << "begin pipe"; + + PipeMaterial pm; + if (pipeLine.pipe(&pm) == sensetimeFaceTrack); + sensetimeFaceTrack->gain(pm); + + if (pm.type == PipeMaterial::PMT_PM_LIST) + { + PipeMaterial& facePM = ((PipeMaterial*)(pm.buffer))[1]; + st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer); + LOG_NOTICE << "faceFeatures " << faceFeatures.size(); + } + + //LOG_ERROR << "end pipe"; + } +} diff --git a/RtspFace/make.sh b/RtspFace/make.sh index 05baf7f..0cd039b 100644 --- a/RtspFace/make.sh +++ b/RtspFace/make.sh @@ -39,7 +39,7 @@ # -O3 CPPFLAGS+="-g -mavx -c -std=c++11 -pthread $LIVEMEDIA_INC $FFMPEG_INC $LIBBASE64_INC $LIBYUV_INC $SENSETIMEFACESDK_INC $LIBLOG4CPP_INC $DLIB_INC" -LDFLAGS+="-pthread $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB" +LDFLAGS+="-pthread -levent $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB" CFLAGS+="-D__STDC_CONSTANT_MACROS" @@ -47,8 +47,10 @@ rm *.o #g++ main.cpp $CFLAGS $CPPFLAGS -o main.o -g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o +#g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o +g++ main_face_daemon.cpp $CFLAGS $CPPFLAGS -o main.o g++ PipeLine.cpp $CFLAGS $CPPFLAGS +g++ PipeLinePool.cpp $CFLAGS $CPPFLAGS g++ PL_RTSPClient.cpp $CFLAGS $CPPFLAGS g++ PL_RTSPServer.cpp $CFLAGS $CPPFLAGS @@ -69,7 +71,7 @@ g++ $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS g++ -g -std=c++11 \ - main.o PipeLine.o \ + main.o PipeLine.o PipeLinePool.o \ PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o PL_Scale.o PL_Fork.o \ PL_SensetimeFaceTrack.o \ PL_DlibFaceTrack.o \ -- Gitblit v1.8.0