From 2561a007b8d8999a4750046d0cfb3b1ad5af50ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 09 四月 2024 15:29:32 +0800 Subject: [PATCH] test for perf --- test_net_socket/shm_util.cpp | 298 +++++++++++++++++++++++++++++++++++++++++------------------ 1 files changed, 205 insertions(+), 93 deletions(-) diff --git a/test_net_socket/shm_util.cpp b/test_net_socket/shm_util.cpp index 34efbc4..1b3dca3 100644 --- a/test_net_socket/shm_util.cpp +++ b/test_net_socket/shm_util.cpp @@ -21,6 +21,8 @@ struct argument_t { bool interactive; + bool force; + int bind; int port; int key; char *sendlist; @@ -42,7 +44,7 @@ pthread_detach(pthread_self()); char action[512]; - while ( true) { + while ( true ) { printf("Input action: Close?\n"); if(scanf("%s",action) < 1) { printf("Invalide action\n"); @@ -147,13 +149,18 @@ } } -void start_reply(int mkey) { +void start_recvfrom(int mkey, bool force) { logger->debug("start reply\n"); signal(SIGINT, stop_replyserver_handler); signal(SIGTERM, stop_replyserver_handler); serverSockt = net_mod_socket_open(); - net_mod_socket_bind(serverSockt, mkey); + if(force) { + net_mod_socket_force_bind(serverSockt, mkey); + } else { + net_mod_socket_bind(serverSockt, mkey); + } + int rv = 0 ; while( true) { @@ -163,6 +170,9 @@ if(rv == EBUS_STOPED) { logger->debug("Stopping\n"); break; + } else if(rv == EBUS_KEY_INUSED){ + printf("key宸茬粡琚崰鐢╘n"); + exit(1); } logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); @@ -190,6 +200,8 @@ int recv_arr_size, i, n; net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; pthread_t tid; // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭� @@ -224,19 +236,31 @@ if (fgets(content, MAXLINE, stdin) != NULL) { // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); - n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); printf(" %d nodes reply\n", n); - for(i=0; i<recv_arr_size; i++) { - printf("reply from (host:%s, port: %d, key:%d) >> %s\n", - recv_arr[i].host, - recv_arr[i].port, - recv_arr[i].key, - (char *)recv_arr[i].content - ); - } + + if(recv_arr_size > 0) { + for(i=0; i<recv_arr_size; i++) { + printf("reply from (host:%s, port: %d, key:%d) >> %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + (char *)recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); + } } } else if(strcmp(action, "desub") == 0) { @@ -280,10 +304,12 @@ Targ *targ = (Targ *)arg; char sendbuf[128]; - int j, n; - int recv_arr_size; + int i, j, n; + int recv_arr_size = 0; net_mod_recv_msg_t *recv_arr; int total = 0; + net_mod_err_t *errarr; + int errarr_size = 0; int rkey, lkey; unsigned int l = 0 , rl; @@ -302,30 +328,42 @@ sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); // fprintf(fp, "requst:%s\n", sendbuf); // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); - n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); + n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1); printf("%d: send %d nodes\n", l, n); - for(j=0; j < recv_arr_size; j++) { - fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", - net_mod_socket_get_key(client), - sendbuf, - targ->node->key, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key, - (char *)recv_arr[j].content - ); + if(recv_arr_size > 0) { + for(j=0; j < recv_arr_size; j++) { + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", + net_mod_socket_get_key(client), + sendbuf, + targ->node->key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); - printf("key == %d\n", net_mod_socket_get_key(client)); - assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); - assert(targ->node->key == rkey); - assert(net_mod_socket_get_key(client) == lkey); - assert(rl == l); + printf("key == %d\n", net_mod_socket_get_key(client)); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); + assert(targ->node->key == rkey); + assert(net_mod_socket_get_key(client) == lkey); + assert(rl == l); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); + } + total += n; } + if(fp != NULL) fclose(fp); // net_mod_socket_close(client); @@ -374,7 +412,7 @@ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + fprintf(stderr,"鍙戦�佹暟鐩�:%d, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); @@ -383,11 +421,14 @@ // 鏃犻檺寰幆send void test_net_sendandrecv(char *nodelist) { - int n, j; + int i, n, j; void * client; int recv_arr_size; net_mod_recv_msg_t *recv_arr; net_node_t *node_arr; + net_mod_err_t *errarr; + int errarr_size = 0; + int node_arr_size = parse_node_list(nodelist, &node_arr); char buf[128]; pid_t pid, retPid ; @@ -403,30 +444,35 @@ while(true) { sprintf(buf, hello_format, pid, l); n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, - &recv_arr, &recv_arr_size, 1000); + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); printf(" %d nodes reply\n", n); - for(j = 0; j < recv_arr_size; j++) { - printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", - (long)pid, - buf, - (char *)recv_arr[j].content, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key + if(recv_arr_size > 0) { + for(j = 0; j < recv_arr_size; j++) { + printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", + (long)pid, + buf, + (char *)recv_arr[j].content, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key + ); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); + assert(retPid == pid); + assert(retl == l); + assert(remoteKey == recv_arr[j].key); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } - ); - - - - assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); - assert(retPid == pid); - assert(retl == l); - assert(remoteKey == recv_arr[j].key); + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); l++; } @@ -513,7 +559,7 @@ net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); - char *topic = "news"; + const char *topic = "news"; sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); void * client = net_mod_socket_open(); @@ -526,10 +572,34 @@ } void list () { + LockFreeQueue<shm_packet_t> * mqueue; hashtable_t *hashtable = mm_get_hashtable(); + printf("%10s \t %-10s \t %10s\n", "KEY", "LENGTH", "STATUS"); hashtable_foreach(hashtable, [&](int key, void * value){ - printf("%d\n", key); + if(key >= 100 ) { + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); + if((long)mqueue == 0x1) { + printf("%10d \t %-10s\n", key, "Not In Used"); + } else { + printf("%10d \t %-10d\n", key, mqueue->size()); + } + + } else { + printf("%10d\n", key); + } + }); +} + +void info(int key) { + LockFreeQueue<shm_packet_t> * mqueue; + hashtable_t *hashtable = mm_get_hashtable(); + mqueue = (LockFreeQueue<shm_packet_t> *) hashtable_get(hashtable, key); + printf("%10s: %-10p\n", "PTR", mqueue); + printf("%10s: %-10d\n", "KEY", key); + printf("%10s: %-10d\n", "LENGTH", mqueue->size()); + + } @@ -543,31 +613,45 @@ } } -void do_sendandrecv(int key, char *sendbuf) { - int n, j; +void do_sendandrecv(char *sendlist, char *sendbuf) { + int i, n, j; int recv_arr_size; net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; - net_node_t node_arr[] = {NULL, 0, key}; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(sendlist, &node_arr); + + print_node_list(node_arr, node_arr_size); void * client = net_mod_socket_open(); - n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5); - if(n == 0) { - printf("send failed\n"); - return; - } - printf(" %d nodes reply\n", n); - for(j=0; j < recv_arr_size; j++) { + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000); + + printf(" %d nodes reply\n", recv_arr_size); + if(recv_arr_size > 0) { + for(j=0; j < recv_arr_size; j++) { - fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n", - net_mod_socket_get_key(client), - sendbuf, - key, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key, - (char *)recv_arr[j].content - ); + fprintf(stdout, "===> suc: %d send '%s'. received from (host=%s, port= %d, key=%d), '%s'\n\n", + net_mod_socket_get_key(client), + sendbuf, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } +// printf("errarr_size = %d\n", errarr_size); + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); } net_mod_socket_close(client); @@ -596,6 +680,8 @@ fpe("./shm_util list\n"); fpe("# remove key 1001\n"); fpe("./shm_util rm 1001\n"); + fpe("./shm_util info 1002\n"); + fpe("./shm_util recvfrom --bind 1002 [--force]\n") fpe("\n"); } @@ -627,6 +713,8 @@ {"key", required_argument, 0, 'k'}, {"port", required_argument, 0, 'p'}, {"interactive", no_argument, 0, 'i'}, + {"force", no_argument, 0, 'f'}, + {"bind", required_argument, (int *)mopt.bind, 0}, {"sendlist", required_argument, (int *)mopt.sendlist, 0}, {"publist", required_argument, (int *)mopt.publist, 0}, {0, 0, 0, 0} @@ -637,7 +725,7 @@ { - c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index); + c = getopt_long (argc, argv, "+fk:p:i", long_options, &option_index); /* Detect the end of the options. */ if (c == -1) @@ -656,6 +744,9 @@ else if(strcmp(long_options[option_index].name, "publist") == 0) { mopt.publist = optarg; } + else if(strcmp(long_options[option_index].name, "bind") == 0) { + mopt.bind = atoi(optarg); + } else { printf ("option %s", long_options[option_index].name); if (optarg) @@ -671,6 +762,10 @@ case 'i': mopt.interactive = true; + break; + + case 'f': + mopt.force = true; break; case 'p': @@ -726,6 +821,10 @@ net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); for(i = 0; i < entry_arr_len; i++) { + if(strchr(entry_arr[i], ':') == NULL) { + node_arr[i]= {NULL, 0, atoi(entry_arr[i])}; + continue; + } property_arr_len = str_split(entry_arr[i], ":", &property_arr); printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); @@ -760,7 +859,7 @@ int i; char *prog; char * fun; - argument_t opt; + argument_t opt = {}; shm_mm_wrapper_init(512); @@ -780,24 +879,38 @@ else if (strcmp("list", fun) == 0 ) { list(); } + else if (strcmp("info", fun) == 0 ) { + if(argc < 2) { + + usage(prog); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + info(key); + } + } + } else if (strcmp("rm", fun) == 0 ) { if(argc < 2) { usage(prog); - exit(1); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + remove(key); + } } - for(i = 1; i < argc; i++) { - int key = atoi(argv[i]); - remove(key); - } + } else if (strcmp("sendandrecv", fun) == 0 ) { if(argc < 3) { usage(prog); exit(1); } - int key = atoi(argv[1]); + char *sendlist = argv[1]; char *content = argv[2]; - do_sendandrecv(key, content); + do_sendandrecv(sendlist, content); } else if (strcmp("start_bus_server", fun) == 0) { @@ -818,14 +931,14 @@ } - else if (strcmp("start_reply", fun) == 0) { + else if (strcmp("recvfrom", fun) == 0) { opt = parse_args(argc, argv); - opt = parse_args(argc, argv); - if(opt.key == 0) { + if(opt.bind == 0) { usage(argv[0]); - exit(1); + } else { + start_recvfrom(opt.bind, opt.force); } - start_reply(opt.key); + } else if (strcmp("start_net_client", fun) == 0) { opt = parse_args(argc, argv); @@ -883,13 +996,12 @@ } else { - printf("%Invalid funciton name\n"); + printf("Invalid funciton name\n"); usage(argv[0]); exit(1); } - printf("==========end========\n"); - // shm_mm_wrapper_destroy(); + shm_mm_wrapper_destroy(); } -- Gitblit v1.8.0