From 4a09037a212df66c170ee494907b5a746c01ba26 Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期二, 18 四月 2017 18:51:05 +0800
Subject: [PATCH] 

---
 FaceServer/main_face_daemon.cpp |  233 +++++++++++++++++++++
 FaceServer/ev_server.cpp        |  302 +++++++++++++++++++++++++++
 FaceServer/ev_proto.h           |   56 +++++
 FaceServer/ev_server.h          |   48 ++++
 4 files changed, 639 insertions(+), 0 deletions(-)

diff --git a/FaceServer/ev_proto.h b/FaceServer/ev_proto.h
new file mode 100644
index 0000000..6dc27be
--- /dev/null
+++ b/FaceServer/ev_proto.h
@@ -0,0 +1,56 @@
+#ifndef _EV_PROTO_H_
+#define _EV_PROTO_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#pragma pack(1)
+
+struct EVPCommand
+{
+	enum EVPC
+	{
+		EVPC__FIRST,
+		EVPC_STATUS = 1,
+		EVPC_USER_DEFINE = 128,
+		EVPC__LAST
+	};
+};
+
+struct EVPStatus
+{
+	enum EVPS
+	{
+		EVPS__FIRST,
+		EVPS_OK = 1,
+		EVPS_ERROR = 128,
+		EVPS_INTERNAL_ERROR,
+		EVPS_PARAMETER_ERROR,
+		EVPS__LAST
+	};
+};
+
+struct EVPHeader
+{
+	int16_t cmd;	// EVPCommand::EVPC
+	uint32_t size;	// sizeof(EVPHeader)+sizeof(subcmd)
+};
+
+struct EVP_Status
+{
+	int16_t status;
+};
+
+struct EVP_VariableBuffer
+{
+	int16_t type;
+	uint8_t buff[0];
+};
+
+//#todo
+template<typename TPacket>
+void endian_convert(TPacket& packet);
+
+#pragma pack()
+
+#endif
diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp
new file mode 100644
index 0000000..2f423a2
--- /dev/null
+++ b/FaceServer/ev_server.cpp
@@ -0,0 +1,302 @@
+#include "ev_server.h" 
+#include "ev_proto.h"
+#include "logger.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <sys/time.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <err.h>
+
+#include <event.h>
+
+// A struct for client specific data, also includes pointer to create a list of clients.
+struct EVClient
+{
+	// The clients socket.
+	int fd;
+
+	// The bufferedevent for this client.
+	struct bufferevent *buf_ev;
+	
+	uint8_t* recvbuff;
+	size_t recvbuff_end;
+	size_t recvbuff_max;
+	size_t read_times; // read times for a command
+	
+	evclient_proc_t proc;
+	
+	EVClient() : 
+		fd(-1), buf_ev(nullptr), 
+		recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), 
+		proc(nullptr)
+	{ }
+};
+
+#ifndef USER_DEFINE_EVCLIENT_PROC
+evclient_proc_t evclient_proc = nullptr;
+#endif
+
+// Set a socket to non-blocking mode.
+static int setnonblock(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0)
+		return flags;
+	flags |= O_NONBLOCK;
+	if (fcntl(fd, F_SETFL, flags) < 0)
+		return -1;
+
+	return 0;
+}
+
+// Called by libevent when there is an error on the underlying socket descriptor.
+static void buffered_on_error(struct bufferevent *bufev, short what, void *arg)
+{
+	struct EVClient *client = (struct EVClient *)arg;
+
+	if (what & EVBUFFER_EOF)
+	{
+		//Client disconnected, remove the read event and the free the client structure.
+		LOG_INFO << "Client disconnected." << std::endl;
+	}
+	else
+	{
+		LOG_WARN << "Client socket error, disconnecting." << std::endl;
+	}
+	bufferevent_free(client->buf_ev);
+	close(client->fd);
+	delete[] client->recvbuff;
+	delete client;
+}
+
+// Called by libevent when there is data to read.
+static void buffered_on_read(struct bufferevent *bufev, void *arg)
+{
+	struct EVClient *client = (struct EVClient *)arg;
+	
+	// Write back the read buffer. It is important to note that
+	// bufferevent_write_buffer will drain the incoming data so it
+	// is effectively gone after we call it.
+	//LOG_DEBUG << (char*)bufev->input << std::endl;
+	//bufferevent_write_buffer(bufev, bufev->input);
+	
+	//char buff[100] = {'\0'};
+	//size_t readSize = bufferevent_read(bufev, buff, sizeof(buff));
+	//LOG_DEBUG << "readSize=" << readSize << "\t" << buff << std::endl;
+
+	EVPHeader* evpHeader = (EVPHeader*)client->recvbuff;
+	
+	// read header if expected
+	{
+		uint8_t headerBuff[sizeof(EVPHeader)];
+		if (client->recvbuff_end == 0)
+		{
+			size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
+			client->read_times = 1;
+			if (readSize != sizeof(headerBuff))
+			{
+				LOG_WARN << "client send incomplete header" << std::endl;
+				buffered_on_error(bufev, 0, arg);
+				return;
+			}
+			
+			evpHeader = (EVPHeader*)headerBuff;
+			
+			// check header
+			if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || 
+				evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
+			{
+				LOG_WARN << "client send invalid header" << std::endl;
+				buffered_on_error(bufev, 0, arg);
+				return;
+			}
+			
+			if (client->recvbuff_max < evpHeader->size)
+			{
+				delete[] client->recvbuff;
+				client->recvbuff = nullptr;
+				client->recvbuff_max = 0;
+			}
+			
+			if (client->recvbuff == nullptr)
+			{
+				uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
+				client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
+				client->recvbuff = new uint8_t[client->recvbuff_max];
+			}
+			
+			memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
+			client->recvbuff_end = sizeof(headerBuff);
+		}
+	}
+	
+	// read sub command or data
+	size_t readSize = 0;
+	do
+	{
+		readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end);
+		client->read_times++;
+		if (readSize == 0)
+			break;
+
+		client->recvbuff_end += readSize;
+	} while (readSize > 0);
+	
+	// test if command complete
+	if (evpHeader->size == client->recvbuff_end)
+	{
+		// call client proc
+		bool closeClient = true;
+		if (client->proc != nullptr)
+		{
+			EVClientStub cs(client->recvbuff, client->recvbuff_end);
+			cs.id = client->fd;
+			closeClient = !(client->proc(cs));
+			
+			if (cs.sendBuff != nullptr && cs.sendBuffSize > 0)
+			{
+				size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize);
+				if (writeSize != cs.sendBuffSize)
+					LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << std::endl;
+				
+				if (cs.deleteSendBuff)
+					delete[] cs.sendBuff;
+			}
+		}
+		
+		if (closeClient)
+		{
+			LOG_DEBUG << "server initiative close" << std::endl;
+			buffered_on_error(bufev, 0, arg);
+		}
+
+		// reset client
+		client->recvbuff_end = 0;
+		client->read_times = 0;
+	}
+	
+	// check read times
+	if (client->read_times > CLIENT_READ_TIMES_MAX)
+	{
+		LOG_WARN << "client read times to max" << std::endl;
+		buffered_on_error(bufev, 0, arg);
+	}
+}
+
+// Called by libevent when the write buffer reaches 0. 
+// We only provide this because libevent expects it, but we don't use it.
+static void buffered_on_write(struct bufferevent *bufev, void *arg)
+{
+}
+
+// This function will be called by libevent when there is a connection ready to be accepted.
+static void on_accept(int fd, short ev, void *arg)
+{
+	struct sockaddr_in client_addr;
+	socklen_t client_len = sizeof(client_addr);
+	int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
+	if (client_fd < 0)
+	{
+		LOG_WARN << "accept failed" << std::endl;
+		return;
+	}
+
+	// Set the client socket to non-blocking mode.
+	if (setnonblock(client_fd) < 0)
+		LOG_WARN << "failed to set client socket non-blocking" << std::endl;
+
+	// We've accepted a new client, create a client object.
+	struct EVClient* client = new EVClient;
+	if (client == NULL)
+	{
+		LOG_ERROR << "malloc failed" << std::endl;
+	}
+	client->fd = client_fd;
+	client->proc = evclient_proc;
+
+	// Create the buffered event.
+	client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client);
+
+	// We have to enable it before our callbacks will be called.
+	bufferevent_enable(client->buf_ev, EV_READ);
+
+	LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << std::endl;
+}
+
+int server_main(int argc, char **argv)
+{
+	//initLogger(LV_DEBUG);
+
+	// Initialize libevent.
+	event_init();
+
+	// Create our listening socket.
+	int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (listen_fd < 0)
+	{
+		LOG_ERROR << "create socket failed" << std::endl;
+		return EXIT_FAILURE;
+	}
+	
+	struct sockaddr_in listen_addr;
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_addr.s_addr = INADDR_ANY;
+	listen_addr.sin_port = htons(SERVER_PORT);
+	if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
+	{
+		LOG_ERROR << "bind failed" << std::endl;
+		return EXIT_FAILURE;
+	}
+	
+	if (listen(listen_fd, 5) < 0)
+	{
+		LOG_ERROR << "listen failed" << std::endl;
+		return EXIT_FAILURE;
+	}
+	
+	// Set the socket to non-blocking, this is essential in event based programming with libevent.
+	int reuseaddr_on = REUSEADDR_ON;
+	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
+	if (setnonblock(listen_fd) < 0)
+	{
+		LOG_ERROR << "failed to set server socket to non-blocking" << std::endl;
+		return EXIT_FAILURE;
+	}
+
+	// We now have a listening socket, we create a read event to be notified when a client connects.
+	struct event ev_accept;
+	event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
+	event_add(&ev_accept, NULL);
+
+	// Start the event loop.
+	event_dispatch();
+
+	return EXIT_SUCCESS;
+}
+
+void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status)
+{
+	client.sendBuffSize = sizeof(EVPHeader)+sizeof(EVP_Status);
+	client.sendBuff = new uint8_t[client.sendBuffSize];
+	client.deleteSendBuff = true;
+	
+	EVPHeader* evpHeader = new (client.sendBuff) EVPHeader;
+	evpHeader->cmd = EVPCommand::EVPC_STATUS;
+	evpHeader->size = client.sendBuffSize;
+	
+	EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status;
+	evpStatus->status = status;
+}
diff --git a/FaceServer/ev_server.h b/FaceServer/ev_server.h
new file mode 100644
index 0000000..75749b5
--- /dev/null
+++ b/FaceServer/ev_server.h
@@ -0,0 +1,48 @@
+#ifndef _EV_SERVER_H_
+#define _EV_SERVER_H_
+
+#include <stddef.h>
+#include <stdint.h>
+#include "ev_proto.h"
+
+#define SERVER_PORT 5432
+#define REUSEADDR_ON 1
+#define CLIENT_BUFFER_MAX 100*1024 // 100KB
+#define CLIENT_READ_TIMES_MAX 100
+#define CLIENT_MAX 100 // max count of clients connected //#todo not support
+
+struct EVClientStub
+{
+	int id;
+	const uint8_t* recvBuff;
+	const size_t recvBuffSize;
+	uint8_t* sendBuff;
+	size_t sendBuffSize;
+	bool deleteSendBuff;
+	
+	EVClientStub() : 
+		id(-1), 
+		recvBuff(nullptr), recvBuffSize(0), 
+		sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false)
+	{
+	}
+	
+	EVClientStub(const uint8_t* _recvBuff, size_t _recvBuffSize) : 
+		id(-1), 
+		recvBuff(_recvBuff), recvBuffSize(_recvBuffSize), 
+		sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false)
+	{
+	}
+};
+
+typedef bool (*evclient_proc_t)(EVClientStub& client);
+extern evclient_proc_t evclient_proc;
+
+//#define USER_DEFINE_EVCLIENT_PROC
+
+int server_main(int argc, char **argv);
+
+void ev_send_packet(EVClientStub& client);
+void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status);
+
+#endif
diff --git a/FaceServer/main_face_daemon.cpp b/FaceServer/main_face_daemon.cpp
new file mode 100644
index 0000000..9058ee4
--- /dev/null
+++ b/FaceServer/main_face_daemon.cpp
@@ -0,0 +1,233 @@
+#include "PipeLine.h"
+#include "MaterialBuffer.h"
+#include "PL_RTSPClient.h"
+#include "PL_RTSPServer.h"
+#include "PL_H264Decoder.h"
+#include "PL_H264Encoder.h"
+#include "PL_AVFrameYUV420.h"
+#include "PL_AVFrameBGRA.h"
+#include "PL_Queue.h"
+#include "PL_Scale.h"
+#include "PL_Fork.h"
+#include "PL_Payer.h"
+#include "PL_Gainer.h"
+#include "PL_SensetimeFaceTrack.h"
+#include "PL_SensetimeFaceDetect.h"
+#include "PL_DlibFaceTrack.h"
+
+#include "PipeLinePool.h"
+
+#include "ev_server.h" 
+#include "ev_proto.h"
+#include "face_daemon_proto.h"
+
+#include "SensetimeFaceAPIWrapper/src/FaceDBPool.h"
+#include "SensetimeFaceAPIWrapper/src/faceAPI.h"
+
+#include "logger.h"
+
+template<typename TPoolPtr, typename TPoolElem>
+struct PoolElemLocker
+{
+	TPoolPtr pool;
+	TPoolElem elem;
+	PoolElemLocker(TPoolPtr _pool, TPoolElem _elem) : pool(_pool), elem(_elem)
+	{
+		
+	}
+	~PoolElemLocker()
+	{
+		pool->release(elem);
+		pool->notify_free();
+	}
+};
+
+template<typename TArrayPtr>
+struct ArrayDeleter
+{
+	TArrayPtr array;
+	ArrayDeleter(TArrayPtr _array) : array(_array)
+	{
+		
+	}
+	~ArrayDeleter()
+	{
+		delete[] array;
+	}
+};
+
+PipeLinePool g_PipeLinePool;
+
+FaceDBPool g_faceAPIPool;//#todo config
+
+evclient_proc_t evclient_proc;
+
+bool send_SensetimeFaceDetectResult(EVClientStub& client, PipeMaterial& lastPm)
+{
+	//if (lastPm.type != PipeMaterial::PMT_PM_LIST)
+
+	//PipeMaterial& facePM = ((PipeMaterial*)(lastPm.buffer))[1];
+	//st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer);
+	//LOG_NOTICE << "faceFeatures " << faceFeatures.size() << std::endl;
+	//#todo send result packet
+
+	if (lastPm.buffer == nullptr || lastPm.type != PipeMaterial::PMT_BYTES || lastPm.buffSize != sizeof(SensetimeFaceDetectResult))
+	{
+		LOG_WARN << "pm not available" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR);
+		return false;
+	}
+	
+	const SensetimeFaceDetectResult*  result = (const SensetimeFaceDetectResult*)lastPm.buffer;
+	
+	client.sendBuffSize = sizeof(EVPHeader)+sizeof(SensetimeFaceDetectResult);
+	client.sendBuff = new uint8_t[client.sendBuffSize];
+	client.deleteSendBuff = true;
+	
+	EVPHeader* evpHeader = new (client.sendBuff) EVPHeader;
+	evpHeader->cmd = FaceDaemonCommand::FDC_SENSETIMEFACEDETECT_RESULT;
+	evpHeader->size = client.sendBuffSize;
+	
+	SensetimeFaceDetectResult* evpSub = new (client.sendBuff + sizeof(EVPHeader)) SensetimeFaceDetectResult;
+	evpSub->school_id = result->school_id;
+	evpSub->st_id = result->st_id;
+	
+	return true;
+}
+
+bool ev_proc_SensetimeFaceDetect(EVClientStub& client)
+{
+	//#test send 01000B0000004142434445
+	//LOG_DEBUG << "cmd=" << evpHeader->cmd << ", size=" << evpHeader->size << ", \t" << (char*)(evpHeader + sizeof(EVPHeader)) << std::endl;
+	//return true;
+	
+	FDP_Image* fdpImage = (FDP_Image*)(client.recvBuff + sizeof(EVPHeader));
+
+	FaceDB* _faceDB = g_faceAPIPool.get_free(fdpImage->school_id);
+	if (_faceDB == nullptr)
+	{
+		LOG_WARN << "can't get face db" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR);
+		return false;
+	}
+	
+	PoolElemLocker<FaceDBPool*, int> _lock_faceAPI(&g_faceAPIPool, fdpImage->school_id);
+
+	PipeLine* pipeLine = g_PipeLinePool.get_free();
+	if (pipeLine == nullptr)
+	{
+		LOG_WARN << "can't get free pipeline" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR);
+		return false;
+	}
+	
+	PoolElemLocker<PipeLinePool*, PipeLine*> _lock_pipeLine(&g_PipeLinePool, pipeLine);
+
+	// fill
+	SensetimeFaceDetectDbFrame dbFrame;
+	dbFrame.type = (MB_Frame::MBFType)(fdpImage->mb_type);
+	dbFrame.buffSize = client.recvBuffSize - sizeof(EVPHeader) - sizeof(FDP_Image);
+	dbFrame.buffer = new uint8_t[dbFrame.buffSize];
+	ArrayDeleter<uint8_t*> _del_img((uint8_t*)dbFrame.buffer);
+	memcpy(dbFrame.buffer, fdpImage->buff, dbFrame.buffSize);
+	dbFrame.width = fdpImage->width;
+	dbFrame.height = fdpImage->height;
+	dbFrame.school_id = fdpImage->school_id;
+	dbFrame._faceDB = _faceDB;
+
+	PipeMaterial pm;
+	pm.type = PipeMaterial::PMT_FRAME;
+	pm.buffer = &dbFrame;
+	pm.buffSize = 0;
+
+	PipeLineElem* plElem = pipeLine->pipe(&pm);
+	if (! pipeLine->check_pipe_complete(plElem))
+	{
+		LOG_WARN << "pipeline not complete" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR);
+		return false;
+	}
+	
+	if (!plElem->gain(pm))
+	{
+		LOG_WARN << "pipeline gain error" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR);
+		return false;
+	}
+	
+	// can not release pipleline unless pm not used
+	send_SensetimeFaceDetectResult(client, pm);
+
+	return false;
+}
+
+bool ev_proc(EVClientStub& client)
+{
+	EVPHeader* evpHeader = (EVPHeader*)client.recvBuff;
+	if (evpHeader->size != client.recvBuffSize)
+	{
+		LOG_WARN << "Truncated buffer " << (evpHeader->size - client.recvBuffSize) << " bytes" << std::endl;
+		return false;
+	}
+
+	switch(evpHeader->cmd)
+	{
+	case EVPCommand::EVPC_USER_DEFINE + 1:
+		return ev_proc_SensetimeFaceDetect(client);
+	break;
+	default:
+		LOG_WARN << "Unknown command" << std::endl;
+		ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR);
+		return false;
+	break;
+	}
+	
+	// return false to disconnect
+	return false;
+}
+
+int main(int argc, char** argv)
+{
+	initLogger(LV_DEBUG);
+	
+	PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack);
+	PipeLine::register_global_elem_creator("PL_Gainer", create_PL_Gainer);
+	
+	g_PipeLinePool = new PipeLinePool(true);
+	
+	for (int i = 0; i < 5; i++)
+	{
+		PipeLine* pipeLine = new PipeLine;
+
+		{
+			PL_Payer_Config config;
+			config.copyData = true;//#todo false
+			PL_Gainer* ple = (PL_Gainer*)pipeLine->push_elem("PL_Gainer");
+			bool ret = ple->init(&config);
+			if (!ret)
+			{
+				LOG_ERROR << "ple init error" << std::endl;
+				exit(EXIT_FAILURE);
+			}
+		}
+		
+		{
+			SensetimeFaceTrackConfig config;
+			config.draw_face_rect = false;
+			config.draw_face_feature_point = false;
+			config.generate_face_feature = true;
+			PL_SensetimeFaceTrack* ple = (PL_SensetimeFaceTrack*)pipeLine->push_elem("PL_SensetimeFaceTrack");
+			bool ret = ple->init(&config);
+			if (!ret)
+			{
+				LOG_ERROR << "ple init error" << std::endl;
+				exit(EXIT_FAILURE);
+			}
+		}
+		
+		g_PipeLinePool.manage(pipeLine);
+	}
+	
+	evclient_proc = ev_proc;
+	return server_main(argc, argv);
+}

--
Gitblit v1.8.0