From 0b1de1fddd889cf2ebbe578bfad83862f5ebdf5a Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期一, 09 一月 2017 12:11:05 +0800
Subject: [PATCH] add libevent based daemon

---
 RtspFace/make.sh              |    8 
 RtspFace/main_face_daemon.cpp |  280 +++++++++++++++++++++++++++++++++++
 RtspFace/PipeLinePool.h       |   33 ++++
 RtspFace/PL_RTSPClient.cpp    |    4 
 RtspFace/PipeLinePool.cpp     |  123 +++++++++++++++
 5 files changed, 443 insertions(+), 5 deletions(-)

diff --git a/RtspFace/PL_RTSPClient.cpp b/RtspFace/PL_RTSPClient.cpp
index d169da2..759fc5d 100644
--- a/RtspFace/PL_RTSPClient.cpp
+++ b/RtspFace/PL_RTSPClient.cpp
@@ -231,7 +231,7 @@
 	int ret = pthread_mutex_unlock(in->frame_mutex);
 	if(ret != 0)
 	{
-		printf("pthread_mutex_unlock frame_mutex: %s/n", strerror(ret));
+		LOG_ERROR << "pthread_mutex_unlock frame_mutex: ", strerror(ret);
 	}
 }
 
@@ -246,6 +246,6 @@
 	int ret = pthread_mutex_lock(in->continue_mutex);
 	if(ret != 0)
 	{
-		printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret));
+		printf("pthread_mutex_unlock continue_mutex: %s/n", strerror(ret));//#todo
 	}
 }
