RtspFace/PipeLine.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/PipeLine.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/PipeLinePool.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/ev_proto.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/ev_server.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/ev_server.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/main_face_daemon.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
RtspFace/make.sh | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
RtspFace/PipeLine.cpp
@@ -102,6 +102,11 @@ return false; } /* Reports whether lastRetElem is the final element of this pipeline. An empty pipeline is never complete; the emptiness guard avoids dereferencing rbegin() of an empty container. */ bool PipeLine::check_pipe_complete(PipeLineElem* lastRetElem) const { return !elems.empty() && lastRetElem == *elems.rbegin(); } PipeLineElem* PipeLine::push_elem(const std::string& type) { elem_create_func_map_t::iterator iter = elem_create_func_map.find(type); RtspFace/PipeLine.h
@@ -83,6 +83,8 @@ void push_front_elem(PipeLineElem* elem); bool remove_elem(PipeLineElem* elem); bool check_pipe_complete(PipeLineElem* lastRetElem) const; // do pipe sync. returns the element who returns false, or the last one. // if false return, the element should deal with pm, clean up. PipeLineElem* pipe(PipeMaterial* pm = nullptr); RtspFace/PipeLinePool.cpp
@@ -87,10 +87,6 @@ PipeLine* PipeLinePool::get_free() { if (pipelines_free.empty()) { PLP_MUTEX_LOCK(pl_mutex, nullptr); } PLP_MUTEX_LOCK(tsafe_mutex, nullptr); @@ -118,6 +114,19 @@ pipelines_free.insert(pl); PLP_MUTEX_UNLOCK(tsafe_mutex,); } PLP_MUTEX_UNLOCK(pl_mutex,); bool PipeLinePool::wait_free() { if (pipelines_free.empty()) { PLP_MUTEX_LOCK(pl_mutex, false); } return true; } bool PipeLinePool::notify_free() { PLP_MUTEX_UNLOCK(pl_mutex, false); } RtspFace/ev_proto.h
// New file @@ -0,0 +1,33 @@
#ifndef _EV_PROTO_H_
#define _EV_PROTO_H_

#include <stddef.h>
#include <stdint.h>

// Wire-format structures: packed to 1 byte so the in-memory layout has no
// padding. push/pop (instead of pack(1) / pack()) restores whatever packing
// the including translation unit had, rather than resetting it to the
// compiler default.
#pragma pack(push, 1)

// Namespace-like wrapper for the command identifiers carried in
// EVPHeader::cmd. EVPC__FIRST and EVPC__LAST are exclusive sentinels that
// bracket the valid commands.
struct EVPCommand
{
    enum EVPC
    {
        EVPC__FIRST,
        EVPC_SENSETIMEFACEDETECT,
        EVPC__LAST,
    };
};

// Fixed-size header that starts every command.
struct EVPHeader
{
    int16_t cmd;   // EVPCommand::EVPC
    uint32_t size; // sizeof(EVPHeader)+sizeof(subcmd)
};

// Variable-length payload: a type tag followed directly by the data bytes.
struct EVP_VariableBuffer
{
    int16_t mb_type; // MB_Frame::MBFType
    uint8_t buff[0]; // zero-length trailing array (compiler extension); data follows in place
};

#pragma pack(pop)

#endif
// RtspFace/ev_server.cpp
New file @@ -0,0 +1,285 @@ #include "ev_server.h" #include "ev_proto.h" #include "logger.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/time.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <err.h> #include <event.h> // A struct for client specific data, also includes pointer to create a list of clients. struct EVClient { // The clients socket. int fd; // The bufferedevent for this client. struct bufferevent *buf_ev; uint8_t* recvbuff; size_t recvbuff_end; size_t recvbuff_max; size_t read_times; // read times for a command evclient_proc_t proc; EVClient() : fd(-1), buf_ev(nullptr), recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), proc(nullptr) { } }; // Set a socket to non-blocking mode. static int setnonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } // Called by libevent when there is an error on the underlying socket descriptor. static void buffered_on_error(struct bufferevent *bufev, short what, void *arg) { struct EVClient *client = (struct EVClient *)arg; if (what & EVBUFFER_EOF) { //Client disconnected, remove the read event and the free the client structure. LOG_INFO << "Client disconnected."; } else { LOG_WARN << "Client socket error, disconnecting."; } bufferevent_free(client->buf_ev); close(client->fd); delete[] client->recvbuff; delete client; } // Called by libevent when there is data to read. static void buffered_on_read(struct bufferevent *bufev, void *arg) { struct EVClient *client = (struct EVClient *)arg; // Write back the read buffer. It is important to note that // bufferevent_write_buffer will drain the incoming data so it // is effectively gone after we call it. 
//LOG_DEBUG << (char*)bufev->input; //bufferevent_write_buffer(bufev, bufev->input); //char buff[100] = {'\0'}; //size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); //LOG_DEBUG << "readSize=" << readSize << "\t" << buff; EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected { uint8_t headerBuff[sizeof(EVPHeader)]; if (client->recvbuff_end == 0) { size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); client->read_times = 1; if (readSize != sizeof(headerBuff)) { LOG_WARN << "client send incomplete header"; buffered_on_error(bufev, 0, arg); return; } evpHeader = (EVPHeader*)headerBuff; // check header if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) { LOG_WARN << "client send invalid header"; buffered_on_error(bufev, 0, arg); return; } if (client->recvbuff_max < evpHeader->size) { delete[] client->recvbuff; client->recvbuff = nullptr; client->recvbuff_max = 0; } if (client->recvbuff == nullptr) { uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); client->recvbuff = new uint8_t[client->recvbuff_max]; } memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); client->recvbuff_end = sizeof(headerBuff); } } // read sub command or data size_t readSize = 0; do { readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); client->read_times++; if (readSize == 0) break; client->recvbuff_end += readSize; } while (readSize > 0); // test if command complete if (evpHeader->size == client->recvbuff_end) { // call client proc bool closeClient = true; if (client->proc != nullptr) { EVClientStub cs; cs.id = client->fd; cs.recvBuff = client->recvbuff; cs.recvBuffSize = client->recvbuff_end; closeClient = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { 
//#todo bufferevent_write if (cs.deleteSendBuff) delete[] cs.sendBuff; } } if (closeClient) { LOG_DEBUG << "server initiative close"; buffered_on_error(bufev, 0, arg); } // reset client client->recvbuff_end = 0; client->read_times = 0; } // check read times if (client->read_times > CLIENT_READ_TIMES_MAX) { LOG_WARN << "client read times to max"; buffered_on_error(bufev, 0, arg); } } // Called by libevent when the write buffer reaches 0. // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { } // This function will be called by libevent when there is a connection ready to be accepted. static void on_accept(int fd, short ev, void *arg) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); if (client_fd < 0) { LOG_WARN << "accept failed"; return; } // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking"; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; if (client == NULL) { LOG_ERROR << "malloc failed"; } client->fd = client_fd; client->proc = evclient_proc; // Create the buffered event. client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); // We have to enable it before our callbacks will be called. bufferevent_enable(client->buf_ev, EV_READ); LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr); } int server_main(int argc, char **argv) { //initLogger(LV_DEBUG); // Initialize libevent. event_init(); // Create our listening socket. 
int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { LOG_ERROR << "create socket failed"; return EXIT_FAILURE; } struct sockaddr_in listen_addr; memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(SERVER_PORT); if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) { LOG_ERROR << "bind failed"; return EXIT_FAILURE; } if (listen(listen_fd, 5) < 0) { LOG_ERROR << "listen failed"; return EXIT_FAILURE; } // Set the socket to non-blocking, this is essential in event based programming with libevent. int reuseaddr_on = REUSEADDR_ON; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); if (setnonblock(listen_fd) < 0) { LOG_ERROR << "failed to set server socket to non-blocking"; return EXIT_FAILURE; } // We now have a listening socket, we create a read event to be notified when a client connects. struct event ev_accept; event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); event_add(&ev_accept, NULL); // Start the event loop. event_dispatch(); return EXIT_SUCCESS; } RtspFace/ev_server.h
// New file @@ -0,0 +1,35 @@
#ifndef _EV_SERVER_H_
#define _EV_SERVER_H_

