From b6441a19e4db6339e37de3440107be0530c5baaf Mon Sep 17 00:00:00 2001
From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674>
Date: 星期四, 20 四月 2017 15:09:10 +0800
Subject: [PATCH] socket server finished

---
 FaceServer/main_face_daemon.cpp |    7 -
 FaceServer/ev_server.cpp        |  143 +++++++++++++++++++++++------------
 FaceServer/face_daemon_proto.h  |    8 ++
 FaceServer/test_client.cpp      |   30 +++++--
 FaceServer/facelist-1001-0-2.pb |    0 
 FaceServer/facelist-1001-0-4.pb |    0 
 6 files changed, 126 insertions(+), 62 deletions(-)

diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp
index 4d5b29c..ce9ebb1 100644
--- a/FaceServer/ev_server.cpp
+++ b/FaceServer/ev_server.cpp
@@ -9,6 +9,8 @@
 
 #include <sys/time.h>
 
+#include <netinet/tcp.h> // for TCP_NODELAY
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -32,12 +34,14 @@
 	size_t recvbuff_end;
 	size_t recvbuff_max;
 	size_t read_times; // read times for a command
+	bool toClose;
 	
 	evclient_proc_t proc;
 	
 	EVClient() : 
 		fd(-1), buf_ev(nullptr), 
 		recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), 
+		toClose(false), 
 		proc(nullptr)
 	{ }
 };
@@ -47,6 +51,12 @@
 #endif
 
 // Set a socket to non-blocking mode.
+static int setnodelay(int fd)
+{
+	int flag = 1; 
+	return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
+}
+
 static int setnonblock(int fd)
 {
 	int flags;
@@ -59,6 +69,15 @@
 		return -1;
 
 	return 0;
+}
+
+static int setlinger(int fd)
+{
+	struct linger so_linger;
+	so_linger.l_onoff = 1;
+	so_linger.l_linger = 0;
+	
+	return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
 }
 
 // Called by libevent when there is an error on the underlying socket descriptor.
@@ -99,48 +118,47 @@
 	EVPHeader* evpHeader = (EVPHeader*)client->recvbuff;
 	
 	// read header if expected
+	if (client->recvbuff_end == 0)
 	{
 		uint8_t headerBuff[sizeof(EVPHeader)];
-		if (client->recvbuff_end == 0)
+
+		size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
+		client->read_times = 1;
+		if (readSize != sizeof(headerBuff))
 		{
-			size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
-			client->read_times = 1;
-			if (readSize != sizeof(headerBuff))
-			{
-				LOG_WARN << "client send incomplete header" << LOG_ENDL;
-				buffered_on_error(bufev, 0, arg);
-				return;
-			}
-			
-			evpHeader = (EVPHeader*)headerBuff;
-			
-			// check header
-			if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || 
-				evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || 
-				evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
-			{
-				LOG_WARN << "client send invalid header" << LOG_ENDL;
-				buffered_on_error(bufev, 0, arg);
-				return;
-			}
-			
-			if (client->recvbuff_max < evpHeader->size)
-			{
-				delete[] client->recvbuff;
-				client->recvbuff = nullptr;
-				client->recvbuff_max = 0;
-			}
-			
-			if (client->recvbuff == nullptr)
-			{
-				uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
-				client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
-				client->recvbuff = new uint8_t[client->recvbuff_max];
-			}
-			
-			memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
-			client->recvbuff_end = sizeof(headerBuff);
+			LOG_WARN << "client send incomplete header" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+			return;
 		}
+		
+		evpHeader = (EVPHeader*)headerBuff;
+		
+		// check header
+		if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || 
+			evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || 
+			evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
+		{
+			LOG_WARN << "client send invalid header" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+			return;
+		}
+		
+		if (client->recvbuff_max < evpHeader->size)
+		{
+			delete[] client->recvbuff;
+			client->recvbuff = nullptr;
+			client->recvbuff_max = 0;
+		}
+		
+		if (client->recvbuff == nullptr)
+		{
+			uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
+			client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
+			client->recvbuff = new uint8_t[client->recvbuff_max];
+		}
+		
+		memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
+		client->recvbuff_end = sizeof(headerBuff);
 	}
 	
 	// read sub command or data