diff --git a/RtspFace/PipeLinePool.cpp b/RtspFace/PipeLinePool.cpp
new file mode 100644
index 0000000..3b01327
--- /dev/null
+++ b/RtspFace/PipeLinePool.cpp
@@ -0,0 +1,123 @@
+#include "PipeLinePool.h"
+#include "logger.h"
+#include <pthread.h>
+
+#define PLP_MUTEX_LOCK(mut,_ret) if (mut != nullptr) {\
+	int ret = pthread_mutex_lock((pthread_mutex_t*)mut); \
+	if(ret != 0) \
+	{ \
+		LOG_ERROR << "pthread_mutex_lock " << #mut <<  ": " << ret; \
+		return _ret; \
+	} \
+}
+
+#define PLP_MUTEX_UNLOCK(mut,_ret) if (mut != nullptr) {\
+	int ret = pthread_mutex_unlock((pthread_mutex_t*)mut); \
+	if(ret != 0) \
+	{ \
+		LOG_ERROR << "pthread_mutex_unlock " << #mut <<  ": " << ret; \
+		return _ret; \
+	} \
+}
+
+PipeLinePool::PipeLinePool(bool _multithread_safe) : 
+	multithread_safe(_multithread_safe), tsafe_mutex(nullptr), pl_mutex(nullptr), 
+	pipelines(), pipelines_free()
+{
+	if (multithread_safe)
+	{
+		tsafe_mutex = new pthread_mutex_t;
+		pthread_mutex_init((pthread_mutex_t*)tsafe_mutex, NULL);
+		
+		pl_mutex = new pthread_mutex_t;
+		pthread_mutex_init((pthread_mutex_t*)pl_mutex, NULL);
+		
+		PLP_MUTEX_LOCK(pl_mutex,);
+	}
+}
+
+PipeLinePool::~PipeLinePool()
+{
+	if (multithread_safe)
+	{
+		PLP_MUTEX_UNLOCK(pl_mutex,);
+	}
+	
+	pthread_mutex_destroy((pthread_mutex_t*)tsafe_mutex);
+	delete (pthread_mutex_t*)tsafe_mutex;
+	tsafe_mutex = nullptr;
+	
+	pthread_mutex_destroy((pthread_mutex_t*)pl_mutex);
+	delete (pthread_mutex_t*)pl_mutex;
+	pl_mutex = nullptr;
+	
+	pipelines_free.clear();
+	
+	for (pl_set_t::iterator iter = pipelines.begin(); iter != pipelines.end(); ++iter)
+		delete *iter;
+	
+	pipelines.clear();
+}
+
+void PipeLinePool::manage(PipeLine* pl)
+{
+	if (pl == nullptr)
+		return;
+
+	PLP_MUTEX_LOCK(tsafe_mutex,);
+	
+	if (pipelines.find(pl) != pipelines.end())
+		return;
+	
+	pipelines.insert(pl);
+	pipelines_free.insert(pl);
+	
+	PLP_MUTEX_UNLOCK(tsafe_mutex,);
+}
+
+void PipeLinePool::unmanage(PipeLine* pl)
+{
+	PLP_MUTEX_LOCK(tsafe_mutex,);
+
+	pipelines.erase(pl);
+	pipelines_free.erase(pl);
+	
+	PLP_MUTEX_UNLOCK(tsafe_mutex,);
+}
+
+PipeLine* PipeLinePool::get_free()
+{
+	if (pipelines_free.empty())
+	{
+		PLP_MUTEX_LOCK(pl_mutex, nullptr);
+	}
+	
+	PLP_MUTEX_LOCK(tsafe_mutex, nullptr);
+	
+	if (pipelines_free.empty())
+		return nullptr;
+	
+	pl_set_t::iterator iter = pipelines_free.begin();
+	PipeLine* pl = *iter;
+	pipelines_free.erase(iter);
+	
+	PLP_MUTEX_UNLOCK(tsafe_mutex, nullptr);
+	
+	return pl;
+}
+
+void PipeLinePool::release(PipeLine* pl)
+{
+	if (pipelines.find(pl) == pipelines.end())
+		return;
+	if (pipelines_free.find(pl) != pipelines.end())
+		return;
+
+	PLP_MUTEX_LOCK(tsafe_mutex,);
+	
+	pipelines_free.insert(pl);
+
+	PLP_MUTEX_UNLOCK(tsafe_mutex,);
+	
+	PLP_MUTEX_UNLOCK(pl_mutex,);
+}
diff --git a/RtspFace/PipeLinePool.h b/RtspFace/PipeLinePool.h
new file mode 100644
index 0000000..141639c
--- /dev/null
+++ b/RtspFace/PipeLinePool.h
@@ -0,0 +1,33 @@
+#ifndef _PIPELINEPOOL_H_
+#define _PIPELINEPOOL_H_
+
+#include "PipeLine.h"
+#include <set>
+
+class PipeLinePool
+{
+public:
+	PipeLinePool(bool _multithread_safe = false);
+	~PipeLinePool();
+	
+	void manage(PipeLine* pl);
+	void unmanage(PipeLine* pl);
+	
+	PipeLine* get_free();
+	void release(PipeLine* pl);
+	
+	bool wait_free();
+	bool notify_free();
+
+private:
+	bool multithread_safe;
+
+	void* tsafe_mutex;
+	void* pl_mutex;
+
+	typedef std::set<PipeLine*> pl_set_t;
+	pl_set_t pipelines;
+	pl_set_t pipelines_free;
+};
+
+#endif
diff --git a/RtspFace/main_face_daemon.cpp b/RtspFace/main_face_daemon.cpp
new file mode 100644
index 0000000..51fa8e2
--- /dev/null
+++ b/RtspFace/main_face_daemon.cpp
@@ -0,0 +1,280 @@
+#include "PipeLine.h"
+#include "PL_RTSPClient.h"
+#include "PL_RTSPServer.h"
+#include "PL_H264Decoder.h"
+#include "PL_H264Encoder.h"
+#include "PL_AVFrameYUV420.h"
+#include "PL_AVFrameBGRA.h"
+#include "PL_Queue.h"
+#include "PL_Scale.h"
+#include "PL_Fork.h"
+
+#include "PL_SensetimeFaceTrack.h"
+
+#include "PL_DlibFaceTrack.h"
+
+#include "logger.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <sys/time.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <err.h>
+
+#include <event.h>
+
+#define SERVER_PORT 5432
+#define REUSEADDR_ON 1
+
+// A struct for client specific data, also includes pointer to create a list of clients.
+struct EVClient
+{
+	// The clients socket.
+	int fd;
+
+	// The bufferedevent for this client.
+	struct bufferevent *buf_ev;
+	
+	EVClient() : fd(-1), buf_ev(nullptr)
+	{ }
+};
+
+// Set a socket to non-blocking mode.
+int setnonblock(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0)
+		return flags;
+	flags |= O_NONBLOCK;
+	if (fcntl(fd, F_SETFL, flags) < 0)
+		return -1;
+
+	return 0;
+}
+
+// Called by libevent when there is data to read.
+void buffered_on_read(struct bufferevent *bufev, void *arg)
+{
+	struct EVClient *client = (struct EVClient *)arg;
+	
+	// Write back the read buffer. It is important to note that
+	// bufferevent_write_buffer will drain the incoming data so it
+	// is effectively gone after we call it.
+	//LOG_DEBUG << (char*)bufev->input;
+	//bufferevent_write_buffer(bufev, bufev->input);
+	
+	char buff[100] = {'\0'};
+	size_t readSize = bufferevent_read(bufev, buff, sizeof(buff));
+	LOG_DEBUG << "readSize=" << readSize << "\t" << buff;
+}
+
+// Called by libevent when the write buffer reaches 0. 
+// We only provide this because libevent expects it, but we don't use it.
+void buffered_on_write(struct bufferevent *bufev, void *arg)
+{
+}
+
+// Called by libevent when there is an error on the underlying socket descriptor.
+void buffered_on_error(struct bufferevent *bufev, short what, void *arg)
+{
+	struct EVClient *client = (struct EVClient *)arg;
+
+	if (what & EVBUFFER_EOF)
+	{
+		//Client disconnected, remove the read event and the free the client structure.
+		LOG_INFO << "Client disconnected.";
+	}
+	else
+	{
+		LOG_WARN << "Client socket error, disconnecting.";
+	}
+	bufferevent_free(client->buf_ev);
+	close(client->fd);
+	delete client;
+}
+
+// This function will be called by libevent when there is a connection ready to be accepted.
+void on_accept(int fd, short ev, void *arg)
+{
+	struct sockaddr_in client_addr;
+	socklen_t client_len = sizeof(client_addr);
+	int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
+	if (client_fd < 0)
+	{
+		LOG_WARN << "accept failed";
+		return;
+	}
+
+	// Set the client socket to non-blocking mode.
+	if (setnonblock(client_fd) < 0)
+		LOG_WARN << "failed to set client socket non-blocking";
+
+	// We've accepted a new client, create a client object.
+	struct EVClient* client = new EVClient;
+	if (client == NULL)
+	{
+		LOG_ERROR << "malloc failed";
+	}
+	client->fd = client_fd;
+
+	// Create the buffered event.
+	client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client);
+
+	// We have to enable it before our callbacks will be called.
+	bufferevent_enable(client->buf_ev, EV_READ);
+
+	LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr);
+}
+
+int main(int argc, char **argv)
+{
+	initLogger(LV_DEBUG);
+
+	// Initialize libevent.
+	event_init();
+
+	// Create our listening socket.
+	int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (listen_fd < 0)
+	{
+		LOG_ERROR << "create socket failed";
+		return EXIT_FAILURE;
+	}
+	
+	struct sockaddr_in listen_addr;
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_addr.s_addr = INADDR_ANY;
+	listen_addr.sin_port = htons(SERVER_PORT);
+	if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
+	{
+		LOG_ERROR << "bind failed";
+		return EXIT_FAILURE;
+	}
+	
+	if (listen(listen_fd, 5) < 0)
+	{
+		LOG_ERROR << "listen failed";
+		return EXIT_FAILURE;
+	}
+	
+	// Set the socket to non-blocking, this is essential in event based programming with libevent.
+	int reuseaddr_on = REUSEADDR_ON;
+	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
+	if (setnonblock(listen_fd) < 0)
+	{
+		LOG_ERROR << "failed to set server socket to non-blocking";
+		return EXIT_FAILURE;
+	}
+
+	// We now have a listening socket, we create a read event to be notified when a client connects.
+	struct event ev_accept;
+	event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
+	event_add(&ev_accept, NULL);
+
+	// Start the event loop.
+	event_dispatch();
+
+	return EXIT_SUCCESS;
+}
+
+
+
+
+
+
+int _main(int argc, char** argv)
+{
+	initLogger(LV_DEBUG);
+
+	PipeLine pipeLine;
+
+	PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack);
+
+	{
+		PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient");
+		PL_RTSPClient_Config rtspConfig;
+		rtspConfig.progName = argv[0];
+		rtspConfig.rtspURL = argv[1];
+		rtspConfig.aux = true; // ffmpeg need aux, but live555 not
+		rtspConfig.verbosityLevel = 1;
+		rtspConfig.tunnelOverHTTPPortNum = 0;
+		rtspConfig.args = nullptr;
+		bool ret = rtspClient->init(&rtspConfig);
+		if (!ret)
+			{
+				LOG_ERROR << "rtspClient.init error";
+				exit(EXIT_FAILURE);
+			}
+	}
+
+	{
+		PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder");
+		bool ret = h264Decoder->init(nullptr);
+		if (!ret)
+			{
+				LOG_ERROR << "PL_H264Decoder.init error";
+				exit(EXIT_FAILURE);
+			}
+	}
+
+	{
+		PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420");
+		bool ret = avFrameYUV420->init(nullptr);
+		if (!ret)
+			{
+				LOG_ERROR << "PL_AVFrameYUV420.init error";
+				exit(EXIT_FAILURE);
+			}
+	}
+
+	{
+		PL_Scale_Config config;
+		config.toWidth = 800;
+		config.toHeight = 600;
+		PL_Scale* ple = (PL_Scale*)pipeLine.push_elem("PL_Scale");
+		bool ret = ple->init(&config);
+		if (!ret)
+			{
+				LOG_ERROR << "PL_Scale.init error";
+				exit(EXIT_FAILURE);
+			}
+	}
+
+	PL_SensetimeFaceTrack* sensetimeFaceTrack;
+	{
+		SensetimeFaceTrackConfig config;
+		config.generate_face_feature = true;
+		sensetimeFaceTrack = (PL_SensetimeFaceTrack*)pipeLine.push_elem("PL_SensetimeFaceTrack");
+		sensetimeFaceTrack->init(&config);
+	}
+
+	while(true)
+		{
+			//LOG_ERROR << "begin pipe";
+
+			PipeMaterial pm;
+			if (pipeLine.pipe(&pm) == sensetimeFaceTrack);
+			sensetimeFaceTrack->gain(pm);
+
+			if (pm.type == PipeMaterial::PMT_PM_LIST)
+				{
+					PipeMaterial& facePM = ((PipeMaterial*)(pm.buffer))[1];
+					st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer);
+					LOG_NOTICE << "faceFeatures " << faceFeatures.size();
+				}
+
+			//LOG_ERROR << "end pipe";
+		}
+}
diff --git a/RtspFace/make.sh b/RtspFace/make.sh
index 05baf7f..0cd039b 100644
--- a/RtspFace/make.sh
+++ b/RtspFace/make.sh
@@ -39,7 +39,7 @@
 
 # -O3
 CPPFLAGS+="-g -mavx -c -std=c++11 -pthread $LIVEMEDIA_INC $FFMPEG_INC $LIBBASE64_INC $LIBYUV_INC $SENSETIMEFACESDK_INC $LIBLOG4CPP_INC $DLIB_INC"