#include <stddef.h>
#include <stdint.h>

#define SERVER_PORT 5432
#define REUSEADDR_ON 1
// Parenthesised so the macro stays a single value inside larger expressions
// (e.g. `x / CLIENT_BUFFER_MAX`).
#define CLIENT_BUFFER_MAX (100*1024) // 100KB
#define CLIENT_READ_TIMES_MAX 100
#define CLIENT_MAX 100 // max count of clients connected //#todo not support

// View of one client request handed to the command handler, plus the slots
// the handler fills in to describe its reply.
struct EVClientStub
{
    int id;                  // identifies the client; initialised to -1

    const uint8_t* recvBuff; // received bytes (read-only for the handler)
    size_t recvBuffSize;     // number of valid bytes in recvBuff; assignable so the server can fill it in

    uint8_t* sendBuff;       // reply bytes, or nullptr for no reply
    size_t sendBuffSize;     // number of bytes in sendBuff
    bool deleteSendBuff;     // true if the server should delete[] sendBuff afterwards

    EVClientStub() :
        id(-1),
        recvBuff(nullptr), recvBuffSize(0),
        sendBuff(nullptr), sendBuffSize(0), deleteSendBuff(false)
    {
    }
};

// Command handler signature. The bool result is interpreted by the caller of
// the handler.
typedef bool (*evclient_proc_t)(EVClientStub& client);

