#include "ev_server.h"
#include "ev_proto.h"

// NOTE(review): the header names of the system includes were missing from
// this copy of the file (only bare `#include` directives remained). The list
// below is reconstructed from the symbols this translation unit uses --
// confirm it against the project's original include list.
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/tcp.h>  // for TCP_NODELAY
#include <unistd.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <new>
#include <event.h>

// Per-connection state. One instance is allocated in on_accept() and released
// in buffered_on_error().
struct EVClient
{
    // The client's socket.
    int fd;
    // The bufferevent driving I/O for this client.
    struct bufferevent *buf_ev;

    // Receive buffer holding the command currently being assembled.
    uint8_t* recvbuff;
    // Number of bytes of the current command already stored in recvbuff.
    size_t recvbuff_end;
    // Allocated size of recvbuff, in bytes.
    size_t recvbuff_max;
    // Number of successful reads spent on the current command.
    size_t read_times;

    // Set when the command handler asked for the connection to be closed.
    bool toClose;
    // Command handler invoked once a complete command has been received.
    evclient_proc_t proc;

    EVClient()
        : fd(-1)
        , buf_ev(nullptr)
        , recvbuff(nullptr)
        , recvbuff_end(0)
        , recvbuff_max(0)
        , read_times(0)
        , toClose(false)
        , proc(nullptr)
    {
    }
};

#ifndef USER_DEFINE_EVCLIENT_PROC
evclient_proc_t evclient_proc = nullptr;
#endif

// Set TCP_NODELAY on a socket.
// Returns the result of setsockopt().
static int setnodelay(int fd)
{
    int flag = 1;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
}

// Set a socket to non-blocking mode.
// Returns 0 on success and a negative value if either fcntl() call fails.
static int setnonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0)
        return flags;

    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0)
        return -1;

    return 0;
}

// Enable SO_LINGER with a zero timeout on a socket.
// Returns the result of setsockopt().
// Currently unreferenced: the only call site (in on_accept) is commented out.
static int setlinger(int fd)
{
    struct linger so_linger;
    so_linger.l_onoff = 1;
    so_linger.l_linger = 0;
    return setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
}

// Called by libevent when there is an error on the underlying socket descriptor.
// Also called directly by the other callbacks in this file (with what == 0) to
// drop a client. Releases everything owned by the client: the bufferevent, the
// socket, the receive buffer and the EVClient itself. `arg` must not be used
// after this function returns.
static void buffered_on_error(struct bufferevent *bufev, short what, void *arg)
{
    struct EVClient *client = (struct EVClient *)arg;

    if (what & EVBUFFER_EOF) {
        // Client disconnected; the cleanup below removes its events and state.
        LOG_INFO << "Client disconnected." << LOG_ENDL;
    }
    else {
        LOG_WARN << "Client socket error, disconnecting." << LOG_ENDL;
    }

    bufferevent_free(client->buf_ev);
    close(client->fd);
    delete[] client->recvbuff;
    delete client;
}

