houxiao
2017-04-20 b6441a19e4db6339e37de3440107be0530c5baaf
socket server finished

git-svn-id: http://192.168.1.226/svn/proxy@513 454eff88-639b-444f-9e54-f578c98de674
2个文件已添加
4个文件已修改
188 ■■■■■ 已修改文件
FaceServer/ev_server.cpp 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/face_daemon_proto.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/facelist-1001-0-2.pb 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/facelist-1001-0-4.pb 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/main_face_daemon.cpp 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/test_client.cpp 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
FaceServer/ev_server.cpp
@@ -9,6 +9,8 @@
#include <sys/time.h>
#include <netinet/tcp.h> // for TCP_NODELAY
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -32,12 +34,14 @@
    size_t recvbuff_end;
    size_t recvbuff_max;
    size_t read_times; // read times for a command
    bool toClose;
    
    evclient_proc_t proc;
    
    EVClient() : 
        fd(-1), buf_ev(nullptr), 
        recvbuff(nullptr), recvbuff_end(0), recvbuff_max(0), read_times(0), 
        toClose(false),
        proc(nullptr)
    { }
};
@@ -47,6 +51,12 @@
#endif
// Turn on the TCP_NODELAY option for a TCP socket (per tcp(7) this disables
// Nagle's algorithm, so small writes are sent without being coalesced).
// Returns the setsockopt() result: 0 on success, -1 on failure.
static int setnodelay(int fd)
{
    const int enable = 1;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
}
// Set a socket to non-blocking mode.
static int setnonblock(int fd)
{
    int flags;
@@ -59,6 +69,15 @@
        return -1;
    return 0;
}
// Enable SO_LINGER on a socket with a zero-second timeout.
// NOTE(review): on common TCP stacks linger-on with timeout 0 makes close()
// drop unsent data and reset the connection instead of lingering — confirm
// this is the intended behaviour before re-enabling the call site.
// Returns the setsockopt() result: 0 on success, -1 on failure.
static int setlinger(int fd)
{
    struct linger opt;
    opt.l_linger = 0; // timeout in seconds
    opt.l_onoff = 1;  // lingering switched on
    return setsockopt(fd, SOL_SOCKET, SO_LINGER, &opt, sizeof(opt));
}
// Called by libevent when there is an error on the underlying socket descriptor.
@@ -99,48 +118,47 @@
    EVPHeader* evpHeader = (EVPHeader*)client->recvbuff;
    
    // read header if expected
    if (client->recvbuff_end == 0)
    {
        uint8_t headerBuff[sizeof(EVPHeader)];
        if (client->recvbuff_end == 0)
        size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
        client->read_times = 1;
        if (readSize != sizeof(headerBuff))
        {
            size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff));
            client->read_times = 1;
            if (readSize != sizeof(headerBuff))
            {
                LOG_WARN << "client send incomplete header" << LOG_ENDL;
                buffered_on_error(bufev, 0, arg);
                return;
            }
            evpHeader = (EVPHeader*)headerBuff;
            // check header
            if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST ||
                evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST ||
                evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
            {
                LOG_WARN << "client send invalid header" << LOG_ENDL;
                buffered_on_error(bufev, 0, arg);
                return;
            }
            if (client->recvbuff_max < evpHeader->size)
            {
                delete[] client->recvbuff;
                client->recvbuff = nullptr;
                client->recvbuff_max = 0;
            }
            if (client->recvbuff == nullptr)
            {
                uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
                client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
                client->recvbuff = new uint8_t[client->recvbuff_max];
            }
            memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
            client->recvbuff_end = sizeof(headerBuff);
            LOG_WARN << "client send incomplete header" << LOG_ENDL;
            buffered_on_error(bufev, 0, arg);
            return;
        }
        evpHeader = (EVPHeader*)headerBuff;
        // check header
        if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST ||
            evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST ||
            evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX)
        {
            LOG_WARN << "client send invalid header" << LOG_ENDL;
            buffered_on_error(bufev, 0, arg);
            return;
        }
        if (client->recvbuff_max < evpHeader->size)
        {
            delete[] client->recvbuff;
            client->recvbuff = nullptr;
            client->recvbuff_max = 0;
        }
        if (client->recvbuff == nullptr)
        {
            uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX;
            client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX);
            client->recvbuff = new uint8_t[client->recvbuff_max];
        }
        memcpy(client->recvbuff, headerBuff, sizeof(headerBuff));
        client->recvbuff_end = sizeof(headerBuff);
    }
    
    // read sub command or data
