From 0b1de1fddd889cf2ebbe578bfad83862f5ebdf5a Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期一, 09 一月 2017 12:11:05 +0800
Subject: [PATCH] add libevent based daemon
---
RtspFace/make.sh | 8
RtspFace/main_face_daemon.cpp | 280 +++++++++++++++++++++++++++++++++++
RtspFace/PipeLinePool.h | 33 ++++
RtspFace/PL_RTSPClient.cpp | 4
RtspFace/PipeLinePool.cpp | 123 +++++++++++++++
5 files changed, 443 insertions(+), 5 deletions(-)
diff --git a/RtspFace/PL_RTSPClient.cpp b/RtspFace/PL_RTSPClient.cpp
index d169da2..759fc5d 100644
--- a/RtspFace/PL_RTSPClient.cpp
+++ b/RtspFace/PL_RTSPClient.cpp
@@ -231,7 +231,7 @@
int ret = pthread_mutex_unlock(in->frame_mutex);
if(ret != 0)
{
- printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
+ LOG_ERROR << "pthread_mutex_unlock frame_mutex: ", strerror(ret);
}
}
@@ -246,6 +246,6 @@
int ret = pthread_mutex_lock(in->continue_mutex);
if(ret != 0)
{
- printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret));
+ printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret));//#todo
}
}
diff --git a/RtspFace/PipeLinePool.cpp b/RtspFace/PipeLinePool.cpp
new file mode 100644
index 0000000..3b01327
--- /dev/null
+++ b/RtspFace/PipeLinePool.cpp
@@ -0,0 +1,123 @@
+#include "PipeLinePool.h"
+#include "logger.h"
+#include <pthread.h>
+
+#define PLP_MUTEX_LOCK(mut,_ret) if (mut != nullptr) {\
+ int ret = pthread_mutex_lock((pthread_mutex_t*)mut); \
+ if(ret != 0) \
+ { \
+ LOG_ERROR << "pthread_mutex_lock " << #mut << ": " << ret; \
+ return _ret; \
+ } \
+}
+
+#define PLP_MUTEX_UNLOCK(mut,_ret) if (mut != nullptr) {\
+ int ret = pthread_mutex_unlock((pthread_mutex_t*)mut); \
+ if(ret != 0) \
+ { \
+ LOG_ERROR << "pthread_mutex_unlock " << #mut << ": " << ret; \
+ return _ret; \
+ } \
+}
+
+PipeLinePool::PipeLinePool(bool _multithread_safe) :
+ multithread_safe(_multithread_safe), tsafe_mutex(nullptr), pl_mutex(nullptr),
+ pipelines(), pipelines_free()
+{
+ if (multithread_safe)
+ {
+ tsafe_mutex = new pthread_mutex_t;
+ pthread_mutex_init((pthread_mutex_t*)tsafe_mutex, NULL);
+
+ pl_mutex = new pthread_mutex_t;
+ pthread_mutex_init((pthread_mutex_t*)pl_mutex, NULL);
+
+ PLP_MUTEX_LOCK(pl_mutex,);
+ }
+}
+
+PipeLinePool::~PipeLinePool()
+{
+ if (multithread_safe)
+ {
+ PLP_MUTEX_UNLOCK(pl_mutex,);
+ }
+
+ pthread_mutex_destroy((pthread_mutex_t*)tsafe_mutex);
+ delete (pthread_mutex_t*)tsafe_mutex;
+ tsafe_mutex = nullptr;
+
+ pthread_mutex_destroy((pthread_mutex_t*)pl_mutex);
+ delete (pthread_mutex_t*)pl_mutex;
+ pl_mutex = nullptr;
+
+ pipelines_free.clear();
+
+ for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter)
+ delete *iter;
+
+ pipelines.clear();
+}
+
+void PipeLinePool::manage(PipeLine* pl)
+{
+ if (pl == nullptr)
+ return;
+
+ PLP_MUTEX_LOCK(tsafe_mutex,);
+
+ if (pipelines.find(pl) != pipelines.end())
+ return;
+
+ pipelines.insert(pl);
+ pipelines_free.insert(pl);
+
+ PLP_MUTEX_UNLOCK(tsafe_mutex,);
+}
+
+void PipeLinePool::unmanage(PipeLine* pl)
+{
+ PLP_MUTEX_LOCK(tsafe_mutex,);
+
+ pipelines.erase(pl);
+ pipelines_free.erase(pl);
+
+ PLP_MUTEX_UNLOCK(tsafe_mutex,);
+}
+
+PipeLine* PipeLinePool::get_free()
+{
+ if (pipelines_free.empty())
+ {
+ PLP_MUTEX_LOCK(pl_mutex, nullptr);
+ }
+
+ PLP_MUTEX_LOCK(tsafe_mutex, nullptr);
+
+ if (pipelines_free.empty())
+ return nullptr;
+
+ pl_set_t::iterator iter = pipelines_free.begin();
+ PipeLine* pl = *iter;
+ pipelines_free.erase(iter);
+
+ PLP_MUTEX_UNLOCK(tsafe_mutex, nullptr);
+
+ return pl;
+}
+
+void PipeLinePool::release(PipeLine* pl)
+{
+ if (pipelines.find(pl) == pipelines.end())
+ return;
+ if (pipelines_free.find(pl) != pipelines.end())
+ return;
+
+ PLP_MUTEX_LOCK(tsafe_mutex,);
+
+ pipelines_free.insert(pl);
+
+ PLP_MUTEX_UNLOCK(tsafe_mutex,);
+
+ PLP_MUTEX_UNLOCK(pl_mutex,);
+}
diff --git a/RtspFace/PipeLinePool.h b/RtspFace/PipeLinePool.h
new file mode 100644
index 0000000..141639c
--- /dev/null
+++ b/RtspFace/PipeLinePool.h
@@ -0,0 +1,33 @@
+#ifndef _PIPELINEPOOL_H_
+#define _PIPELINEPOOL_H_
+
+#include "PipeLine.h"
+#include <set>
+
+class PipeLinePool
+{
+public:
+ PipeLinePool(bool _multithread_safe = false);
+ ~PipeLinePool();
+
+ void manage(PipeLine* pl);
+ void unmanage(PipeLine* pl);
+
+ PipeLine* get_free();
+ void release(PipeLine* pl);
+
+ bool wait_free();
+ bool notify_free();
+
+private:
+ bool multithread_safe;
+
+ void* tsafe_mutex;
+ void* pl_mutex;
+
+ typedef std::set<PipeLine*> pl_set_t;
+ pl_set_t pipelines;
+ pl_set_t pipelines_free;
+};
+
+#endif
diff --git a/RtspFace/main_face_daemon.cpp b/RtspFace/main_face_daemon.cpp
new file mode 100644
index 0000000..51fa8e2
--- /dev/null
+++ b/RtspFace/main_face_daemon.cpp
@@ -0,0 +1,280 @@
+#include "PipeLine.h"
+#include "PL_RTSPClient.h"
+#include "PL_RTSPServer.h"
+#include "PL_H264Decoder.h"
+#include "PL_H264Encoder.h"
+#include "PL_AVFrameYUV420.h"
+#include "PL_AVFrameBGRA.h"
+#include "PL_Queue.h"
+#include "PL_Scale.h"
+#include "PL_Fork.h"
+
+#include "PL_SensetimeFaceTrack.h"
+
+#include "PL_DlibFaceTrack.h"
+
+#include "logger.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <sys/time.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <err.h>
+
+#include <event.h>
+
+#define SERVER_PORT 5432
+#define REUSEADDR_ON 1
+
+// A struct for client specific data, also includes pointer to create a list of clients.
+struct EVClient
+{
+ // The clients socket.
+ int fd;
+
+ // The bufferedevent for this client.
+ struct bufferevent *buf_ev;
+
+ EVClient() : fd(-1), buf_ev(nullptr)
+ { }
+};
+
+// Set a socket to non-blocking mode.
+int setnonblock(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFL);
+ if (flags < 0)
+ return flags;
+ flags |= O_NONBLOCK;
+ if (fcntl(fd, F_SETFL, flags) < 0)
+ return -1;
+
+ return 0;
+}
+
+// Called by libevent when there is data to read.
+void buffered_on_read(struct bufferevent *bufev, void *arg)
+{
+ struct EVClient *client = (struct EVClient *)arg;
+
+ // Write back the read buffer. It is important to note that
+ // bufferevent_write_buffer will drain the incoming data so it
+ // is effectively gone after we call it.
+ //LOG_DEBUG << (char*)bufev->input;
+ //bufferevent_write_buffer(bufev, bufev->input);
+
+ char buff[100] = {'\0'};
+ size_t readSize = bufferevent_read(bufev, buff, sizeof(buff));
+ LOG_DEBUG << "readSize=" << readSize << "\t" << buff;
+}
+
+// Called by libevent when the write buffer reaches 0.
+// We only provide this because libevent expects it, but we don't use it.
+void buffered_on_write(struct bufferevent *bufev, void *arg)
+{
+}
+
+// Called by libevent when there is an error on the underlying socket descriptor.
+void buffered_on_error(struct bufferevent *bufev, short what, void *arg)
+{
+ struct EVClient *client = (struct EVClient *)arg;
+
+ if (what & EVBUFFER_EOF)
+ {
+ //Client disconnected, remove the read event and the free the client structure.
+ LOG_INFO << "Client disconnected.";
+ }
+ else
+ {
+ LOG_WARN << "Client socket error, disconnecting.";
+ }
+ bufferevent_free(client->buf_ev);
+ close(client->fd);
+ delete client;
+}
+
+// This function will be called by libevent when there is a connection ready to be accepted.
+void on_accept(int fd, short ev, void *arg)
+{
+ struct sockaddr_in client_addr;
+ socklen_t client_len = sizeof(client_addr);
+ int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
+ if (client_fd < 0)
+ {
+ LOG_WARN << "accept failed";
+ return;
+ }
+
+ // Set the client socket to non-blocking mode.
+ if (setnonblock(client_fd) < 0)
+ LOG_WARN << "failed to set client socket non-blocking";
+
+ // We've accepted a new client, create a client object.
+ struct EVClient* client = new EVClient;
+ if (client == NULL)
+ {
+ LOG_ERROR << "malloc failed";
+ }
+ client->fd = client_fd;
+
+ // Create the buffered event.
+ client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client);
+
+ // We have to enable it before our callbacks will be called.
+ bufferevent_enable(client->buf_ev, EV_READ);
+
+ LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr);
+}
+
+int main(int argc, char **argv)
+{
+ initLogger(LV_DEBUG);
+
+ // Initialize libevent.
+ event_init();
+
+ // Create our listening socket.
+ int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (listen_fd < 0)
+ {
+ LOG_ERROR << "create socket failed";
+ return EXIT_FAILURE;
+ }
+
+ struct sockaddr_in listen_addr;
+ memset(&listen_addr, 0, sizeof(listen_addr));
+ listen_addr.sin_family = AF_INET;
+ listen_addr.sin_addr.s_addr = INADDR_ANY;
+ listen_addr.sin_port = htons(SERVER_PORT);
+ if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
+ {
+ LOG_ERROR << "bind failed";
+ return EXIT_FAILURE;
+ }
+
+ if (listen(listen_fd, 5) < 0)
+ {
+ LOG_ERROR << "listen failed";
+ return EXIT_FAILURE;
+ }
+
+ // Set the socket to non-blocking, this is essential in event based programming with libevent.
+ int reuseaddr_on = REUSEADDR_ON;
+ setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
+ if (setnonblock(listen_fd) < 0)
+ {
+ LOG_ERROR << "failed to set server socket to non-blocking";
+ return EXIT_FAILURE;
+ }
+
+ // We now have a listening socket, we create a read event to be notified when a client connects.
+ struct event ev_accept;
+ event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
+ event_add(&ev_accept, NULL);
+
+ // Start the event loop.
+ event_dispatch();
+
+ return EXIT_SUCCESS;
+}
+
+
+
+
+
+
+int _main(int argc, char** argv)
+{
+ initLogger(LV_DEBUG);
+
+ PipeLine pipeLine;
+
+ PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack);
+
+ {
+ PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
+ PL_RTSPClient_Config rtspConfig;
+ rtspConfig.progName = argv[0];
+ rtspConfig.rtspURL = argv[1];
+ rtspConfig.aux = true; // ffmpeg need aux, but live555 not
+ rtspConfig.verbosityLevel = 1;
+ rtspConfig.tunnelOverHTTPPortNum = 0;
+ rtspConfig.args = nullptr;
+ bool ret = rtspClient->init(&rtspConfig);
+ if (!ret)
+ {
+ LOG_ERROR << "rtspClient.init error";
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ {
+ PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
+ bool ret = h264Decoder->init(nullptr);
+ if (!ret)
+ {
+ LOG_ERROR << "PL_H264Decoder.init error";
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ {
+ PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
+ bool ret = avFrameYUV420->init(nullptr);
+ if (!ret)
+ {
+ LOG_ERROR << "PL_AVFrameYUV420.init error";
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ {
+ PL_Scale_Config config;
+ config.toWidth = 800;
+ config.toHeight = 600;
+ PL_Scale* ple = (PL_Scale*)pipeLine.push_elem("PL_Scale");
+ bool ret = ple->init(&config);
+ if (!ret)
+ {
+ LOG_ERROR << "PL_Scale.init error";
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ PL_SensetimeFaceTrack* sensetimeFaceTrack;
+ {
+ SensetimeFaceTrackConfig config;
+ config.generate_face_feature = true;
+ sensetimeFaceTrack = (PL_SensetimeFaceTrack*)pipeLine.push_elem("PL_SensetimeFaceTrack");
+ sensetimeFaceTrack->init(&config);
+ }
+
+ while(true)
+ {
+ //LOG_ERROR << "begin pipe";
+
+ PipeMaterial pm;
+ if (pipeLine.pipe(&pm) == sensetimeFaceTrack);
+ sensetimeFaceTrack->gain(pm);
+
+ if (pm.type == PipeMaterial::PMT_PM_LIST)
+ {
+ PipeMaterial& facePM = ((PipeMaterial*)(pm.buffer))[1];
+ st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer);
+ LOG_NOTICE << "faceFeatures " << faceFeatures.size();
+ }
+
+ //LOG_ERROR << "end pipe";
+ }
+}
diff --git a/RtspFace/make.sh b/RtspFace/make.sh
index 05baf7f..0cd039b 100644
--- a/RtspFace/make.sh
+++ b/RtspFace/make.sh
@@ -39,7 +39,7 @@
# -O3
CPPFLAGS+="-g -mavx -c -std=c++11 -pthread $LIVEMEDIA_INC $FFMPEG_INC $LIBBASE64_INC $LIBYUV_INC $SENSETIMEFACESDK_INC $LIBLOG4CPP_INC $DLIB_INC"
-LDFLAGS+="-pthread $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB"
+LDFLAGS+="-pthread -levent $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB"
CFLAGS+="-D__STDC_CONSTANT_MACROS"
@@ -47,8 +47,10 @@
rm *.o
#g++ main.cpp $CFLAGS $CPPFLAGS -o main.o
-g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o
+#g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o
+g++ main_face_daemon.cpp $CFLAGS $CPPFLAGS -o main.o
g++ PipeLine.cpp $CFLAGS $CPPFLAGS
+g++ PipeLinePool.cpp $CFLAGS $CPPFLAGS
g++ PL_RTSPClient.cpp $CFLAGS $CPPFLAGS
g++ PL_RTSPServer.cpp $CFLAGS $CPPFLAGS
@@ -69,7 +71,7 @@
g++ $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS
g++ -g -std=c++11 \
- main.o PipeLine.o \
+ main.o PipeLine.o PipeLinePool.o \
PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o PL_Scale.o PL_Fork.o \
PL_SensetimeFaceTrack.o \
PL_DlibFaceTrack.o \
--
Gitblit v1.8.0