FaceServer/ev_proto.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
FaceServer/ev_server.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
FaceServer/ev_server.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
FaceServer/main_face_daemon.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
FaceServer/ev_proto.h
New file @@ -0,0 +1,56 @@ #ifndef _EV_PROTO_H_ #define _EV_PROTO_H_ #include <stddef.h> #include <stdint.h> #pragma pack(1) struct EVPCommand { enum EVPC { EVPC__FIRST, EVPC_STATUS = 1, EVPC_USER_DEFINE = 128, EVPC__LAST }; }; struct EVPStatus { enum EVPS { EVPS__FIRST, EVPS_OK = 1, EVPS_ERROR = 128, EVPS_INTERNAL_ERROR, EVPS_PARAMETER_ERROR, EVPS__LAST }; }; struct EVPHeader { int16_t cmd; // EVPCommand::EVPC uint32_t size; // sizeof(EVPHeader)+sizeof(subcmd) }; struct EVP_Status { int16_t status; }; struct EVP_VariableBuffer { int16_t type; uint8_t buff[0]; }; //#todo template<typename TPacket> void endian_convert(TPacket& packet); #pragma pack() #endif FaceServer/ev_server.cpp
New file @@ -0,0 +1,302 @@ #include "ev_server.h" #include "ev_proto.h" #include "logger.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/time.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <err.h> #include <event.h> // A struct for client specific data, also includes pointer to create a list of clients. struct EVClient { // The clients socket. int fd; // The bufferedevent for this client. struct bufferevent *buf_ev; uint8_t* recvbuff; size_t recvbuff_end; size_t recvbuff_max; size_t read_times; // read times for a command evclient_proc_t proc; EVClient() : fd(-1), buf_ev(nullptr), recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), proc(nullptr) { } }; #ifndef USER_DEFINE_EVCLIENT_PROC evclient_proc_t evclient_proc = nullptr; #endif // Set a socket to non-blocking mode. static int setnonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } // Called by libevent when there is an error on the underlying socket descriptor. static void buffered_on_error(struct bufferevent *bufev, short what, void *arg) { struct EVClient *client = (struct EVClient *)arg; if (what & EVBUFFER_EOF) { //Client disconnected, remove the read event and the free the client structure. LOG_INFO << "Client disconnected." << std::endl; } else { LOG_WARN << "Client socket error, disconnecting." << std::endl; } bufferevent_free(client->buf_ev); close(client->fd); delete[] client->recvbuff; delete client; } // Called by libevent when there is data to read. static void buffered_on_read(struct bufferevent *bufev, void *arg) { struct EVClient *client = (struct EVClient *)arg; // Write back the read buffer. It is important to note that // bufferevent_write_buffer will drain the incoming data so it // is effectively gone after we call it. //LOG_DEBUG << (char*)bufev->input << std::endl; //bufferevent_write_buffer(bufev, bufev->input); //char buff[100] = {'\0'}; //size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); //LOG_DEBUG << "readSize=" << readSize << "\t" << buff << std::endl; EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected { uint8_t headerBuff[sizeof(EVPHeader)]; if (client->recvbuff_end == 0) { size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); client->read_times = 1; if (readSize != sizeof(headerBuff)) { LOG_WARN << "client send incomplete header" << std::endl; buffered_on_error(bufev, 0, arg); return; } evpHeader = (EVPHeader*)headerBuff; // check header if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) { LOG_WARN << "client send invalid header" << std::endl; buffered_on_error(bufev, 0, arg); return; } if (client->recvbuff_max < evpHeader->size) { delete[] client->recvbuff; client->recvbuff = nullptr; client->recvbuff_max = 0; } if (client->recvbuff == nullptr) { uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); client->recvbuff = new uint8_t[client->recvbuff_max]; } memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); client->recvbuff_end = sizeof(headerBuff); } } // read sub command or data size_t readSize = 0; do { readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); client->read_times++; if (readSize == 0) break; client->recvbuff_end += readSize; } while (readSize > 0); // test if command complete if (evpHeader->size == client->recvbuff_end) { // call client proc bool closeClient = true; if (client->proc != nullptr) { EVClientStub cs(client->recvbuff, client->recvbuff_end); cs.id = client->fd; closeClient = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); if (writeSize != cs.sendBuffSize) LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << std::endl; if (cs.deleteSendBuff) delete[] cs.sendBuff; } } if (closeClient) { LOG_DEBUG << "server initiative close" << std::endl; buffered_on_error(bufev, 0, arg); } // reset client client->recvbuff_end = 0; client->read_times = 0; } // check read times if (client->read_times > CLIENT_READ_TIMES_MAX) { LOG_WARN << "client read times to max" << std::endl; buffered_on_error(bufev, 0, arg); } } // Called by libevent when the write buffer reaches 0. // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { } // This function will be called by libevent when there is a connection ready to be accepted. static void on_accept(int fd, short ev, void *arg) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); if (client_fd < 0) { LOG_WARN << "accept failed" << std::endl; return; } // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking" << std::endl; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; if (client == NULL) { LOG_ERROR << "malloc failed" << std::endl; } client->fd = client_fd; client->proc = evclient_proc; // Create the buffered event. client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); // We have to enable it before our callbacks will be called. bufferevent_enable(client->buf_ev, EV_READ); LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << std::endl; } int server_main(int argc, char **argv) { //initLogger(LV_DEBUG); // Initialize libevent. event_init(); // Create our listening socket. int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { LOG_ERROR << "create socket failed" << std::endl; return EXIT_FAILURE; } struct sockaddr_in listen_addr; memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(SERVER_PORT); if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) { LOG_ERROR << "bind failed" << std::endl; return EXIT_FAILURE; } if (listen(listen_fd, 5) < 0) { LOG_ERROR << "listen failed" << std::endl; return EXIT_FAILURE; } // Set the socket to non-blocking, this is essential in event based programming with libevent. int reuseaddr_on = REUSEADDR_ON; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); if (setnonblock(listen_fd) < 0) { LOG_ERROR << "failed to set server socket to non-blocking" << std::endl; return EXIT_FAILURE; } // We now have a listening socket, we create a read event to be notified when a client connects. struct event ev_accept; event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); event_add(&ev_accept, NULL); // Start the event loop. event_dispatch(); return EXIT_SUCCESS; } void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status) { client.sendBuffSize = sizeof(EVPHeader)+sizeof(EVP_Status); client.sendBuff = new uint8_t[client.sendBuffSize]; client.deleteSendBuff = true; EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; evpHeader->cmd = EVPCommand::EVPC_STATUS; evpHeader->size = client.sendBuffSize; EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status; evpStatus->status = status; } FaceServer/ev_server.h
New file @@ -0,0 +1,48 @@ #ifndef _EV_SERVER_H_ #define _EV_SERVER_H_ #include <stddef.h> #include <stdint.h> #include "ev_proto.h" #define SERVER_PORT 5432 #define REUSEADDR_ON 1 #define CLIENT_BUFFER_MAX 100*1024 // 100KB #define CLIENT_READ_TIMES_MAX 100 #define CLIENT_MAX 100 // max count of clients connected //#todo not support struct EVClientStub { int id; const uint8_t* recvBuff; const size_t recvBuffSize; uint8_t* sendBuff; size_t sendBuffSize; bool deleteSendBuff; EVClientStub() : id(-1), recvBuff(nullptr), recvBuffSize(0), sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false) { } EVClientStub(const uint8_t* _recvBuff, size_t _recvBuffSize) : id(-1), recvBuff(_recvBuff), recvBuffSize(_recvBuffSize), sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false) { } }; typedef bool (*evclient_proc_t)(EVClientStub& client); extern evclient_proc_t evclient_proc; //#define USER_DEFINE_EVCLIENT_PROC int server_main(int argc, char **argv); void ev_send_packet(EVClientStub& client); void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status); #endif FaceServer/main_face_daemon.cpp
New file @@ -0,0 +1,233 @@ #include "PipeLine.h" #include "MaterialBuffer.h" #include "PL_RTSPClient.h" #include "PL_RTSPServer.h" #include "PL_H264Decoder.h" #include "PL_H264Encoder.h" #include "PL_AVFrameYUV420.h" #include "PL_AVFrameBGRA.h" #include "PL_Queue.h" #include "PL_Scale.h" #include "PL_Fork.h" #include "PL_Payer.h" #include "PL_Gainer.h" #include "PL_SensetimeFaceTrack.h" #include "PL_SensetimeFaceDetect.h" #include "PL_DlibFaceTrack.h" #include "PipeLinePool.h" #include "ev_server.h" #include "ev_proto.h" #include "face_daemon_proto.h" #include "SensetimeFaceAPIWrapper/src/FaceDBPool.h" #include "SensetimeFaceAPIWrapper/src/faceAPI.h" #include "logger.h" template<typename TPoolPtr, typename TPoolElem> struct PoolElemLocker { TPoolPtr pool; TPoolElem elem; PoolElemLocker(TPoolPtr _pool, TPoolElem _elem) : pool(_pool), elem(_elem) { } ~PoolElemLocker() { pool->release(elem); pool->notify_free(); } }; template<typename TArrayPtr> struct ArrayDeleter { TArrayPtr array; ArrayDeleter(TArrayPtr _array) : array(_array) { } ~ArrayDeleter() { delete[] array; } }; PipeLinePool g_PipeLinePool; FaceDBPool g_faceAPIPool;//#todo config evclient_proc_t evclient_proc; bool send_SensetimeFaceDetectResult(EVClientStub& client, PipeMaterial& lastPm) { //if (lastPm.type != PipeMaterial::PMT_PM_LIST) //PipeMaterial& facePM = ((PipeMaterial*)(lastPm.buffer))[1]; //st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer); //LOG_NOTICE << "faceFeatures " << faceFeatures.size() << std::endl; //#todo send result packet if (lastPm.buffer == nullptr || lastPm.type != PipeMaterial::PMT_BYTES || lastPm.buffSize != sizeof(SensetimeFaceDetectResult)) { LOG_WARN << "pm not available" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); return false; } const SensetimeFaceDetectResult* result = (const SensetimeFaceDetectResult*)lastPm.buffer; client.sendBuffSize = sizeof(EVPHeader)+sizeof(SensetimeFaceDetectResult); client.sendBuff = new uint8_t[client.sendBuffSize]; client.deleteSendBuff = true; EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; evpHeader->cmd = FaceDaemonCommand::FDC_SENSETIMEFACEDETECT_RESULT; evpHeader->size = client.sendBuffSize; SensetimeFaceDetectResult* evpSub = new (client.sendBuff + sizeof(EVPHeader)) SensetimeFaceDetectResult; evpSub->school_id = result->school_id; evpSub->st_id = result->st_id; return true; } bool ev_proc_SensetimeFaceDetect(EVClientStub& client) { //#test send 01000B0000004142434445 //LOG_DEBUG << "cmd=" << evpHeader->cmd << ", size=" << evpHeader->size << ", \t" << (char*)(evpHeader + sizeof(EVPHeader)) << std::endl; //return true; FDP_Image* fdpImage = (FDP_Image*)(client.recvBuff + sizeof(EVPHeader)); FaceDB* _faceDB = g_faceAPIPool.get_free(fdpImage->school_id); if (_faceDB == nullptr) { LOG_WARN << "can't get face db" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR); return false; } PoolElemLocker<FaceDBPool*, int> _lock_faceAPI(&g_faceAPIPool, fdpImage->school_id); PipeLine* pipeLine = g_PipeLinePool.get_free(); if (pipeLine == nullptr) { LOG_WARN << "can't get free pipeline" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); return false; } PoolElemLocker<PipeLinePool*, PipeLine*> _lock_pipeLine(&g_PipeLinePool, pipeLine); // fill SensetimeFaceDetectDbFrame dbFrame; dbFrame.type = (MB_Frame::MBFType)(fdpImage->mb_type); dbFrame.buffSize = client.recvBuffSize - sizeof(EVPHeader) - sizeof(FDP_Image); dbFrame.buffer = new uint8_t[dbFrame.buffSize]; ArrayDeleter<uint8_t*> _del_img((uint8_t*)dbFrame.buffer); memcpy(dbFrame.buffer, fdpImage->buff, dbFrame.buffSize); dbFrame.width = fdpImage->width; dbFrame.height = fdpImage->height; dbFrame.school_id = fdpImage->school_id; dbFrame._faceDB = _faceDB; PipeMaterial pm; pm.type = PipeMaterial::PMT_FRAME; pm.buffer = &dbFrame; pm.buffSize = 0; PipeLineElem* plElem = pipeLine->pipe(&pm); if (! pipeLine->check_pipe_complete(plElem)) { LOG_WARN << "pipeline not complete" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); return false; } if (!plElem->gain(pm)) { LOG_WARN << "pipeline gain error" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_INTERNAL_ERROR); return false; } // can not release pipleline unless pm not used send_SensetimeFaceDetectResult(client, pm); return false; } bool ev_proc(EVClientStub& client) { EVPHeader* evpHeader = (EVPHeader*)client.recvBuff; if (evpHeader->size != client.recvBuffSize) { LOG_WARN << "Truncated buffer " << (evpHeader->size - client.recvBuffSize) << " bytes" << std::endl; return false; } switch(evpHeader->cmd) { case EVPCommand::EVPC_USER_DEFINE + 1: return ev_proc_SensetimeFaceDetect(client); break; default: LOG_WARN << "Unknown command" << std::endl; ev_send_status_packet(client, EVPStatus::EVPS_PARAMETER_ERROR); return false; break; } // return false to disconnect return false; } int main(int argc, char** argv) { initLogger(LV_DEBUG); PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack); PipeLine::register_global_elem_creator("PL_Gainer", create_PL_Gainer); g_PipeLinePool = new PipeLinePool(true); for (int i = 0; i < 5; i++) { PipeLine* pipeLine = new PipeLine; { PL_Payer_Config config; config.copyData = true;//#todo false PL_Gainer* ple = (PL_Gainer*)pipeLine->push_elem("PL_Gainer"); bool ret = ple->init(&config); if (!ret) { LOG_ERROR << "ple init error" << std::endl; exit(EXIT_FAILURE); } } { SensetimeFaceTrackConfig config; config.draw_face_rect = false; config.draw_face_feature_point = false; config.generate_face_feature = true; PL_SensetimeFaceTrack* ple = (PL_SensetimeFaceTrack*)pipeLine->push_elem("PL_SensetimeFaceTrack"); bool ret = ple->init(&config); if (!ret) { LOG_ERROR << "ple init error" << std::endl; exit(EXIT_FAILURE); } } g_PipeLinePool.manage(pipeLine); } evclient_proc = ev_proc; return server_main(argc, argv); }