From 109ffe9a777658936a38d0c146579a67c60a0d17 Mon Sep 17 00:00:00 2001 From: xuxiuxi <xuxiuxi@454eff88-639b-444f-9e54-f578c98de674> Date: 星期四, 11 五月 2017 17:48:48 +0800 Subject: [PATCH] --- FaceServer/ev_server.cpp | 200 ++++++++++++++++++++++++++++++++----------------- 1 files changed, 130 insertions(+), 70 deletions(-) diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp index 2f423a2..5ba4372 100644 --- a/FaceServer/ev_server.cpp +++ b/FaceServer/ev_server.cpp @@ -1,6 +1,6 @@ #include "ev_server.h" #include "ev_proto.h" -#include "logger.h" +#include <logger.h> #include <sys/types.h> #include <sys/socket.h> @@ -8,6 +8,8 @@ #include <arpa/inet.h> #include <sys/time.h> + +#include <netinet/tcp.h> // for TCP_NODELAY #include <stdlib.h> #include <stdio.h> @@ -32,12 +34,14 @@ size_t recvbuff_end; size_t recvbuff_max; size_t read_times; // read times for a command + bool toClose; evclient_proc_t proc; EVClient() : fd(-1), buf_ev(nullptr), recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), + toClose(false), proc(nullptr) { } }; @@ -47,6 +51,12 @@ #endif // Set a socket to non-blocking mode. +static int setnodelay(int fd) +{ + int flag = 1; + return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); +} + static int setnonblock(int fd) { int flags; @@ -61,6 +71,15 @@ return 0; } +static int setlinger(int fd) +{ + struct linger so_linger; + so_linger.l_onoff = 1; + so_linger.l_linger = 0; + + return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); +} + // Called by libevent when there is an error on the underlying socket descriptor. static void buffered_on_error(struct bufferevent *bufev, short what, void *arg) { @@ -69,11 +88,11 @@ if (what & EVBUFFER_EOF) { //Client disconnected, remove the read event and the free the client structure. - LOG_INFO << "Client disconnected." << std::endl; + LOG_INFO << "Client disconnected." << LOG_ENDL; } else { - LOG_WARN << "Client socket error, disconnecting." << std::endl; + LOG_WARN << "Client socket error, disconnecting." << LOG_ENDL; } bufferevent_free(client->buf_ev); close(client->fd); @@ -89,57 +108,58 @@ // Write back the read buffer. It is important to note that // bufferevent_write_buffer will drain the incoming data so it // is effectively gone after we call it. - //LOG_DEBUG << (char*)bufev->input << std::endl; + //LOG_DEBUG << (char*)bufev->input << LOG_ENDL; //bufferevent_write_buffer(bufev, bufev->input); //char buff[100] = {'\0'}; //size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); - //LOG_DEBUG << "readSize=" << readSize << "\t" << buff << std::endl; + //LOG_DEBUG << "readSize=" << readSize << "\t" << buff << LOG_ENDL; EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected + if (client->recvbuff_end == 0) { uint8_t headerBuff[sizeof(EVPHeader)]; - if (client->recvbuff_end == 0) + + size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); + client->read_times = 1; + if (readSize != sizeof(headerBuff)) { - size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); - client->read_times = 1; - if (readSize != sizeof(headerBuff)) - { - LOG_WARN << "client send incomplete header" << std::endl; - buffered_on_error(bufev, 0, arg); - return; - } - - evpHeader = (EVPHeader*)headerBuff; - - // check header - if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || - evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) - { - LOG_WARN << "client send invalid header" << std::endl; - buffered_on_error(bufev, 0, arg); - return; - } - - if (client->recvbuff_max < evpHeader->size) - { - delete[] client->recvbuff; - client->recvbuff = nullptr; - client->recvbuff_max = 0; - } - - if (client->recvbuff == nullptr) - { - uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; - client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); - client->recvbuff = new uint8_t[client->recvbuff_max]; - } - - memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); - client->recvbuff_end = sizeof(headerBuff); + LOG_WARN << "client send incomplete header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; } + + evpHeader = (EVPHeader*)headerBuff; + evpHeader->ntoh(); + + // check header + if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || + evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || + evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) + { + LOG_WARN << "client send invalid header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; + } + + if (client->recvbuff_max < evpHeader->size) + { + delete[] client->recvbuff; + client->recvbuff = nullptr; + client->recvbuff_max = 0; + } + + if (client->recvbuff == nullptr) + { + uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; + client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); + client->recvbuff = new uint8_t[client->recvbuff_max]; + } + + memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); + client->recvbuff_end = sizeof(headerBuff); } // read sub command or data @@ -147,9 +167,10 @@ do { readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); - client->read_times++; if (readSize == 0) break; + else + client->read_times++; client->recvbuff_end += readSize; } while (readSize > 0); @@ -158,39 +179,43 @@ if (evpHeader->size == client->recvbuff_end) { // call client proc - bool closeClient = true; if (client->proc != nullptr) { EVClientStub cs(client->recvbuff, client->recvbuff_end); cs.id = client->fd; - closeClient = !(client->proc(cs)); + client->toClose = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); - if (writeSize != cs.sendBuffSize) - LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << std::endl; - + bufferevent_enable(client->buf_ev, EV_WRITE); + if (writeSize != 0) + LOG_WARN << "server send truncate" << LOG_ENDL; + if (cs.deleteSendBuff) + { delete[] cs.sendBuff; + cs.sendBuff = nullptr; + } } - } - - if (closeClient) - { - LOG_DEBUG << "server initiative close" << std::endl; - buffered_on_error(bufev, 0, arg); } // reset client client->recvbuff_end = 0; client->read_times = 0; } + else + { + //LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size + // << ", recvbuff_end=" << client->recvbuff_end + // << ", read_times=" << client->read_times + // << LOG_ENDL; + } // check read times if (client->read_times > CLIENT_READ_TIMES_MAX) { - LOG_WARN << "client read times to max" << std::endl; + LOG_WARN << "client read times to max" << LOG_ENDL; buffered_on_error(bufev, 0, arg); } } @@ -199,6 +224,23 @@ // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { + struct EVClient *client = (struct EVClient *)arg; + if (evbuffer_get_length(bufev->output) == 0) + { + bufferevent_disable(client->buf_ev, EV_WRITE); + if (client->toClose) + { + LOG_DEBUG << "server initiative close" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + } + else + { + //bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work + //bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED); + // flush socket + shutdown(client->fd, SHUT_WR); + } + } } // This function will be called by libevent when there is a connection ready to be accepted. @@ -209,19 +251,25 @@ int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); if (client_fd < 0) { - LOG_WARN << "accept failed" << std::endl; + LOG_WARN << "accept failed" << LOG_ENDL; return; } // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) - LOG_WARN << "failed to set client socket non-blocking" << std::endl; + LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL; + + if (setnodelay(client_fd) < 0) + LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL; + + //if (setlinger(client_fd) < 0) + // LOG_WARN << "failed to set client socket linger" << LOG_ENDL; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; if (client == NULL) { - LOG_ERROR << "malloc failed" << std::endl; + LOG_ERROR << "malloc failed" << LOG_ENDL; } client->fd = client_fd; client->proc = evclient_proc; @@ -232,12 +280,12 @@ // We have to enable it before our callbacks will be called. bufferevent_enable(client->buf_ev, EV_READ); - LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << std::endl; + LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << LOG_ENDL; } int server_main(int argc, char **argv) { - //initLogger(LV_DEBUG); + LOG_NOTICE << "server_main" << LOG_ENDL; // Initialize libevent. event_init(); @@ -246,7 +294,7 @@ int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { - LOG_ERROR << "create socket failed" << std::endl; + LOG_ERROR << "create socket failed" << LOG_ENDL; return EXIT_FAILURE; } @@ -255,26 +303,28 @@ listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(SERVER_PORT); + + int reuseaddr_on = REUSEADDR_ON; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); + if (setnonblock(listen_fd) < 0) + { + LOG_ERROR << "failed to set server socket to non-blocking" << LOG_ENDL; + return EXIT_FAILURE; + } + if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) { - LOG_ERROR << "bind failed" << std::endl; + LOG_ERROR << "bind failed" << LOG_ENDL; return EXIT_FAILURE; } if (listen(listen_fd, 5) < 0) { - LOG_ERROR << "listen failed" << std::endl; + LOG_ERROR << "listen failed" << LOG_ENDL; return EXIT_FAILURE; } // Set the socket to non-blocking, this is essential in event based programming with libevent. - int reuseaddr_on = REUSEADDR_ON; - setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); - if (setnonblock(listen_fd) < 0) - { - LOG_ERROR << "failed to set server socket to non-blocking" << std::endl; - return EXIT_FAILURE; - } // We now have a listening socket, we create a read event to be notified when a client connects. struct event ev_accept; @@ -284,7 +334,15 @@ // Start the event loop. event_dispatch(); + close(listen_fd); + listen_fd = 0; return EXIT_SUCCESS; +} + +void server_stop() +{ + LOG_NOTICE << "server_stop" << LOG_ENDL; + event_loopexit(NULL); } void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status) @@ -296,7 +354,9 @@ EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; evpHeader->cmd = EVPCommand::EVPC_STATUS; evpHeader->size = client.sendBuffSize; + evpHeader->hton(); EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status; evpStatus->status = status; + evpHeader->hton(); } -- Gitblit v1.8.0