-LDFLAGS+="-pthread $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB"
+LDFLAGS+="-pthread -levent $LIVEMEDIA_LIB $FFMPEG_LIB $LIBBASE64_LIB $LIBYUV_LIB $LIBX264_LIB $SENSETIMEFACESDK_LIB $OPENCV_LIB $LIBLOG4CPP_LIB $DLIB_LIB"
 
 CFLAGS+="-D__STDC_CONSTANT_MACROS"
 
@@ -47,8 +47,10 @@
 rm *.o
 
 #g++ main.cpp $CFLAGS $CPPFLAGS -o main.o
-g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o
+#g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o
+g++ main_face_daemon.cpp $CFLAGS $CPPFLAGS -o main.o
 g++ PipeLine.cpp $CFLAGS $CPPFLAGS
+g++ PipeLinePool.cpp $CFLAGS $CPPFLAGS
 
 g++ PL_RTSPClient.cpp $CFLAGS $CPPFLAGS
 g++ PL_RTSPServer.cpp $CFLAGS $CPPFLAGS
@@ -69,7 +71,7 @@
 g++ $FFMPEGRTSPSERVER_BASE/LiveServerMediaSubsession.cpp $CFLAGS $CPPFLAGS
 
 g++ -g -std=c++11 \
-  main.o PipeLine.o \
+  main.o PipeLine.o PipeLinePool.o \
   PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o PL_Scale.o PL_Fork.o \
   PL_SensetimeFaceTrack.o \
   PL_DlibFaceTrack.o \

--
Gitblit v1.8.0