@@ -159,33 +177,37 @@
    if (evpHeader->size == client->recvbuff_end)
    {
        // call client proc
        bool closeClient = true;
        if (client->proc != nullptr)
        {
            EVClientStub cs(client->recvbuff, client->recvbuff_end);
            cs.id = client->fd;
            closeClient = !(client->proc(cs));
            client->toClose = !(client->proc(cs));
            
            if (cs.sendBuff != nullptr && cs.sendBuffSize > 0)
            {
                size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize);
                if (writeSize != cs.sendBuffSize)
                    LOG_WARN << "server send truncate " << (cs.sendBuffSize - writeSize) << " bytes" << LOG_ENDL;
                bufferevent_enable(client->buf_ev, EV_WRITE);
                if (writeSize != 0)
                    LOG_WARN << "server send truncate" << LOG_ENDL;
                if (cs.deleteSendBuff)
                {
                    delete[] cs.sendBuff;
                    cs.sendBuff = nullptr;
                }
            }
        }
        if (closeClient)
        {
            LOG_DEBUG << "server initiative close" << LOG_ENDL;
            buffered_on_error(bufev, 0, arg);
        }
        // reset client
        client->recvbuff_end = 0;
        client->read_times = 0;
    }
    else
    {
        LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size
            << ", recvbuff_end=" << client->recvbuff_end
            << ", read_times=" << client->read_times
            << LOG_ENDL;
    }
    
    // check read times
@@ -200,6 +222,23 @@
// We only provide this because libevent expects it, but we don't use it.
static void buffered_on_write(struct bufferevent *bufev, void *arg)
{
    struct EVClient *client = (struct EVClient *)arg;
    if (evbuffer_get_length(bufev->output) == 0)
    {
        bufferevent_disable(client->buf_ev, EV_WRITE);
        if (client->toClose)
        {
            LOG_DEBUG << "server initiative close" << LOG_ENDL;
            buffered_on_error(bufev, 0, arg);
        }
        else
        {
            //bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work
            //bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED);
            // flush socket
            shutdown(client->fd, SHUT_WR);
        }
    }
}
// This function will be called by libevent when there is a connection ready to be accepted.
@@ -217,6 +256,12 @@
    // Set the client socket to non-blocking mode.
    if (setnonblock(client_fd) < 0)
        LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL;
    if (setnodelay(client_fd) < 0)
        LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL;
    //if (setlinger(client_fd) < 0)
    //    LOG_WARN << "failed to set client socket linger" << LOG_ENDL;
    // We've accepted a new client, create a client object.
    struct EVClient* client = new EVClient;
FaceServer/face_daemon_proto.h
@@ -32,6 +32,14 @@
    uint8_t buff[0];
};
// Fixed-size prefix of a protobuf face-detect request.
// The daemon casts the receive buffer (just past the EVPHeader) to this type
// and parses the protobuf payload that follows it, so the layout must stay a
// single int32_t.
struct FDP_FaceDetectPB
{
    int32_t db_id; // presumably the id of the face database to use — confirm with callers

    // Build a prefix carrying the given id.
    FDP_FaceDetectPB(int32_t id)
        : db_id(id)
    {
    }
};
struct FDP_FaceDetectResult
{
    int32_t db_id;
FaceServer/facelist-1001-0-2.pb
Binary files differ
FaceServer/facelist-1001-0-4.pb
Binary files differ
FaceServer/main_face_daemon.cpp
@@ -56,9 +56,10 @@
    //return true;
    
