From 45e00aca28504b27f3ad6b4abf364c3d57f34510 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 22 二月 2021 14:05:28 +0800 Subject: [PATCH] lock free queue --- test_net_socket/test_net_mod_socket.cpp | 310 ++++++++++++++++++++++++++++----------------------- 1 files changed, 169 insertions(+), 141 deletions(-) diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index 8711bca..ab46f31 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -10,6 +10,8 @@ #define SCALE 100000 +static Logger *logger = LoggerFactory::getLogger(); + typedef struct Targ { net_node_t *node; char *nodelist; @@ -73,6 +75,11 @@ } } +void start_resycle() { + shm_mm_wrapper_start_resycle(); +} + + // 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅 void *print_sub_msg(void *sockt) { pthread_detach(pthread_self()); @@ -89,68 +96,83 @@ } + +void * bus_server; -void *bus_handler(void *sockt) { - // pthread_detach(pthread_self()); - - char action[512]; - while ( true) { - printf("Input action: Close?\n"); - if(scanf("%s",action) < 1) { - printf("Invalide action\n"); - continue; - } - - if(strcmp(action, "close") == 0) { - bus_server_socket_wrapper_close(sockt); - break; - } else { - printf("Invalide action\n"); - } - } - +static void stop_bus_handler(int sig) { + bus_server_socket_wrapper_stop(bus_server); } - void start_bus_server(argument_t &arg) { printf("Start bus server\n"); - void * server_socket = bus_server_socket_wrapper_open(); - pthread_t tid; - // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus - if(arg.interactive) - pthread_create(&tid, NULL, bus_handler, server_socket); + bus_server = bus_server_socket_wrapper_open(); + + signal(SIGINT, stop_bus_handler); + signal(SIGTERM, stop_bus_handler); - if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { + if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { printf("start bus failed\n"); exit(1); } - if (pthread_join(tid, NULL) != 0) { - perror(" pthread_join"); + bus_server_socket_wrapper_close(bus_server); +} + +void *serverSockt; + + +static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { + char sendbuf[512]; + printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); + sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf); + // buf 鍜� size鏄繑鍥炲�� + *sendbuf_ptr = sendbuf; + *sendsize_ptr = strlen(sendbuf) + 1; + //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� + free(recvbuf); + return; +} + +bool stop = false; + +static void stop_replyserver_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); } } - - void start_reply(int mkey) { - printf("start reply\n"); - void *ser = net_mod_socket_open(); - net_mod_socket_bind(ser, mkey); - char sendbuf[512]; - int rv; - while(true) { - rv = net_mod_socket_recvandsend_timeout(ser, [&]( void *recvbuf, int recvsize, int key, void ** sendbuf_ptr, int *sendsize_ptr, void * user_data){ - printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); - sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); - // buf 鍜� size鏄繑鍥炲�� - *sendbuf_ptr = sendbuf; - *sendsize_ptr = strlen(sendbuf) + 1; - //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� - free(recvbuf); - return; - }, 0, 2000000, NULL ); + logger->debug("start reply\n"); + signal(SIGINT, stop_replyserver_handler); + signal(SIGTERM, stop_replyserver_handler); + + serverSockt = net_mod_socket_open(); + net_mod_socket_bind(serverSockt, mkey); + + int rv = 0 ; + while( true) { + rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); + if (rv == 0) + continue; + if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; + } + logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); + } + + // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + net_mod_socket_close(serverSockt); + logger->debug("stopted\n"); + // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); @@ -353,7 +375,8 @@ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); } @@ -422,7 +445,7 @@ net_node_t *node_arr; int node_arr_size = parse_node_list(targ->nodelist, &node_arr); - char *topic = "news"; + const char *topic = "news"; // char filename[512]; // sprintf(filename, "test%d.tmp", targ->id); // FILE *fp = NULL; @@ -501,99 +524,6 @@ LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n); } net_mod_socket_close(client); -} - - -int main(int argc, char *argv[]) { - shm_mm_wrapper_init(512); - - argument_t opt = parse_args(argc, argv); - - // port = atoi(argv[2]); - - if(opt.fun == NULL) { - usage(argv[0]); - exit(1); - } - - if (strcmp("start_net_proxy", opt.fun) == 0 ) { - if(opt.port == 0) { - usage(argv[0]); - exit(1); - } - start_net_proxy(opt); - - } - else if (strcmp("start_bus_server", opt.fun) == 0) { - - start_bus_server(opt); - } - else if (strcmp("start_reply", opt.fun) == 0) { - if(opt.key == 0) { - usage(argv[0]); - exit(1); - } - start_reply(opt.key); - } - else if (strcmp("start_net_client", opt.fun) == 0) { - if(opt.sendlist == 0) { - fprintf(stderr, "Missing sendlist .\n"); - usage(argv[0]); - exit(1); - } - if(opt.publist == 0) { - fprintf(stderr, "Missing publist.\n"); - usage(argv[0]); - exit(1); - } - start_net_client(opt.sendlist, opt.publist); - } - else if (strcmp("one_sendto_many", opt.fun) == 0) { - if(opt.sendlist == 0) { - fprintf(stderr, "Missing sendlist .\n"); - usage(argv[0]); - exit(1); - } - - one_sendto_many(opt.sendlist); - } - else if (strcmp("test_net_sendandrecv", opt.fun) == 0) { - if(opt.sendlist == 0) { - fprintf(stderr, "Missing sendlist .\n"); - usage(argv[0]); - exit(1); - } - - test_net_sendandrecv(opt.sendlist); - } - else if (strcmp("test_net_pub_threads", opt.fun) == 0) { - if(opt.publist == 0) { - fprintf(stderr, "Missing publist .\n"); - usage(argv[0]); - exit(1); - } - - test_net_pub_threads(opt.publist); - } - else if (strcmp("test_net_pub", opt.fun) == 0) { - if(opt.publist == 0) { - fprintf(stderr, "Missing publist .\n"); - usage(argv[0]); - exit(1); - } - - test_net_pub(opt.publist); - } - - else { - usage(argv[0]); - exit(1); - - } - - printf("==========end========\n"); - shm_mm_wrapper_destroy(); - } @@ -773,3 +703,101 @@ } printf("============node list end==========\n"); } + + + + +int main(int argc, char *argv[]) { + shm_mm_wrapper_init(512); + + argument_t opt = parse_args(argc, argv); + + // port = atoi(argv[2]); + + if(opt.fun == NULL) { + usage(argv[0]); + exit(1); + } + + if (strcmp("start_net_proxy", opt.fun) == 0 ) { + if(opt.port == 0) { + usage(argv[0]); + exit(1); + } + start_net_proxy(opt); + + } + else if (strcmp("start_bus_server", opt.fun) == 0) { + + start_bus_server(opt); + } + else if (strcmp("start_reply", opt.fun) == 0) { + if(opt.key == 0) { + usage(argv[0]); + exit(1); + } + start_reply(opt.key); + } + else if (strcmp("start_net_client", opt.fun) == 0) { + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + if(opt.publist == 0) { + fprintf(stderr, "Missing publist.\n"); + usage(argv[0]); + exit(1); + } + start_net_client(opt.sendlist, opt.publist); + } + else if (strcmp("one_sendto_many", opt.fun) == 0) { + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + one_sendto_many(opt.sendlist); + } + else if (strcmp("test_net_sendandrecv", opt.fun) == 0) { + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_sendandrecv(opt.sendlist); + } + else if (strcmp("test_net_pub_threads", opt.fun) == 0) { + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub_threads(opt.publist); + } + else if (strcmp("test_net_pub", opt.fun) == 0) { + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub(opt.publist); + } + else if (strcmp("start_resycle", opt.fun) == 0) { + start_resycle(); + } + + else { + usage(argv[0]); + exit(1); + + } + + printf("==========end========\n"); + // shm_mm_wrapper_destroy(); + +} -- Gitblit v1.8.0