From 4a09037a212df66c170ee494907b5a746c01ba26 Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期二, 18 四月 2017 18:51:05 +0800 Subject: [PATCH] --- FaceServer/main_face_daemon.cpp | 233 +++++++++++++++++++++ FaceServer/ev_server.cpp | 302 +++++++++++++++++++++++++++ FaceServer/ev_proto.h | 56 +++++ FaceServer/ev_server.h | 48 ++++ 4 files changed, 639 insertions(+), 0 deletions(-) diff --git a/FaceServer/ev_proto.h b/FaceServer/ev_proto.h new file mode 100644 index 0000000..6dc27be --- /dev/null +++ b/FaceServer/ev_proto.h @@ -0,0 +1,56 @@ +#ifndef _EV_PROTO_H_ +#define _EV_PROTO_H_ + +#include <stddef.h> +#include <stdint.h> + +#pragma pack(1) + +struct EVPCommand +{ + enum EVPC + { + EVPC__FIRST, + EVPC_STATUS = 1, + EVPC_USER_DEFINE = 128, + EVPC__LAST + }; +}; + +struct EVPStatus +{ + enum EVPS + { + EVPS__FIRST, + EVPS_OK = 1, + EVPS_ERROR = 128, + EVPS_INTERNAL_ERROR, + EVPS_PARAMETER_ERROR, + EVPS__LAST + }; +}; + +struct EVPHeader +{ + int16_t cmd; // EVPCommand::EVPC + uint32_t size; // sizeof(EVPHeader)+sizeof(subcmd) +}; + +struct EVP_Status +{ + int16_t status; +}; + +struct EVP_VariableBuffer +{ + int16_t type; + uint8_t buff[0]; +}; + +//#todo +template<typename TPacket> +void endian_convert(TPacket& packet); + +#pragma pack() + +#endif diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp new file mode 100644 index 0000000..2f423a2 --- /dev/null +++ b/FaceServer/ev_server.cpp @@ -0,0 +1,302 @@ +#include "ev_server.h" +#include "ev_proto.h" +#include "logger.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <sys/time.h> + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include <err.h> + +#include <event.h> + +// A struct for client specific data, also includes pointer to create a list of clients. +struct EVClient +{ + // The clients socket. + int fd; + + // The bufferedevent for this client. + struct bufferevent *buf_ev; + + uint8_t* recvbuff; + size_t recvbuff_end; + size_t recvbuff_max; + size_t read_times; // read times for a command + + evclient_proc_t proc; + + EVClient() : + fd(-1), buf_ev(nullptr), + recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), + proc(nullptr) + { } +}; + +#ifndef USER_DEFINE_EVCLIENT_PROC +evclient_proc_t evclient_proc = nullptr; +#endif + +// Set a socket to non-blocking mode. +static int setnonblock(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFL); + if (flags < 0) + return flags; + flags |= O_NONBLOCK; + if (fcntl(fd, F_SETFL, flags) < 0) + return -1; + + return 0; +} + +// Called by libevent when there is an error on the underlying socket descriptor. +static void buffered_on_error(struct bufferevent *bufev, short what, void *arg) +{ + struct EVClient *client = (struct EVClient *)arg; + + if (what & EVBUFFER_EOF) + { + //Client disconnected, remove the read event and the free the client structure. + LOG_INFO << "Client disconnected." << std::endl; + } + else + { + LOG_WARN << "Client socket error, disconnecting." << std::endl; + } + bufferevent_free(client->buf_ev); + close(client->fd); + delete[] client->recvbuff; + delete client; +} + +// Called by libevent when there is data to read. +static void buffered_on_read(struct bufferevent *bufev, void *arg) +{ + struct EVClient *client = (struct EVClient *)arg; + + // Write back the read buffer. It is important to note that + // bufferevent_write_buffer will drain the incoming data so it + // is effectively gone after we call it. + //LOG_DEBUG << (char*)bufev->input << std::endl; + //bufferevent_write_buffer(bufev, bufev->input); + + //char buff[100] = {'\0'}; + //size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); + //LOG_DEBUG << "readSize=" << readSize << "\t" << buff << std::endl; + + EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; + + // read header if expected + { + uint8_t headerBuff[sizeof(EVPHeader)]; + if (client->recvbuff_end == 0) + { + size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); + client->read_times = 1; + if (readSize != sizeof(headerBuff)) + { + LOG_WARN << "client send incomplete header" << std::endl; + buffered_on_error(bufev, 0, arg); + return; + } + + evpHeader = (EVPHeader*)headerBuff; + + // check header + if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || + evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) + { + LOG_WARN << "client send invalid header" << std::endl; + buffered_on_error(bufev, 0, arg); + return; + } + + if (client->recvbuff_max < evpHeader->size) + { + delete[] client->recvbuff; + client->recvbuff = nullptr; + client->recvbuff_max = 0; + } + + if (client->recvbuff == nullptr) + { + uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; + client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); + client->recvbuff = new uint8_t[client->recvbuff_max]; + } + + memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); + client->recvbuff_end = sizeof(headerBuff); + } + } + + // read sub command or data + size_t readSize = 0; + do + { + readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); + client->read_times++; + if (readSize == 0) + break; + + client->recvbuff_end += readSize; + } while (readSize > 0); + + // test if command complete + if (evpHeader->size == client->recvbuff_end) + { + // call client proc + bool closeClient = true; + if (client->proc != nullptr) + { + EVClientStub cs(client->recvbuff, client->recvbuff_end); + cs.id = client->fd; + closeClient = !(client->proc(cs)); + + if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) + { + size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); + if (writeSize != cs.sendBuffSize) + LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << std::endl; + + if (cs.deleteSendBuff) + delete[] cs.sendBuff; + } + } + + if (closeClient) + { + LOG_DEBUG << "server initiative close" << std::endl; + buffered_on_error(bufev, 0, arg); + } + + // reset client + client->recvbuff_end = 0; + client->read_times = 0; + } + + // check read times + if (client->read_times > CLIENT_READ_TIMES_MAX) + { + LOG_WARN << "client read times to max" << std::endl; + buffered_on_error(bufev, 0, arg); + } +} + +// Called by libevent when the write buffer reaches 0. +// We only provide this because libevent expects it, but we don't use it. +static void buffered_on_write(struct bufferevent *bufev, void *arg) +{ +} + +// This function will be called by libevent when there is a connection ready to be accepted. +static void on_accept(int fd, short ev, void *arg) +{ + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); + if (client_fd < 0) + { + LOG_WARN << "accept failed" << std::endl; + return; + } + + // Set the client socket to non-blocking mode. + if (setnonblock(client_fd) < 0) + LOG_WARN << "failed to set client socket non-blocking" << std::endl; + + // We've accepted a new client, create a client object. + struct EVClient* client = new EVClient; + if (client == NULL) + { + LOG_ERROR << "malloc failed" << std::endl; + } + client->fd = client_fd; + client->proc = evclient_proc; + + // Create the buffered event. + client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); + + // We have to enable it before our callbacks will be called. + bufferevent_enable(client->buf_ev, EV_READ); + + LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << std::endl; +} + +int server_main(int argc, char **argv) +{ + //initLogger(LV_DEBUG); + + // Initialize libevent. + event_init(); + + // Create our listening socket. + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + { + LOG_ERROR << "create socket failed" << std::endl; + return EXIT_FAILURE; + } + + struct sockaddr_in listen_addr; + memset(&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = INADDR_ANY; + listen_addr.sin_port = htons(SERVER_PORT); + if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) + { + LOG_ERROR << "bind failed" << std::endl; + return EXIT_FAILURE; + } + + if (listen(listen_fd, 5) < 0) + { + LOG_ERROR << "listen failed" << std::endl; + return EXIT_FAILURE; + } + + // Set the socket to non-blocking, this is essential in event based programming with libevent. + int reuseaddr_on = REUSEADDR_ON; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); + if (setnonblock(listen_fd) < 0) + { + LOG_ERROR << "failed to set server socket to non-blocking" << std::endl; + return EXIT_FAILURE; + } + + // We now have a listening socket, we create a read event to be notified when a client connects. + struct event ev_accept; + event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); + event_add(&ev_accept, NULL); + + // Start the event loop. + event_dispatch(); + + return EXIT_SUCCESS; +} + +void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status) +{ + client.sendBuffSize = sizeof(EVPHeader)+sizeof(EVP_Status); + client.sendBuff = new uint8_t[client.sendBuffSize]; + client.deleteSendBuff = true; + + EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; + evpHeader->cmd = EVPCommand::EVPC_STATUS; + evpHeader->size = client.sendBuffSize; + + EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status; + evpStatus->status = status; +} diff --git a/FaceServer/ev_server.h b/FaceServer/ev_server.h new file mode 100644 index 0000000..75749b5 --- /dev/null +++ b/FaceServer/ev_server.h @@ -0,0 +1,48 @@ +#ifndef _EV_SERVER_H_ +#define _EV_SERVER_H_ + +#include <stddef.h> +#include <stdint.h> +#include "ev_proto.h" + +#define SERVER_PORT 5432 +#define REUSEADDR_ON 1 +#define CLIENT_BUFFER_MAX 100*1024 // 100KB +#define CLIENT_READ_TIMES_MAX 100 +#define CLIENT_MAX 100 // max count of clients connected //#todo not support + +struct EVClientStub +{ + int id; + const uint8_t* recvBuff; + const size_t recvBuffSize; + uint8_t* sendBuff; + size_t sendBuffSize; + bool deleteSendBuff; + + EVClientStub() : + id(-1), + recvBuff(nullptr), recvBuffSize(0), + sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false) + { + } + + EVClientStub(const uint8_t* _recvBuff, size_t _recvBuffSize) : + id(-1), + recvBuff(_recvBuff), recvBuffSize(_recvBuffSize), + sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false) + { + } +}; + +typedef bool (*evclient_proc_t)(EVClientStub& client); +extern evclient_proc_t evclient_proc; + +//#define USER_DEFINE_EVCLIENT_PROC + +int server_main(int argc, char **argv); + +void ev_send_packet(EVClientStub& client); +void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status); + +#endif diff --git a/FaceServer/main_face_daemon.cpp b/FaceServer/main_face_daemon.cpp new file mode 100644 index 0000000..9058ee4 --- /dev/null +++ b/FaceServer/main_face_daemon.cpp @@ -0,0 +1,233 @@ +#include "PipeLine.h" +#include "MaterialBuffer.h" +#include "PL_RTSPClient.h" +#include "PL_RTSPServer.h" +#include "PL_H264Decoder.h" +#include "PL_H264Encoder.h" +#include "PL_AVFrameYUV420.h" +#include "PL_AVFrameBGRA.h" +#include "PL_Queue.h" +#include "PL_Scale.h" +#include "PL_Fork.h" +#include "PL_Payer.h" +#include "PL_Gainer.h" +#include "PL_SensetimeFaceTrack.h" +#include "PL_SensetimeFaceDetect.h" +#include "PL_DlibFaceTrack.h" + +#include "PipeLinePool.h" + +#include "ev_server.h" +#include "ev_proto.h" +#include "face_daemon_proto.h" + +#include "SensetimeFaceAPIWrapper/src/FaceDBPool.h" +#include "SensetimeFaceAPIWrapper/src/faceAPI.h" + +#include "logger.h" + +template<typename TPoolPtr, typename TPoolElem> +struct PoolElemLocker +{ + TPoolPtr pool; + TPoolElem elem; + PoolElemLocker(TPoolPtr _pool, TPoolElem _elem) : pool(_pool), elem(_elem) + { + + } + ~PoolElemLocker() + { + pool->release(elem); + pool->notify_free(); + } +}; + +template<typename TArrayPtr> +struct ArrayDeleter +{ + TArrayPtr array; + ArrayDeleter(TArrayPtr _array) : array(_array) + { + + } + ~ArrayDeleter() + { + delete[] array; + } +}; + +PipeLinePool g_PipeLinePool; + +FaceDBPool g_faceAPIPool;//#todo config + +evclient_proc_t evclient_proc; + +bool send_SensetimeFaceDetectResult(EVClientStub& client, PipeMaterial& lastPm) +{ + //if (lastPm.type != PipeMaterial::PMT_PM_LIST) + + //PipeMaterial& facePM = ((PipeMaterial*)(lastPm.buffer))[1]; + //st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer); + //LOG_NOTICE << "faceFeatures " << faceFeatures.size() << std::endl; + //#todo send result packet + + if (lastPm.buffer == nullptr || lastPm.type != PipeMaterial::PMT_BYTES || lastPm.buffSize != sizeof(SensetimeFaceDetectResult)) + { + LOG_WARN << "pm not available" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); + return false; + } + + const SensetimeFaceDetectResult* result = (const SensetimeFaceDetectResult*)lastPm.buffer; + + client.sendBuffSize = sizeof(EVPHeader)+sizeof(SensetimeFaceDetectResult); + client.sendBuff = new uint8_t[client.sendBuffSize]; + client.deleteSendBuff = true; + + EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; + evpHeader->cmd = FaceDaemonCommand::FDC_SENSETIMEFACEDETECT_RESULT; + evpHeader->size = client.sendBuffSize; + + SensetimeFaceDetectResult* evpSub = new (client.sendBuff + sizeof(EVPHeader)) SensetimeFaceDetectResult; + evpSub->school_id = result->school_id; + evpSub->st_id = result->st_id; + + return true; +} + +bool ev_proc_SensetimeFaceDetect(EVClientStub& client) +{ + //#test send 01000B0000004142434445 + //LOG_DEBUG << "cmd=" << evpHeader->cmd << ", size=" << evpHeader->size << ", \t" << (char*)(evpHeader + sizeof(EVPHeader)) << std::endl; + //return true; + + FDP_Image* fdpImage = (FDP_Image*)(client.recvBuff + sizeof(EVPHeader)); + + FaceDB* _faceDB = g_faceAPIPool.get_free(fdpImage->school_id); + if (_faceDB == nullptr) + { + LOG_WARN << "can't get face db" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR); + return false; + } + + PoolElemLocker<FaceDBPool*, int> _lock_faceAPI(&g_faceAPIPool, fdpImage->school_id); + + PipeLine* pipeLine = g_PipeLinePool.get_free(); + if (pipeLine == nullptr) + { + LOG_WARN << "can't get free pipeline" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); + return false; + } + + PoolElemLocker<PipeLinePool*, PipeLine*> _lock_pipeLine(&g_PipeLinePool, pipeLine); + + // fill + SensetimeFaceDetectDbFrame dbFrame; + dbFrame.type = (MB_Frame::MBFType)(fdpImage->mb_type); + dbFrame.buffSize = client.recvBuffSize - sizeof(EVPHeader) - sizeof(FDP_Image); + dbFrame.buffer = new uint8_t[dbFrame.buffSize]; + ArrayDeleter<uint8_t*> _del_img((uint8_t*)dbFrame.buffer); + memcpy(dbFrame.buffer, fdpImage->buff, dbFrame.buffSize); + dbFrame.width = fdpImage->width; + dbFrame.height = fdpImage->height; + dbFrame.school_id = fdpImage->school_id; + dbFrame._faceDB = _faceDB; + + PipeMaterial pm; + pm.type = PipeMaterial::PMT_FRAME; + pm.buffer = &dbFrame; + pm.buffSize = 0; + + PipeLineElem* plElem = pipeLine->pipe(&pm); + if (! pipeLine->check_pipe_complete(plElem)) + { + LOG_WARN << "pipeline not complete" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); + return false; + } + + if (!plElem->gain(pm)) + { + LOG_WARN << "pipeline gain error" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); + return false; + } + + // can not release pipleline unless pm not used + send_SensetimeFaceDetectResult(client, pm); + + return false; +} + +bool ev_proc(EVClientStub& client) +{ + EVPHeader* evpHeader = (EVPHeader*)client.recvBuff; + if (evpHeader->size != client.recvBuffSize) + { + LOG_WARN << "Truncated buffer " << (evpHeader->size - client.recvBuffSize) << " bytes" << std::endl; + return false; + } + + switch(evpHeader->cmd) + { + case EVPCommand::EVPC_USER_DEFINE + 1: + return ev_proc_SensetimeFaceDetect(client); + break; + default: + LOG_WARN << "Unknown command" << std::endl; + ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR); + return false; + break; + } + + // return false to disconnect + return false; +} + +int main(int argc, char** argv) +{ + initLogger(LV_DEBUG); + + PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack); + PipeLine::register_global_elem_creator("PL_Gainer", create_PL_Gainer); + + g_PipeLinePool = new PipeLinePool(true); + + for (int i = 0; i < 5; i++) + { + PipeLine* pipeLine = new PipeLine; + + { + PL_Payer_Config config; + config.copyData = true;//#todo false + PL_Gainer* ple = (PL_Gainer*)pipeLine->push_elem("PL_Gainer"); + bool ret = ple->init(&config); + if (!ret) + { + LOG_ERROR << "ple init error" << std::endl; + exit(EXIT_FAILURE); + } + } + + { + SensetimeFaceTrackConfig config; + config.draw_face_rect = false; + config.draw_face_feature_point = false; + config.generate_face_feature = true; + PL_SensetimeFaceTrack* ple = (PL_SensetimeFaceTrack*)pipeLine->push_elem("PL_SensetimeFaceTrack"); + bool ret = ple->init(&config); + if (!ret) + { + LOG_ERROR << "ple init error" << std::endl; + exit(EXIT_FAILURE); + } + } + + g_PipeLinePool.manage(pipeLine); + } + + evclient_proc = ev_proc; + return server_main(argc, argv); +} -- Gitblit v1.8.0