    EVPHeader* evpHeader = (EVPHeader*)client.recvBuff;
    FDP_FaceDetectPB* fdpFaceDetectPB = (FDP_FaceDetectPB*)(client.recvBuff + sizeof(EVPHeader));
    
    PbFaceList pbFaceList;
    pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader), evpHeader->size - sizeof(EVPHeader));
    pbFaceList.ParseFromArray(client.recvBuff + sizeof(EVPHeader) + sizeof(FDP_FaceDetectPB), evpHeader->size - sizeof(EVPHeader) - sizeof(FDP_FaceDetectPB));
    LOGP(DEBUG, "pbFaceList: magic=%u, image_count=%u, src_width=%u, src_height=%u", 
        pbFaceList.magic(), pbFaceList.image_count(), pbFaceList.src_width(), pbFaceList.src_height());
@@ -85,9 +86,7 @@
    result.push_back(FDP_FaceDetectResult(2,456));
    result.push_back(FDP_FaceDetectResult(0,0));
    
    send_SensetimeFaceDetectResultJson(client, result);
    return false;
    return send_SensetimeFaceDetectResultJson(client, result);
}
bool ev_dispatcher_proto_pb(EVClientStub& client)
FaceServer/test_client.cpp
@@ -19,7 +19,8 @@
#include "ev_proto.h"
#include "face_daemon_proto.h"
void make_msg(char* mesg, int length)
/*
void make_msg(char* mesg, int& length)
{
    EVPHeader* evpHeader = new (mesg) EVPHeader;
    evpHeader->proto = EVPProto::EVPP_PROTOBUF;
@@ -30,11 +31,19 @@
    size_t fsize = fread(mesg + sizeof(EVPHeader), 1, length - sizeof(EVPHeader), pFile);
    fclose(pFile);
}
*/
// Fill mesg with the contents of the canned request file
// "facelist-1001-0-2.pb" (opened relative to the current directory).
//   mesg   - destination buffer
//   length - in:  capacity of mesg in bytes
//            out: number of bytes actually read
// On failure (null buffer, non-positive capacity, or the file cannot be
// opened) length is set to 0 and mesg is left untouched, instead of passing a
// null FILE* to fread/fclose.
void make_msg(char* mesg, int& length)
{
    const int capacity = length;
    length = 0;

    // A non-positive capacity would turn into a huge size_t count for fread.
    if (mesg == NULL || capacity <= 0)
        return;

    FILE* pFile = fopen("facelist-1001-0-2.pb", "rb");
    if (pFile == NULL)
    {
        perror("facelist-1001-0-2.pb");
        return;
    }

    // fread never returns more than `capacity`, so the result fits in an int.
    length = (int)fread(mesg, 1, (size_t)capacity, pFile);
    fclose(pFile);
}
int main()
{
    // build the message to be sent
    int length = 19454; // the size of message
    int length = 1024 * 1024; // the size of message
    char* mesg = (char*)malloc((length+1)*sizeof(char)); // Look out the end mark '/0' of a C string
    if (mesg == NULL)
        exit(1);
@@ -75,16 +84,19 @@
    bufferevent_write(conn,mesg,length);
    // check the output evbuffer
    struct evbuffer* output = bufferevent_get_output(conn);
    int len = 0;
    len = evbuffer_get_length(output);
    printf("output buffer has %d bytes left\n", len);
    event_base_dispatch(base);
 
    //char readbuf[100*1024];
    //int readbufsize = read(fd, readbuf, sizeof(readbuf));
    //printf("read:\n%s\n", readbuf);
    char readbuf[10];
    int readbufsize = read(fd, readbuf, sizeof(readbuf));
    while(readbufsize>0)
    {
        readbuf[readbufsize] = '\0';
        printf("%s", readbuf);
        readbufsize = read(fd, readbuf, sizeof(readbuf));
    }
    printf("\n");
    free(mesg);
    mesg = NULL;