From aa2f3b2a9968bb4928463bdae05fb026d16b60bb Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 04 十二月 2020 19:07:01 +0800 Subject: [PATCH] 固定bus key --- test_net_socket/test_net_mod_socket.c | 209 ++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 155 insertions(+), 54 deletions(-) diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index 15c9e55..08f3589 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -1,11 +1,15 @@ #include "net_mod_server_socket_wrapper.h" #include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + #include "shm_mm_wraper.h" #include "usg_common.h" #include <getopt.h> +#define SCALE 100000 + typedef struct Targ { - char *sendlist; + char *nodelist; int id; }Targ; @@ -24,6 +28,10 @@ void usage(char *name); int parse_node_list(char *str, net_node_t *node_arr_addr[]) ; void print_node_list(net_node_t *node_arr, int len); + + + +void * client; void start_net_proxy(int port) { printf("Start net proxy\n"); @@ -46,8 +54,38 @@ } + +void start_bus_server() { + printf("Start bus server\n"); + void * server_socket = bus_server_socket_wrapper_open(); + if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { + printf("start bus failed\n"); + exit(1); + } +} + + + +void start_reply(int key) { + printf("start reply\n"); + void *client = net_mod_socket_open(); + net_mod_socket_bind(client, key); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) { + // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); + sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); + net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port); + free(recvbuf); + } +} + + void start_net_client(char *sendlist, char*publist ){ - void * client = net_mod_socket_open(); + client = net_mod_socket_open(); char content[MAXLINE]; char action[512]; char topic[512]; @@ -87,9 +125,8 @@ if (fgets(content, MAXLINE, stdin) != NULL) { // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� - n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, - strlen(content), &recv_arr, &recv_arr_size, 5000); - printf("send %d nodes\n", n); + n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); + printf(" %d nodes reply\n", n); for(i=0; i<recv_arr_size; i++) { printf("host:%s, port: %d, key:%d, content: %s\n", recv_arr[i].host, @@ -104,10 +141,10 @@ } } else if(strcmp(action, "desub") == 0) { - printf("Please input buskey and topic!\n"); + printf("Please input topic!\n"); - scanf("%d %s", &buskey, topic); - if (net_mod_socket_desub(client, topic, strlen(topic), buskey) == 0) { + scanf("%s", topic); + if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { printf("%d Desub success!\n", net_mod_socket_get_key(client)); } else { printf("Desub failture!\n"); @@ -116,11 +153,10 @@ } else if(strcmp(action, "sub") == 0) { - printf("Please input buskey and topic!\n"); - scanf("%d %s",&buskey, topic); + printf("Please input topic!\n"); + scanf("%s",topic); - printf("===%d %s\n",buskey, topic); - if (net_mod_socket_sub(client, topic, strlen(topic), buskey) == 0) { + if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { printf("%d Sub success!\n", net_mod_socket_get_key(client)); } else { printf("Sub failture!\n"); @@ -140,25 +176,25 @@ } + + -#define SCALE 100000 -void *runclient(void *arg) { + +void *_run_sendandrecv_(void *arg) { Targ *targ = (Targ *)arg; char sendbuf[512]; int i,j, n, recv_arr_size; net_mod_recv_msg_t *recv_arr; - + int total = 0; net_node_t *node_arr; - int node_arr_size = parse_node_list(targ->sendlist, &node_arr); + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); - void * client = net_mod_socket_open(); - char filename[512]; sprintf(filename, "test%d.tmp", targ->id); FILE *fp = NULL; @@ -182,13 +218,14 @@ } // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + total += n; } fclose(fp); - net_mod_socket_close(client); - return (void *)i; + + return (void *)total; } -void start_net_mclient(char *sendlist) { +void test_net_sendandrecv_threads(char *nodelist) { int status, i = 0, processors = 4; void *res[processors]; @@ -199,12 +236,13 @@ struct timeval start, end; long total = 0; -printf("寮�濮嬫祴璇�...\n"); + client = net_mod_socket_open(); + printf("寮�濮嬫祴璇�...\n"); gettimeofday(&start, NULL); for (i = 0; i < processors; i++) { - targs[i].sendlist = sendlist; + targs[i].nodelist = nodelist; targs[i].id = i; - pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); + pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]); } for (i = 0; i < processors; i++) { @@ -223,34 +261,81 @@ long diffusec = difftime - diffsec*1000000; fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); // fflush(stdout); + net_mod_socket_close(client); } -void start_bus_server(int key) { - printf("Start bus server\n"); - void * server_socket = net_mod_socket_open(); - - net_mod_socket_bind(server_socket, key); - - net_mod_socket_start_bus(server_socket); -} + +void *_run_pub_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[512]; + + int i,j, n; + int total = 0; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); + + char *topic = "news"; + + // char filename[512]; + // sprintf(filename, "test%d.tmp", targ->id); + // FILE *fp = NULL; + // fp = fopen(filename, "w+"); + // fp = stdout; -void start_reply(int key) { - printf("start reply\n"); - void *socket = net_mod_socket_open(); - net_mod_socket_bind(socket, key); - int size; - void *recvbuf; - char sendbuf[512]; - int rv; - int remote_port; - while ( (rv = net_mod_socket_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { - printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); - sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); - net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); - free(recvbuf); + + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + printf( "pub:%s to %d nodes\n", sendbuf, n); + total += n; } + // fclose(fp); + + return (void *)total; +} + +void test_net_pub_threads(char *nodelist) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + client = net_mod_socket_open(); + +printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].nodelist = nodelist; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + net_mod_socket_close(client); } @@ -277,7 +362,7 @@ usage(argv[0]); exit(1); } - start_bus_server(opt.key); + start_bus_server(); } else if (strcmp("start_reply", opt.fun) == 0) { if(opt.key == 0) { @@ -299,15 +384,25 @@ } start_net_client(opt.sendlist, opt.publist); } - else if (strcmp("start_net_mclient", opt.fun) == 0) { + else if (strcmp("test_net_sendandrecv_threads", opt.fun) == 0) { if(opt.sendlist == 0) { fprintf(stderr, "Missing sendlist .\n"); usage(argv[0]); exit(1); } - start_net_mclient(opt.sendlist); + test_net_sendandrecv_threads(opt.sendlist); } + else if (strcmp("test_net_pub_threads", opt.fun) == 0) { + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub_threads(opt.publist); + } + else { usage(argv[0]); exit(1); @@ -327,6 +422,8 @@ fpe("-f, --funciton Function name\n"); fpe("-p, --port TCP/IP Port\n"); fpe("-k, --key SHM Key\n"); + fpe("--sendlist format锛�--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n"); + fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n"); fpe("\n"); } @@ -345,7 +442,6 @@ usage(argv[0]); exit(0); } - argument_t mopt = {}; @@ -461,11 +557,16 @@ net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); for(i = 0; i < entry_arr_len; i++) { property_arr_len = str_split(entry_arr[i], ":", &property_arr); - printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); - node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), atoi(property_arr[2])}; - free(entry_arr[i]); + // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; + free(property_arr[1]); - free(property_arr[2]); + if(property_arr_len == 3) { + node_arr[i].key = atoi(property_arr[2]); + free(property_arr[2]); + } + free(entry_arr[i]); + } *node_arr_addr = node_arr; -- Gitblit v1.8.0