From b6441a19e4db6339e37de3440107be0530c5baaf Mon Sep 17 00:00:00 2001 From: houxiao <houxiao@454eff88-639b-444f-9e54-f578c98de674> Date: 星期四, 20 四月 2017 15:09:10 +0800 Subject: [PATCH] socket server finished --- FaceServer/main_face_daemon.cpp | 7 - FaceServer/ev_server.cpp | 143 +++++++++++++++++++++++------------ FaceServer/face_daemon_proto.h | 8 ++ FaceServer/test_client.cpp | 30 +++++-- FaceServer/facelist-1001-0-2.pb | 0 FaceServer/facelist-1001-0-4.pb | 0 6 files changed, 126 insertions(+), 62 deletions(-) diff --git a/FaceServer/ev_server.cpp b/FaceServer/ev_server.cpp index 4d5b29c..ce9ebb1 100644 --- a/FaceServer/ev_server.cpp +++ b/FaceServer/ev_server.cpp @@ -9,6 +9,8 @@ #include <sys/time.h> +#include <netinet/tcp.h> // for TCP_NODELAY + #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -32,12 +34,14 @@ size_t recvbuff_end; size_t recvbuff_max; size_t read_times; // read times for a command + bool toClose; evclient_proc_t proc; EVClient() : fd(-1), buf_ev(nullptr), recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), + toClose(false), proc(nullptr) { } }; @@ -47,6 +51,12 @@ #endif // Set a socket to non-blocking mode. +static int setnodelay(int fd) +{ + int flag = 1; + return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); +} + static int setnonblock(int fd) { int flags; @@ -59,6 +69,15 @@ return -1; return 0; +} + +static int setlinger(int fd) +{ + struct linger so_linger; + so_linger.l_onoff = 1; + so_linger.l_linger = 0; + + return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); } // Called by libevent when there is an error on the underlying socket descriptor. @@ -99,48 +118,47 @@ EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected + if (client->recvbuff_end == 0) { uint8_t headerBuff[sizeof(EVPHeader)]; - if (client->recvbuff_end == 0) + + size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); + client->read_times = 1; + if (readSize != sizeof(headerBuff)) { - size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); - client->read_times = 1; - if (readSize != sizeof(headerBuff)) - { - LOG_WARN << "client send incomplete header" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); - return; - } - - evpHeader = (EVPHeader*)headerBuff; - - // check header - if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || - evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || - evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) - { - LOG_WARN << "client send invalid header" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); - return; - } - - if (client->recvbuff_max < evpHeader->size) - { - delete[] client->recvbuff; - client->recvbuff = nullptr; - client->recvbuff_max = 0; - } - - if (client->recvbuff == nullptr) - { - uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; - client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); - client->recvbuff = new uint8_t[client->recvbuff_max]; - } - - memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); - client->recvbuff_end = sizeof(headerBuff); + LOG_WARN << "client send incomplete header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; } + + evpHeader = (EVPHeader*)headerBuff; + + // check header + if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || + evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || + evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) + { + LOG_WARN << "client send invalid header" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + return; + } + + if (client->recvbuff_max < evpHeader->size) + { + delete[] client->recvbuff; + client->recvbuff = nullptr; + client->recvbuff_max = 0; + } + + if (client->recvbuff == nullptr) + { + uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; + client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); + client->recvbuff = new uint8_t[client->recvbuff_max]; + } + + memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); + client->recvbuff_end = sizeof(headerBuff); } // read sub command or data @@ -159,33 +177,37 @@ if (evpHeader->size == client->recvbuff_end) { // call client proc - bool closeClient = true; if (client->proc != nullptr) { EVClientStub cs(client->recvbuff, client->recvbuff_end); cs.id = client->fd; - closeClient = !(client->proc(cs)); + client->toClose = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); - if (writeSize != cs.sendBuffSize) - LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << LOG_ENDL; - + bufferevent_enable(client->buf_ev, EV_WRITE); + if (writeSize != 0) + LOG_WARN << "server send truncate" << LOG_ENDL; + if (cs.deleteSendBuff) + { delete[] cs.sendBuff; + cs.sendBuff = nullptr; + } } - } - - if (closeClient) - { - LOG_DEBUG << "server initiative close" << LOG_ENDL; - buffered_on_error(bufev, 0, arg); } // reset client client->recvbuff_end = 0; client->read_times = 0; + } + else + { + LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size + << ", recvbuff_end=" << client->recvbuff_end + << ", read_times=" << client->read_times + << LOG_ENDL; } // check read times @@ -200,6 +222,23 @@ // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { + struct EVClient *client = (struct EVClient *)arg; + if (evbuffer_get_length(bufev->output) == 0) + { + bufferevent_disable(client->buf_ev, EV_WRITE); + if (client->toClose) + { + LOG_DEBUG << "server initiative close" << LOG_ENDL; + buffered_on_error(bufev, 0, arg); + } + else + { + //bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work + //bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED); + // flush socket + shutdown(client->fd, SHUT_WR); + } + } } // This function will be called by libevent when there is a connection ready to be accepted. @@ -217,6 +256,12 @@ // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL; + + if (setnodelay(client_fd) < 0) + LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL; + + //if (setlinger(client_fd) < 0) + // LOG_WARN << "failed to set client socket linger" << LOG_ENDL; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; diff --git a/FaceServer/face_daemon_proto.h b/FaceServer/face_daemon_proto.h index 029beb2..ccda811 100644 --- a/FaceServer/face_daemon_proto.h +++ b/FaceServer/face_daemon_proto.h @@ -32,6 +32,14 @@ uint8_t buff[0]; }; +struct FDP_FaceDetectPB +{ + int32_t db_id; + + FDP_FaceDetectPB(int32_t _db_id) : db_id(_db_id) + {} +}; + struct FDP_FaceDetectResult { int32_t db_id; diff --git a/FaceServer/facelist-1001-0-2.pb b/FaceServer/facelist-1001-0-2.pb new file mode 100644 index 0000000..83e8e4d --- /dev/null +++ b/FaceServer/facelist-1001-0-2.pb Binary files differ diff --git a/FaceServer/facelist-1001-0-4.pb b/FaceServer/facelist-1001-0-4.pb new file mode 100644 index 0000000..3200270 --- /dev/null +++ b/FaceServer/facelist-1001-0-4.pb Binary files differ diff --git a/FaceServer/main_face_daemon.cpp b/FaceServer/main_face_daemon.cpp index 581cbc2..09f30e3 100644 --- a/FaceServer/main_face_daemon.cpp +++ b/FaceServer/main_face_daemon.cpp @@ -56,9 +56,10 @@ //return true; EVPHeader* evpHeader = (EVPHeader*)client.recvBuff; + FDP_FaceDetectPB* fdpFaceDetectPB = (FDP_FaceDetectPB*)(client.recvBuff + sizeof(EVPHeader)); PbFaceList pbFaceList; - pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader), evpHeader->size - sizeof(EVPHeader)); + pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader) + sizeof(FDP_FaceDetectPB), evpHeader->size - sizeof(EVPHeader) - sizeof(FDP_FaceDetectPB)); LOGP(DEBUG, "pbFaceList: magic=%u, image_count=%u, src_width=%u, src_height=%u", pbFaceList.magic(), pbFaceList.image_count(), pbFaceList.src_width(), pbFaceList.src_height()); @@ -85,9 +86,7 @@ result.push_back(FDP_FaceDetectResult(2,456)); result.push_back(FDP_FaceDetectResult(0,0)); - send_SensetimeFaceDetectResultJson(client, result); - - return false; + return send_SensetimeFaceDetectResultJson(client, result); } bool ev_dispatcher_proto_pb(EVClientStub& client) diff --git a/FaceServer/test_client.cpp b/FaceServer/test_client.cpp index 5383601..d8d1ce1 100644 --- a/FaceServer/test_client.cpp +++ b/FaceServer/test_client.cpp @@ -19,7 +19,8 @@ #include "ev_proto.h" #include "face_daemon_proto.h" -void make_msg(char* mesg, int length) +/* +void make_msg(char* mesg, int& length) { EVPHeader* evpHeader = new (mesg) EVPHeader; evpHeader->proto = EVPProto::EVPP_PROTOBUF; @@ -30,11 +31,19 @@ size_t fsize = fread(mesg + sizeof(EVPHeader), 1, length - sizeof(EVPHeader), pFile); fclose(pFile); } +*/ + +void make_msg(char* mesg, int& length) +{ + FILE* pFile = fopen("facelist-1001-0-2.pb", "rb"); + length = fread(mesg, 1, length, pFile); + fclose(pFile); +} int main() { // build the message to be sent - int length = 19454; // the size of message + int length = 1024 * 1024; // the size of message char* mesg = (char*)malloc((length+1)*sizeof(char)); // Look out the end mark '/0' of a C string if (mesg == NULL) exit(1); @@ -75,16 +84,19 @@ bufferevent_write(conn,mesg,length); // check the output evbuffer struct evbuffer* output = bufferevent_get_output(conn); - int len = 0; - len = evbuffer_get_length(output); - printf("output buffer has %d bytes left\n", len); event_base_dispatch(base); - //char readbuf[100*1024]; - //int readbufsize = read(fd, readbuf, sizeof(readbuf)); - //printf("read:\n%s\n", readbuf); - + char readbuf[10]; + int readbufsize = read(fd, readbuf, sizeof(readbuf)); + while(readbufsize>0) + { + readbuf[readbufsize] = '\0'; + printf("%s", readbuf); + readbufsize = read(fd, readbuf, sizeof(readbuf)); + } + printf("\n"); + free(mesg); mesg = NULL; -- Gitblit v1.8.0