// Handler used for accepted clients; defined by the application.
extern evclient_proc_t evclient_proc;

// Entry point of the event server.
int server_main(int argc, char **argv);

#endif
// RtspFace/main_face_daemon.cpp
@@ -8,257 +8,105 @@ #include "PL_Queue.h" #include "PL_Scale.h" #include "PL_Fork.h" #include "PL_SensetimeFaceTrack.h" #include "PL_DlibFaceTrack.h" #include "PipeLinePool.h" #include "ev_server.h" #include "ev_proto.h" #include "logger.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> PipeLinePool g_PipeLinePool; #include <sys/time.h> evclient_proc_t evclient_proc; #include <stdlib.h> #include <stdio.h> #include <string.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <err.h> #include <event.h> #define SERVER_PORT 5432 #define REUSEADDR_ON 1 // A struct for client specific data, also includes pointer to create a list of clients. struct EVClient { // The clients socket. int fd; // The bufferedevent for this client. struct bufferevent *buf_ev; EVClient() : fd(-1), buf_ev(nullptr) { } }; // Set a socket to non-blocking mode. int setnonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } // Called by libevent when there is data to read. void buffered_on_read(struct bufferevent *bufev, void *arg) { struct EVClient *client = (struct EVClient *)arg; // Write back the read buffer. It is important to note that // bufferevent_write_buffer will drain the incoming data so it // is effectively gone after we call it. //LOG_DEBUG << (char*)bufev->input; //bufferevent_write_buffer(bufev, bufev->input); char buff[100] = {'\0'}; size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); LOG_DEBUG << "readSize=" << readSize << "\t" << buff; } // Called by libevent when the write buffer reaches 0. // We only provide this because libevent expects it, but we don't use it. void buffered_on_write(struct bufferevent *bufev, void *arg) bool ev_proc_SensetimeFaceDetect(EVClientStub& client) { } // Called by libevent when there is an error on the underlying socket descriptor. 
void buffered_on_error(struct bufferevent *bufev, short what, void *arg) bool ev_proc(EVClientStub& client) { struct EVClient *client = (struct EVClient *)arg; EVPHeader* evpHeader = (EVPHeader*)client.recvBuff; //#todo check cmd and size if (what & EVBUFFER_EOF) //#test send 01000B0000004142434445 //LOG_DEBUG << "cmd=" << evpHeader->cmd << ", size=" << evpHeader->size << ", \t" << (char*)(evpHeader + sizeof(EVPHeader)); //return true; PipeLine* pipeLine = nullptr; if (g_PipeLinePool.wait_free()) pipeLine = g_PipeLinePool.get_free(); if (pipeLine == nullptr) { //Client disconnected, remove the read event and the free the client structure. LOG_INFO << "Client disconnected."; } else { LOG_WARN << "Client socket error, disconnecting."; } bufferevent_free(client->buf_ev); close(client->fd); delete client; LOG_WARN << "can't get free pipeline";//#todo send err packet return false; } // This function will be called by libevent when there is a connection ready to be accepted. void on_accept(int fd, short ev, void *arg) PipeMaterial pm; // fill PipeLineElem* plElem = pipeLine.pipe(&pm); if (! pipeLine.check_pipe_complete(plElem)) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); if (client_fd < 0) { LOG_WARN << "accept failed"; return; LOG_WARN << "pipeline not complete"; g_PipeLinePool.release(pipeLine);//#todo send err packet return false; } // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking"; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; if (client == NULL) if (!plElem->gain(pm)) { LOG_ERROR << "malloc failed"; LOG_WARN << "pipeline gain error"; g_PipeLinePool.release(pipeLine);//#todo send err packet return false; } client->fd = client_fd; // Create the buffered event. 
client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); if (pm.type == PipeMaterial::PMT_PM_LIST) { PipeMaterial& facePM = ((PipeMaterial*)(pm.buffer))[1]; st_ff_vect_t& faceFeatures = *((st_ff_vect_t*)facePM.buffer); LOG_NOTICE << "faceFeatures " << faceFeatures.size(); //#todo send result packet } // We have to enable it before our callbacks will be called. bufferevent_enable(client->buf_ev, EV_READ); LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr); g_PipeLinePool.release(pipeLine); return false; } int main(int argc, char **argv) { initLogger(LV_DEBUG); // Initialize libevent. event_init(); // Create our listening socket. int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { LOG_ERROR << "create socket failed"; return EXIT_FAILURE; } struct sockaddr_in listen_addr; memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(SERVER_PORT); if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) { LOG_ERROR << "bind failed"; return EXIT_FAILURE; } if (listen(listen_fd, 5) < 0) { LOG_ERROR << "listen failed"; return EXIT_FAILURE; } // Set the socket to non-blocking, this is essential in event based programming with libevent. int reuseaddr_on = REUSEADDR_ON; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); if (setnonblock(listen_fd) < 0) { LOG_ERROR << "failed to set server socket to non-blocking"; return EXIT_FAILURE; } // We now have a listening socket, we create a read event to be notified when a client connects. struct event ev_accept; event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); event_add(&ev_accept, NULL); // Start the event loop. 
event_dispatch(); return EXIT_SUCCESS; } int _main(int argc, char** argv) { initLogger(LV_DEBUG); PipeLine pipeLine; PipeLine::register_global_elem_creator("PL_SensetimeFaceTrack", create_PL_SensetimeFaceTrack); g_PipeLinePool = new PipeLinePool(true); for (int i = 0; i < 5; i++) { PL_RTSPClient* rtspClient = (PL_RTSPClient*)pipeLine.push_elem("PL_RTSPClient"); PL_RTSPClient_Config rtspConfig; rtspConfig.progName = argv[0]; rtspConfig.rtspURL = argv[1]; rtspConfig.aux = true; // ffmpeg need aux, but live555 not rtspConfig.verbosityLevel = 1; rtspConfig.tunnelOverHTTPPortNum = 0; rtspConfig.args = nullptr; bool ret = rtspClient->init(&rtspConfig); if (!ret) { LOG_ERROR << "rtspClient.init error"; exit(EXIT_FAILURE); } PipeLine* pipeLine = new PipeLine; {//payer//#todo } { PL_H264Decoder* h264Decoder = (PL_H264Decoder*)pipeLine.push_elem("PL_H264Decoder"); bool ret = h264Decoder->init(nullptr); if (!ret) { LOG_ERROR << "PL_H264Decoder.init error"; exit(EXIT_FAILURE); } } { PL_AVFrameYUV420* avFrameYUV420 = (PL_AVFrameYUV420*)pipeLine.push_elem("PL_AVFrameYUV420"); bool ret = avFrameYUV420->init(nullptr); if (!ret) { LOG_ERROR << "PL_AVFrameYUV420.init error"; exit(EXIT_FAILURE); } } { PL_Scale_Config config; config.toWidth = 800; config.toHeight = 600; PL_Scale* ple = (PL_Scale*)pipeLine.push_elem("PL_Scale"); bool ret = ple->init(&config); if (!ret) { LOG_ERROR << "PL_Scale.init error"; exit(EXIT_FAILURE); } } PL_SensetimeFaceTrack* sensetimeFaceTrack; { SensetimeFaceTrackConfig config; config.generate_face_feature = true; sensetimeFaceTrack = (PL_SensetimeFaceTrack*)pipeLine.push_elem("PL_SensetimeFaceTrack"); sensetimeFaceTrack->init(&config); PL_SensetimeFaceTrack* sensetimeFaceTrack = (PL_SensetimeFaceTrack*)pipeLine->push_elem("PL_SensetimeFaceTrack"); bool ret = sensetimeFaceTrack->init(&config); if (!ret) { LOG_ERROR << "sensetimeFaceTrack init error"; exit(EXIT_FAILURE); } } g_PipeLinePool.manage(pipeLine); } evclient_proc = ev_proc; return server_main(argc, 
argv); while(true) { RtspFace/make.sh
@@ -49,6 +49,7 @@ #g++ main.cpp $CFLAGS $CPPFLAGS -o main.o #g++ main_dump_st_face.cpp $CFLAGS $CPPFLAGS -o main.o g++ main_face_daemon.cpp $CFLAGS $CPPFLAGS -o main.o g++ face_daemon_server.cpp $CFLAGS $CPPFLAGS g++ PipeLine.cpp $CFLAGS $CPPFLAGS g++ PipeLinePool.cpp $CFLAGS $CPPFLAGS @@ -72,6 +73,7 @@ g++ -g -std=c++11 \ main.o PipeLine.o PipeLinePool.o \ face_daemon_server.o \ PL_RTSPClient.o PL_H264Decoder.o PL_H264Encoder.o PL_AVFrameYUV420.o PL_AVFrameBGRA.o PL_Queue.o PL_Scale.o PL_Fork.o \ PL_SensetimeFaceTrack.o \ PL_DlibFaceTrack.o \