From 109ffe9a777658936a38d0c146579a67c60a0d17 Mon Sep 17 00:00:00 2001
From: xuxiuxi <xuxiuxi@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期四, 11 五月 2017 17:48:48 +0800
Subject: [PATCH] 

---
 FaceServer/ev_server.cpp |  200 ++++++++++++++++++++++++++++++++-----------------
 1 files changed, 130 insertions(+), 70 deletions(-)

diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp
index 2f423a2..5ba4372 100644
--- a/FaceServer/ev_server.cpp
+++ b/FaceServer/ev_server.cpp
@@ -1,6 +1,6 @@
 #include "ev_server.h" 
 #include "ev_proto.h"
-#include "logger.h"
+#include <logger.h>
 
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -8,6 +8,8 @@
 #include <arpa/inet.h>
 
 #include <sys/time.h>
+
+#include <netinet/tcp.h> // for TCP_NODELAY
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -32,12 +34,14 @@
 	size_t recvbuff_end;
 	size_t recvbuff_max;
 	size_t read_times; // read times for a command
+	bool toClose;
 	
 	evclient_proc_t proc;
 	
 	EVClient() : 
 		fd(-1), buf_ev(nullptr), 
 		recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), 
+		toClose(false), 
 		proc(nullptr)
 	{ }
 };
@@ -47,6 +51,12 @@
 #endif
 
 // Set a socket to non-blocking mode.
+static int setnodelay(int fd)
+{
+	int flag = 1; 
+	return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
+}
+
 static int setnonblock(int fd)
 {
 	int flags;
@@ -61,6 +71,15 @@
 	return 0;
 }
 