@@ -159,33 +177,37 @@
 	if (evpHeader->size == client->recvbuff_end)
 	{
 		// call client proc
-		bool closeClient = true;
 		if (client->proc != nullptr)
 		{
 			EVClientStub cs(client->recvbuff, client->recvbuff_end);
 			cs.id = client->fd;
-			closeClient = !(client->proc(cs));
+			client->toClose = !(client->proc(cs));
 			
 			if (cs.sendBuff != nullptr && cs.sendBuffSize > 0)
 			{
 				size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize);
-				if (writeSize != cs.sendBuffSize)
-					LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << LOG_ENDL;
-				
+				bufferevent_enable(client->buf_ev, EV_WRITE);
+				if (writeSize != 0)
+					LOG_WARN << "server send truncate" << LOG_ENDL;
+
 				if (cs.deleteSendBuff)
+				{
 					delete[] cs.sendBuff;
+					cs.sendBuff = nullptr;
+				}
 			}
-		}
-		
-		if (closeClient)
-		{
-			LOG_DEBUG << "server initiative close" << LOG_ENDL;
-			buffered_on_error(bufev, 0, arg);
 		}
 
 		// reset client
 		client->recvbuff_end = 0;
 		client->read_times = 0;
+	}
+	else
+	{
+		LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size 
+			<< ", recvbuff_end=" << client->recvbuff_end 
+			<< ", read_times=" << client->read_times 
+			<< LOG_ENDL;
 	}
 	
 	// check read times
@@ -200,6 +222,23 @@
 // We only provide this because libevent expects it, but we don't use it.
 static void buffered_on_write(struct bufferevent *bufev, void *arg)
 {
+	struct EVClient *client = (struct EVClient *)arg;
+	if (evbuffer_get_length(bufev->output) == 0)
+	{
+		bufferevent_disable(client->buf_ev, EV_WRITE);
+		if (client->toClose)
+		{
+			LOG_DEBUG << "server initiative close" << LOG_ENDL;
+			buffered_on_error(bufev, 0, arg);
+		}
+		else
+		{
+			//bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work
+			//bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED);
+			// flush socket
+			shutdown(client->fd, SHUT_WR);
+		}
+	}
 }
 
 // This function will be called by libevent when there is a connection ready to be accepted.
@@ -217,6 +256,12 @@
 	// Set the client socket to non-blocking mode.
 	if (setnonblock(client_fd) < 0)
 		LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL;
+	
+	if (setnodelay(client_fd) < 0)
+		LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL;
+	
+	//if (setlinger(client_fd) < 0)
+	//	LOG_WARN << "failed to set client socket linger" << LOG_ENDL;
 
 	// We've accepted a new client, create a client object.
 	struct EVClient* client = new EVClient;
diff --git a/FaceServer/face_daemon_proto.h b/FaceServer/face_daemon_proto.h
index 029beb2..ccda811 100644
--- a/FaceServer/face_daemon_proto.h
+++ b/FaceServer/face_daemon_proto.h
@@ -32,6 +32,14 @@
 	uint8_t buff[0];
 };
 
