From 109ffe9a777658936a38d0c146579a67c60a0d17 Mon Sep 17 00:00:00 2001 From: xuxiuxi <xuxiuxi@454eff88-639b-444f-9e54-f578c98de674> Date: 星期四, 11 五月 2017 17:48:48 +0800 Subject: [PATCH] --- FaceServer/ev_server.cpp | 149 +++++++++++++++++++++++++++++++++---------------- 1 files changed, 99 insertions(+), 50 deletions(-) diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp index 4d5b29c..5ba4372 100644 --- a/FaceServer/ev_server.cpp +++ b/FaceServer/ev_server.cpp @@ -9,6 +9,8 @@ #include <sys/time.h> +#include <netinet/tcp.h> // for TCP_NODELAY + #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -32,12 +34,14 @@ size_t recvbuff_end; size_t recvbuff_max; size_t read_times; // read times for a command + bool toClose; evclient_proc_t proc; EVClient() : fd(-1), buf_ev(nullptr), recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), + toClose(false), proc(nullptr) { } }; @@ -47,6 +51,12 @@ #endif // Set a socket to non-blocking mode. +static int setnodelay(int fd) +{ + int flag = 1; + return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); +} + static int setnonblock(int fd) { int flags; @@ -59,6 +69,15 @@ return -1; return 0; +} + +static int setlinger(int fd) +{ + struct linger so_linger; + so_linger.l_onoff = 1; + so_linger.l_linger = 0; + + return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); } // Called by libevent when there is an error on the underlying socket descriptor. @@ -99,48 +118,48 @@ EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected + if (client->recvbuff_end == 0) { uint8_t headerBuff[sizeof(EVPHeader)]; - if (client->recvbuff_end == 0) + + size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); + client->read_times = 1; + if (readSize != sizeof(headerBuff)) { - size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); - client->read_times = 1; - if (readSize != sizeof(headerBuff)) - { - LOG_WARN << "client send incomplete header" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); - return; - } - - evpHeader = (EVPHeader*)headerBuff; - - // check header - if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || - evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || - evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) - { - LOG_WARN << "client send invalid header" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); - return; - } - - if (client->recvbuff_max < evpHeader->size) - { - delete[] client->recvbuff; - client->recvbuff = nullptr; - client->recvbuff_max = 0; - } - - if (client->recvbuff == nullptr) - { - uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; - client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); - client->recvbuff = new uint8_t[client->recvbuff_max]; - } - - memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); - client->recvbuff_end = sizeof(headerBuff); + LOG_WARN << "client send incomplete header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; } + + evpHeader = (EVPHeader*)headerBuff; + evpHeader->ntoh(); + + // check header + if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || + evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || + evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) + { + LOG_WARN << "client send invalid header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; + } + + if (client->recvbuff_max < evpHeader->size) + { + delete[] client->recvbuff; + client->recvbuff = nullptr; + client->recvbuff_max = 0; + } + + if (client->recvbuff == nullptr) + { + uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; + client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); + client->recvbuff = new uint8_t[client->recvbuff_max]; + } + + memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); + client->recvbuff_end = sizeof(headerBuff); } // read sub command or data @@ -148,9 +167,10 @@ do { readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); - client->read_times++; if (readSize == 0) break; + else + client->read_times++; client->recvbuff_end += readSize; } while (readSize > 0); @@ -159,33 +179,37 @@ if (evpHeader->size == client->recvbuff_end) { // call client proc - bool closeClient = true; if (client->proc != nullptr) { EVClientStub cs(client->recvbuff, client->recvbuff_end); cs.id = client->fd; - closeClient = !(client->proc(cs)); + client->toClose = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); - if (writeSize != cs.sendBuffSize) - LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << LOG_ENDL; - + bufferevent_enable(client->buf_ev, EV_WRITE); + if (writeSize != 0) + LOG_WARN << "server send truncate" << LOG_ENDL; + if (cs.deleteSendBuff) + { delete[] cs.sendBuff; + cs.sendBuff = nullptr; + } } - } - - if (closeClient) - { - LOG_DEBUG << "server initiative close" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); } // reset client client->recvbuff_end = 0; client->read_times = 0; + } + else + { + //LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size + // << ", recvbuff_end=" << client->recvbuff_end + // << ", read_times=" << client->read_times + // << LOG_ENDL; } // check read times @@ -200,6 +224,23 @@ // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { + struct EVClient *client = (struct EVClient *)arg; + if (evbuffer_get_length(bufev->output) == 0) + { + bufferevent_disable(client->buf_ev, EV_WRITE); + if (client->toClose) + { + LOG_DEBUG << "server initiative close" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + } + else + { + //bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work + //bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED); + // flush socket + shutdown(client->fd, SHUT_WR); + } + } } // This function will be called by libevent when there is a connection ready to be accepted. @@ -217,6 +258,12 @@ // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL; + + if (setnodelay(client_fd) < 0) + LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL; + + //if (setlinger(client_fd) < 0) + // LOG_WARN << "failed to set client socket linger" << LOG_ENDL; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; @@ -307,7 +354,9 @@ EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; evpHeader->cmd = EVPCommand::EVPC_STATUS; evpHeader->size = client.sendBuffSize; + evpHeader->hton(); EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status; evpStatus->status = status; + evpHeader->hton(); } -- Gitblit v1.8.0