// Called by libevent when there is data to read.
static void buffered_on_read(struct bufferevent *bufev, void *arg) { struct EVClient *client = (struct EVClient *)arg; // Write back the read buffer. It is important to note that // bufferevent_write_buffer will drain the incoming data so it // is effectively gone after we call it. //LOG_DEBUG << (char*)bufev->input << LOG_ENDL; //bufferevent_write_buffer(bufev, bufev->input); //char buff[100] = {'\0'}; //size_t readSize = bufferevent_read(bufev, buff, sizeof(buff)); //LOG_DEBUG << "readSize=" << readSize << "\t" << buff << LOG_ENDL; EVPHeader* evpHeader = (EVPHeader*)client->recvbuff; // read header if expected if (client->recvbuff_end == 0) { uint8_t headerBuff[sizeof(EVPHeader)]; size_t readSize = bufferevent_read(bufev, headerBuff, sizeof(headerBuff)); client->read_times = 1; if (readSize != sizeof(headerBuff)) { LOG_WARN << "client send incomplete header" << LOG_ENDL; buffered_on_error(bufev, 0, arg); return; } evpHeader = (EVPHeader*)headerBuff; evpHeader->ntoh(); // check header if (evpHeader->proto <= EVPProto::EVPP__FIRST || evpHeader->proto >= EVPProto::EVPP__LAST || evpHeader->cmd <= EVPCommand::EVPC__FIRST || evpHeader->cmd >= EVPCommand::EVPC__LAST || evpHeader->size < sizeof(EVPHeader) || evpHeader->size > CLIENT_BUFFER_MAX) { LOG_WARN << "client send invalid header" << LOG_ENDL; buffered_on_error(bufev, 0, arg); return; } if (client->recvbuff_max < evpHeader->size) { delete[] client->recvbuff; client->recvbuff = nullptr; client->recvbuff_max = 0; } if (client->recvbuff == nullptr) { uint32_t _CLIENT_BUFFER_MAX = CLIENT_BUFFER_MAX; client->recvbuff_max = std::min(evpHeader->size, _CLIENT_BUFFER_MAX); client->recvbuff = new uint8_t[client->recvbuff_max]; } memcpy(client->recvbuff, headerBuff, sizeof(headerBuff)); client->recvbuff_end = sizeof(headerBuff); } // read sub command or data size_t readSize = 0; do { readSize = bufferevent_read(bufev, client->recvbuff + client->recvbuff_end, client->recvbuff_max - client->recvbuff_end); if (readSize == 0) 
break; else client->read_times++; client->recvbuff_end += readSize; } while (readSize > 0); // test if command complete if (evpHeader->size == client->recvbuff_end) { // call client proc if (client->proc != nullptr) { EVClientStub cs(client->recvbuff, client->recvbuff_end); cs.id = client->fd; client->toClose = !(client->proc(cs)); if (cs.sendBuff != nullptr && cs.sendBuffSize > 0) { size_t writeSize = bufferevent_write(bufev, cs.sendBuff, cs.sendBuffSize); bufferevent_enable(client->buf_ev, EV_WRITE); if (writeSize != 0) LOG_WARN << "server send truncate" << LOG_ENDL; if (cs.deleteSendBuff) { delete[] cs.sendBuff; cs.sendBuff = nullptr; } } } // reset client client->recvbuff_end = 0; client->read_times = 0; } else { //LOG_WARN << "recvbuff incomplete, evpHeader.size=" << evpHeader->size // << ", recvbuff_end=" << client->recvbuff_end // << ", read_times=" << client->read_times // << LOG_ENDL; } // check read times if (client->read_times > CLIENT_READ_TIMES_MAX) { LOG_WARN << "client read times to max" << LOG_ENDL; buffered_on_error(bufev, 0, arg); } } // Called by libevent when the write buffer reaches 0. // We only provide this because libevent expects it, but we don't use it. static void buffered_on_write(struct bufferevent *bufev, void *arg) { struct EVClient *client = (struct EVClient *)arg; if (evbuffer_get_length(bufev->output) == 0) { bufferevent_disable(client->buf_ev, EV_WRITE); if (client->toClose) { LOG_DEBUG << "server initiative close" << LOG_ENDL; buffered_on_error(bufev, 0, arg); } else { //bufferevent_flush(bufev, EV_WRITE, BEV_FLUSH);//#todo not work //bufferevent_flush(bufev, EV_WRITE, BEV_FINISHED); // flush socket shutdown(client->fd, SHUT_WR); } } } // This function will be called by libevent when there is a connection ready to be accepted. 
static void on_accept(int fd, short ev, void *arg) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); if (client_fd < 0) { LOG_WARN << "accept failed" << LOG_ENDL; return; } // Set the client socket to non-blocking mode. if (setnonblock(client_fd) < 0) LOG_WARN << "failed to set client socket non-blocking" << LOG_ENDL; if (setnodelay(client_fd) < 0) LOG_WARN << "failed to set client socket no-delay" << LOG_ENDL; //if (setlinger(client_fd) < 0) // LOG_WARN << "failed to set client socket linger" << LOG_ENDL; // We've accepted a new client, create a client object. struct EVClient* client = new EVClient; if (client == NULL) { LOG_ERROR << "malloc failed" << LOG_ENDL; } client->fd = client_fd; client->proc = evclient_proc; // Create the buffered event. client->buf_ev = bufferevent_new(client_fd, buffered_on_read, buffered_on_write, buffered_on_error, client); // We have to enable it before our callbacks will be called. bufferevent_enable(client->buf_ev, EV_READ); LOG_INFO << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << LOG_ENDL; } int server_main(int argc, char **argv) { LOG_NOTICE << "server_main" << LOG_ENDL; // Initialize libevent. event_init(); // Create our listening socket. 
int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { LOG_ERROR << "create socket failed" << LOG_ENDL; return EXIT_FAILURE; } struct sockaddr_in listen_addr; memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(SERVER_PORT); int reuseaddr_on = REUSEADDR_ON; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); if (setnonblock(listen_fd) < 0) { LOG_ERROR << "failed to set server socket to non-blocking" << LOG_ENDL; return EXIT_FAILURE; } if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) { LOG_ERROR << "bind failed" << LOG_ENDL; return EXIT_FAILURE; } if (listen(listen_fd, 5) < 0) { LOG_ERROR << "listen failed" << LOG_ENDL; return EXIT_FAILURE; } // Set the socket to non-blocking, this is essential in event based programming with libevent. // We now have a listening socket, we create a read event to be notified when a client connects. struct event ev_accept; event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); event_add(&ev_accept, NULL); // Start the event loop. event_dispatch(); close(listen_fd); listen_fd = 0; return EXIT_SUCCESS; } void server_stop() { LOG_NOTICE << "server_stop" << LOG_ENDL; event_loopexit(NULL); } void ev_send_status_packet(EVClientStub& client, EVPStatus::EVPS status) { client.sendBuffSize = sizeof(EVPHeader)+sizeof(EVP_Status); client.sendBuff = new uint8_t[client.sendBuffSize]; client.deleteSendBuff = true; EVPHeader* evpHeader = new (client.sendBuff) EVPHeader; evpHeader->cmd = EVPCommand::EVPC_STATUS; evpHeader->size = client.sendBuffSize; evpHeader->hton(); EVP_Status* evpStatus = new (client.sendBuff + sizeof(EVPHeader)) EVP_Status; evpStatus->status = status; evpHeader->hton(); }