+struct FDP_FaceDetectPB
+{
+	int32_t db_id;
+	
+	FDP_FaceDetectPB(int32_t _db_id) : db_id(_db_id)
+	{}
+};
+
 struct FDP_FaceDetectResult
 {
 	int32_t db_id;
diff --git a/FaceServer/facelist-1001-0-2.pb b/FaceServer/facelist-1001-0-2.pb
new file mode 100644
index 0000000..83e8e4d
--- /dev/null
+++ b/FaceServer/facelist-1001-0-2.pb
Binary files differ
diff --git a/FaceServer/facelist-1001-0-4.pb b/FaceServer/facelist-1001-0-4.pb
new file mode 100644
index 0000000..3200270
--- /dev/null
+++ b/FaceServer/facelist-1001-0-4.pb
Binary files differ
diff --git a/FaceServer/main_face_daemon.cpp b/FaceServer/main_face_daemon.cpp
index 581cbc2..09f30e3 100644
--- a/FaceServer/main_face_daemon.cpp
+++ b/FaceServer/main_face_daemon.cpp
@@ -56,9 +56,10 @@
 	//return true;
 	
 	EVPHeader* evpHeader = (EVPHeader*)client.recvBuff;
+	FDP_FaceDetectPB* fdpFaceDetectPB = (FDP_FaceDetectPB*)(client.recvBuff + sizeof(EVPHeader));
 	
 	PbFaceList pbFaceList;
-	pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader), evpHeader->size - sizeof(EVPHeader));
+	pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader) + sizeof(FDP_FaceDetectPB), evpHeader->size - sizeof(EVPHeader) - sizeof(FDP_FaceDetectPB));
 	LOGP(DEBUG, "pbFaceList: magic=%u, image_count=%u, src_width=%u, src_height=%u", 
 		pbFaceList.magic(), pbFaceList.image_count(), pbFaceList.src_width(), pbFaceList.src_height());
 
@@ -85,9 +86,7 @@
 	result.push_back(FDP_FaceDetectResult(2,456));
 	result.push_back(FDP_FaceDetectResult(0,0));
 	
-	send_SensetimeFaceDetectResultJson(client, result);
-
-	return false;
+	return send_SensetimeFaceDetectResultJson(client, result);
 }
 
 bool ev_dispatcher_proto_pb(EVClientStub& client)
diff --git a/FaceServer/test_client.cpp b/FaceServer/test_client.cpp
index 5383601..d8d1ce1 100644
--- a/FaceServer/test_client.cpp
+++ b/FaceServer/test_client.cpp
@@ -19,7 +19,8 @@
 #include "ev_proto.h"
 #include "face_daemon_proto.h"
 
-void make_msg(char* mesg, int length)
+/*
+void make_msg(char* mesg, int& length)
 {
 	EVPHeader* evpHeader = new (mesg) EVPHeader;
 	evpHeader->proto = EVPProto::EVPP_PROTOBUF;
@@ -30,11 +31,19 @@
 	size_t fsize = fread(mesg + sizeof(EVPHeader), 1, length - sizeof(EVPHeader), pFile);
 	fclose(pFile);
 }
+*/
+
+void make_msg(char* mesg, int& length)
+{
+	FILE* pFile = fopen("facelist-1001-0-2.pb", "rb");
+	length = fread(mesg, 1, length, pFile);
+	fclose(pFile);
+}
 
 int main()
 {
     // build the message to be sent
-    int length = 19454; // the size of message
+    int length = 1024 * 1024; // the size of message
     char* mesg = (char*)malloc((length+1)*sizeof(char)); // Look out the end mark '/0' of a C string
     if (mesg == NULL)
         exit(1);
@@ -75,16 +84,19 @@
     bufferevent_write(conn,mesg,length);
     // check the output evbuffer
     struct evbuffer* output = bufferevent_get_output(conn);
-    int len = 0;
-    len = evbuffer_get_length(output);
-    printf("output buffer has %d bytes left\n", len);
 
     event_base_dispatch(base);
  
-	//char readbuf[100*1024];
-	//int readbufsize = read(fd, readbuf, sizeof(readbuf));
-	//printf("read:\n%s\n", readbuf);
- 
+	char readbuf[10];
+	int readbufsize = read(fd, readbuf, sizeof(readbuf));
+	while(readbufsize>0)
+	{
+		readbuf[readbufsize] = '\0';
+		printf("%s", readbuf);
+		readbufsize = read(fd, readbuf, sizeof(readbuf));
+	}
+	printf("\n");
+
     free(mesg);
     mesg = NULL;
 

--
Gitblit v1.8.0