From bc2afe32e8db4318f2a2adea49d85b10d0d4cc97 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 08 二月 2021 16:51:24 +0800 Subject: [PATCH] update --- test_net_socket/test_net_mod_socket.cpp | 114 +++++++++++++++++++++++++++++++++------------------------ 1 files changed, 66 insertions(+), 48 deletions(-) diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index 8711bca..05827a1 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -10,6 +10,8 @@ #define SCALE 100000 +static Logger *logger = LoggerFactory::getLogger(); + typedef struct Targ { net_node_t *node; char *nodelist; @@ -89,68 +91,83 @@ } + +void * bus_server; -void *bus_handler(void *sockt) { - // pthread_detach(pthread_self()); - - char action[512]; - while ( true) { - printf("Input action: Close?\n"); - if(scanf("%s",action) < 1) { - printf("Invalide action\n"); - continue; - } - - if(strcmp(action, "close") == 0) { - bus_server_socket_wrapper_close(sockt); - break; - } else { - printf("Invalide action\n"); - } - } - +static void stop_bus_handler(int sig) { + bus_server_socket_wrapper_stop(bus_server); } - void start_bus_server(argument_t &arg) { printf("Start bus server\n"); - void * server_socket = bus_server_socket_wrapper_open(); - pthread_t tid; - // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus - if(arg.interactive) - pthread_create(&tid, NULL, bus_handler, server_socket); + bus_server = bus_server_socket_wrapper_open(); + + signal(SIGINT, stop_bus_handler); + signal(SIGTERM, stop_bus_handler); - if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { + if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { printf("start bus failed\n"); exit(1); } - if (pthread_join(tid, NULL) != 0) { - perror(" pthread_join"); + bus_server_socket_wrapper_close(bus_server); +} + +void *serverSockt; + + +static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { + char sendbuf[512]; + printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); + sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf); + // buf 鍜� size鏄繑鍥炲�� + *sendbuf_ptr = sendbuf; + *sendsize_ptr = strlen(sendbuf) + 1; + //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� + free(recvbuf); + return; +} + +bool stop = false; + +static void stop_replyserver_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); } } - - void start_reply(int mkey) { - printf("start reply\n"); - void *ser = net_mod_socket_open(); - net_mod_socket_bind(ser, mkey); - char sendbuf[512]; - int rv; - while(true) { - rv = net_mod_socket_recvandsend_timeout(ser, [&]( void *recvbuf, int recvsize, int key, void ** sendbuf_ptr, int *sendsize_ptr, void * user_data){ - printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); - sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); - // buf 鍜� size鏄繑鍥炲�� - *sendbuf_ptr = sendbuf; - *sendsize_ptr = strlen(sendbuf) + 1; - //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� - free(recvbuf); - return; - }, 0, 2000000, NULL ); + logger->debug("start reply\n"); + signal(SIGINT, stop_replyserver_handler); + signal(SIGTERM, stop_replyserver_handler); + + serverSockt = net_mod_socket_open(); + net_mod_socket_bind(serverSockt, mkey); + + int rv = 0 ; + while( true) { + rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); + if (rv == 0) + continue; + if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; + } + logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); + } + + // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + net_mod_socket_close(serverSockt); + logger->debug("stopted\n"); + // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); @@ -353,7 +370,8 @@ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); } @@ -422,7 +440,7 @@ net_node_t *node_arr; int node_arr_size = parse_node_list(targ->nodelist, &node_arr); - char *topic = "news"; + const char *topic = "news"; // char filename[512]; // sprintf(filename, "test%d.tmp", targ->id); // FILE *fp = NULL; -- Gitblit v1.8.0