+static int setlinger(int fd)
+{
+	struct linger so_linger;
+	so_linger.l_onoff = 1;
+	so_linger.l_linger = 0;
+	
+	return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
+}
+
 // Called by libevent when there is an error on the underlying socket descriptor.
 static void buffered_on_error(struct bufferevent *bufev, short what, void *arg)
 {
@@ -69,11 +88,11 @@
 	if (what & EVBUFFER_EOF)
 	{
 		//Client disconnected, remove the read event and the free the client structure.
-		LOG_INFO << "Client disconnected." << std::endl;
+		LOG_INFO << "Client disconnected." << LOG_ENDL;
 	}
 	else
 	{
-		LOG_WARN << "Client socket error, disconnecting." << std::endl;
+		LOG_WARN << "Client socket error, disconnecting." << LOG_ENDL;
 	}
 	bufferevent_free(client->buf_ev);
 	close(client->fd);
@@ -89,57 +108,58 @@
 	// Write back the read buffer. It is important to note that
 	// bufferevent_write_buffer will drain the incoming data so it
 	// is effectively gone after we call it.
-	//LOG_DEBUG << (char*)bufev->input << std::endl;
+	//LOG_DEBUG << (char*)bufev->input << LOG_ENDL;
 	//bufferevent_write_buffer(bufev, bufev->input);
 	
 	//char buff[100] = {'\0'};
 	//size_t readSize = bufferevent_read(bufev, buff, sizeof(buff));
-	//LOG_DEBUG << "readSize=" << readSize << "\t" << buff << std::endl;
+	//LOG_DEBUG << "readSize=" << readSize << "\t" << buff << LOG_ENDL;
 
 	EVPHeader* evpHeader = (EVPHeader*)client->recvbuff;
 	
 	// read header if expected
+	if (client->recvbuff_end == 0)
 	{
 		uint8_t headerBuff[sizeof(EVPHeader)];
-		if (client->recvbuff_end == 0)
+
+		size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
+		client->read_times = 1;
+		if (readSize != sizeof(headerBuff))
 		{
-			size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
-			client->read_times = 1;
-			if (readSize != sizeof(headerBuff))
-			{
-				LOG_WARN << "client send incomplete header" << std::endl;
-				buffered_on_error(bufev, 0, arg);
-				return;
-			}
-			
-			evpHeader = (EVPHeader*)headerBuff;
-			
-			// check header
-			if (evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || 
-				evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
-			{
-				LOG_WARN << "client send invalid header" << std::endl;
-				buffered_on_error(bufev, 0, arg);
-				return;
-			}
-			
-			if (client->recvbuff_max < evpHeader->size)
-			{
-				delete[] client->recvbuff;
-				client->recvbuff = nullptr;
-				client->recvbuff_max = 0;
-			}
-			
-			if (client->recvbuff == nullptr)
-			{
-				uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
-				client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
-				client->recvbuff = new uint8_t[client->recvbuff_max];
-			}
-			
-			memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
-			client->recvbuff_end = sizeof(headerBuff);
+			LOG_WARN << "client send incomplete header" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+			return;
 		}
+		
+		evpHeader = (EVPHeader*)headerBuff;
+		evpHeader->ntoh();
+		
+		// check header
+		if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || 
+			evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || 
+			evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
+		{
+			LOG_WARN << "client send invalid header" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+			return;
+		}
+		
+		if (client->recvbuff_max < evpHeader->size)
+		{
+			delete[] client->recvbuff;
+			client->recvbuff = nullptr;
+			client->recvbuff_max = 0;
+		}
+		
+		if (client->recvbuff == nullptr)
+		{
+			uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
+			client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
+			client->recvbuff = new uint8_t[client->recvbuff_max];
+		}
+		
+		memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
+		client->recvbuff_end = sizeof(headerBuff);
 	}
 	
 	// read sub command or data
@@ -147,9 +167,10 @@
 	do
 	{
 		readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end);
-		client->read_times++;
 		if (readSize == 0)
 			break;
+		else
+			client->read_times++;
 
 		client->recvbuff_end += readSize;
 	} while (readSize > 0);
@@ -158,39 +179,43 @@
 	if (evpHeader->size == client->recvbuff_end)
 	{
 		// call client proc
-		bool closeClient = true;
 		if (client->proc != nullptr)
 		{
 			EVClientStub cs(client->recvbuff, client->recvbuff_end);
 			cs.id = client->fd;
-			closeClient = !(client->proc(cs));
+			client->toClose = !(client->proc(cs));
 			
 			if (cs.sendBuff != nullptr && cs.sendBuffSize > 0)
 			{
 				size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize);
-				if (writeSize != cs.sendBuffSize)
-					LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << std::endl;
-				
+				bufferevent_enable(client->buf_ev, EV_WRITE);
+				if (writeSize != 0)
+					LOG_WARN << "server send truncate" << LOG_ENDL;
+
 				if (cs.deleteSendBuff)
+				{
 					delete[] cs.sendBuff;
+					cs.sendBuff = nullptr;
+				}
 			}
-		}
-		
-		if (closeClient)
-		{
-			LOG_DEBUG << "server initiative close" << std::endl;
-			buffered_on_error(bufev, 0, arg);
 		}
 
 		// reset client
 		client->recvbuff_end = 0;
 		client->read_times = 0;
 	}
+	else
+	{
+		//LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size 
+		//	<< ", recvbuff_end=" << client->recvbuff_end 
+		//	<< ", read_times=" << client->read_times 
+		//	<< LOG_ENDL;
+	}
 	
 	// check read times
 	if (client->read_times > CLIENT_READ_TIMES_MAX)
 	{
-		LOG_WARN << "client read times to max" << std::endl;
+		LOG_WARN << "client read times to max" << LOG_ENDL;
 		buffered_on_error(bufev, 0, arg);
 	}
 }
@@ -199,6 +224,23 @@
 // We only provide this because libevent expects it, but we don't use it.
 static void buffered_on_write(struct bufferevent *bufev, void *arg)
 {
+	struct EVClient *client = (struct EVClient *)arg;
+	if (evbuffer_get_length(bufev->output) == 0)
+	{
+		bufferevent_disable(client->buf_ev, EV_WRITE);
+		if (client->toClose)
+		{
+			LOG_DEBUG << "server initiative close" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+		}
+		else
+		{
+			//bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work
+			//bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED);
+			// flush socket
+			shutdown(client->fd, SHUT_WR);
+		}
+	}
 }
 
 // This function will be called by libevent when there is a connection ready to be accepted.
@@ -209,19 +251,25 @@
 	int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
 	if (client_fd < 0)
 	{
-		LOG_WARN << "accept failed" << std::endl;
+		LOG_WARN << "accept failed" << LOG_ENDL;
 		return;
 	}
 
 	// Set the client socket to non-blocking mode.
 	if (setnonblock(client_fd) < 0)
-		LOG_WARN << "failed to set client socket non-blocking" << std::endl;
+		LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL;
+	
+	if (setnodelay(client_fd) < 0)
+		LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL;
+	
+	//if (setlinger(client_fd) < 0)
+	//	LOG_WARN << "failed to set client socket linger" << LOG_ENDL;
 
 	// We've accepted a new client, create a client object.
 	struct EVClient* client = new EVClient;
 	if (client == NULL)
 	{
-		LOG_ERROR << "malloc failed" << std::endl;
+		LOG_ERROR << "malloc failed" << LOG_ENDL;
 	}
 	client->fd = client_fd;
 	client->proc = evclient_proc;
@@ -232,12 +280,12 @@
 	// We have to enable it before our callbacks will be called.
 	bufferevent_enable(client->buf_ev, EV_READ);
 
-	LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << std::endl;
+	LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << LOG_ENDL;
 }
 
 int server_main(int argc, char **argv)
 {
-	//initLogger(LV_DEBUG);
+	LOG_NOTICE << "server_main" << LOG_ENDL;
 
 	// Initialize libevent.
 	event_init();
@@ -246,7 +294,7 @@
 	int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
 	if (listen_fd < 0)
 	{
-		LOG_ERROR << "create socket failed" << std::endl;
+		LOG_ERROR << "create socket failed" << LOG_ENDL;
 		return EXIT_FAILURE;
 	}
 	
@@ -255,26 +303,28 @@
 	listen_addr.sin_family = AF_INET;
 	listen_addr.sin_addr.s_addr = INADDR_ANY;
 	listen_addr.sin_port = htons(SERVER_PORT);
+	
+	int reuseaddr_on = REUSEADDR_ON;
+	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
+	if (setnonblock(listen_fd) < 0)
+	{
+		LOG_ERROR << "failed to set server socket to non-blocking" << LOG_ENDL;
+		return EXIT_FAILURE;
+	}
+
 	if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
 	{
-		LOG_ERROR << "bind failed" << std::endl;
+		LOG_ERROR << "bind failed" << LOG_ENDL;
 		return EXIT_FAILURE;
 	}
 	
 	if (listen(listen_fd, 5) < 0)
 	{
-		LOG_ERROR << "listen failed" << std::endl;
+		LOG_ERROR << "listen failed" << LOG_ENDL;
 		return EXIT_FAILURE;
 	}
 	
 	// Set the socket to non-blocking, this is essential in event based programming with libevent.
-	int reuseaddr_on = REUSEADDR_ON;
-	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
-	if (setnonblock(listen_fd) < 0)
-	{
-		LOG_ERROR << "failed to set server socket to non-blocking" << std::endl;
-		return EXIT_FAILURE;
-	}
 
 	// We now have a listening socket, we create a read event to be notified when a client connects.
 	struct event ev_accept;
@@ -284,7 +334,15 @@
 	// Start the event loop.
 	event_dispatch();
 
+	close(listen_fd);
+	listen_fd = 0;
 	return EXIT_SUCCESS;
+}
+
+void server_stop()
+{
+	LOG_NOTICE << "server_stop" << LOG_ENDL;
+	event_loopexit(NULL);
 }
 
 void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status)
@@ -296,7 +354,9 @@
 	EVPHeader* evpHeader = new (client.sendBuff) EVPHeader;
 	evpHeader->cmd = EVPCommand::EVPC_STATUS;
 	evpHeader->size = client.sendBuffSize;
+	evpHeader->hton();
 	
 	EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status;
 	evpStatus->status = status;
+	evpHeader->hton();
 }

--
Gitblit v1.8.0