From b90ba316b54db321d3e8aaac7df93b46d80b9d9c Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 22 十二月 2020 15:47:41 +0800 Subject: [PATCH] 三个没有回收的信号 --- test_net_socket/test_net_mod_socket.c | 633 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 633 insertions(+), 0 deletions(-) diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c new file mode 100644 index 0000000..f777773 --- /dev/null +++ b/test_net_socket/test_net_mod_socket.c @@ -0,0 +1,633 @@ +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + +#include "shm_mm_wraper.h" +#include "usg_common.h" +#include <getopt.h> + +#define SCALE 100000 + +typedef struct Targ { + char *nodelist; + int id; + +}Targ; + +struct argument_t { + char *fun; + int port; + int key; + char *sendlist; + char *publist; + char **cmd_arr; + int cmd_arr_len; +}; + +argument_t parse_args (int argc, char *argv[]); +void usage(char *name); +int parse_node_list(char *str, net_node_t *node_arr_addr[]) ; +void print_node_list(net_node_t *node_arr, int len); + + + +void * client; + +void *proxy_server_handler(void *sockt) { + pthread_detach(pthread_self()); + + char action[512]; + while ( true) { + printf("Input action: Close?\n"); + if(scanf("%s",action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + net_mod_server_socket_close(sockt); + break; + } else { + printf("Invalide action\n"); + } + } +} + +void start_net_proxy(int port) { + pthread_t tid; + printf("Start net proxy\n"); + void *serverSocket = net_mod_server_socket_open(port); + + // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server + pthread_create(&tid, NULL, proxy_server_handler, serverSocket); + if(net_mod_server_socket_start(serverSocket) != 0) { + err_exit(errno, "net_mod_server_socket_start"); + } +} + +// 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅 +void *print_sub_msg(void *sockt) { + pthread_detach(pthread_self()); + void *recvbuf; + int size; + int key; + while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); + free(recvbuf); + } + +} + + +void *bus_handler(void *sockt) { + pthread_detach(pthread_self()); + + char action[512]; + while ( true) { + printf("Input action: Close?\n"); + if(scanf("%s",action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + bus_server_socket_wrapper_close(sockt); + break; + } else { + printf("Invalide action\n"); + } + } + +} + + + +void start_bus_server() { + printf("Start bus server\n"); + void * server_socket = bus_server_socket_wrapper_open(); + pthread_t tid; + // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus + // pthread_create(&tid, NULL, bus_handler, server_socket); + if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { + printf("start bus failed\n"); + exit(1); + } +} + + + +void start_reply(int key) { + printf("start reply\n"); + void *client = net_mod_socket_open(); + net_mod_socket_bind(client, key); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) { + // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); + sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); + net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port); + free(recvbuf); + } +} + +// 浜や簰寮忓鎴风 +void start_net_client(char *sendlist, char*publist ){ + client = net_mod_socket_open(); + char content[MAXLINE]; + char action[512]; + char topic[512]; + int buskey; + + int recv_arr_size, i, n; + net_mod_recv_msg_t *recv_arr; + + pthread_t tid; + // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭� + pthread_create(&tid, NULL, print_sub_msg, client); + + //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 + net_node_t *node_arr; + int node_arr_size = parse_node_list(sendlist, &node_arr); + // print_node_list(node_arr, node_arr_size); + + //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 + net_node_t *pub_node_arr; + int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); + // print_node_list(pub_node_arr, pub_node_arr_size); + + while (true) { + //printf("Usage: pub <topic> [content] or sub <topic>\n"); + printf("Can I help you? pub,sub,desub,send or quit\n"); + scanf("%s",action); + + if(strcmp(action, "pub") == 0) { + printf("Please input topic and content\n"); + scanf("%s %s", topic, content); + + n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); + printf("pub %d nodes\n", n); + } + else if(strcmp(action, "send") == 0) { + getc(stdin); + printf("Please input content\n"); + + if (fgets(content, MAXLINE, stdin) != NULL) { + // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� + n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); + printf(" %d nodes reply\n", n); + for(i=0; i<recv_arr_size; i++) { + printf("host:%s, port: %d, key:%d, content: %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } + } + else if(strcmp(action, "desub") == 0) { + printf("Please input topic!\n"); + + scanf("%s", topic); + if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { + printf("%d Desub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Desub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "sub") == 0) { + printf("Please input topic!\n"); + scanf("%s",topic); + + if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { + printf("%d Sub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Sub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "quit") == 0) { + break; + } else { + printf("error input argument\n"); + continue; + } + + } + net_mod_socket_close(client); + + +} + +void *_run_sendandrecv_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[512]; + + int i,j, n, recv_arr_size; + net_mod_recv_msg_t *recv_arr; + int total = 0; + + + net_node_t *node_arr; + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); + + + char filename[512]; + sprintf(filename, "test%d.tmp", targ->id); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + // fp = stdout; + + int recvsize; + void *recvbuf; + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + fprintf(fp, "requst:%s\n", sendbuf); + n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + //printf("send %d nodes\n", n); + for(j=0; j < recv_arr_size; j++) { + fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + recv_arr[j].content + ); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + total += n; + } + fclose(fp); + + return (void *)total; +} + +void test_net_sendandrecv_threads(char *nodelist) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + + client = net_mod_socket_open(); + printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].nodelist = nodelist; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + net_mod_socket_close(client); +} + + +void *_run_pub_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[512]; + + int i,j, n; + int total = 0; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); + + char *topic = "news"; + + + + // char filename[512]; + // sprintf(filename, "test%d.tmp", targ->id); + // FILE *fp = NULL; + // fp = fopen(filename, "w+"); + // fp = stdout; + + + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + printf( "pub:%s to %d nodes\n", sendbuf, n); + total += n; + } + // fclose(fp); + + return (void *)total; +} + +void test_net_pub_threads(char *nodelist) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + client = net_mod_socket_open(); + +printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].nodelist = nodelist; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + net_mod_socket_close(client); +} + + + + +int main(int argc, char *argv[]) { + shm_mm_wrapper_init(512); + + argument_t opt = parse_args(argc, argv); + + // port = atoi(argv[2]); + + if(opt.fun == NULL) { + usage(argv[0]); + exit(1); + } + + if (strcmp("start_net_proxy", opt.fun) == 0 ) { + if(opt.port == 0) { + usage(argv[0]); + exit(1); + } + start_net_proxy(opt.port); + + } + else if (strcmp("start_bus_server", opt.fun) == 0) { + + start_bus_server(); + } + else if (strcmp("start_reply", opt.fun) == 0) { + if(opt.key == 0) { + usage(argv[0]); + exit(1); + } + start_reply(opt.key); + } + else if (strcmp("start_net_client", opt.fun) == 0) { + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + if(opt.publist == 0) { + fprintf(stderr, "Missing publist.\n"); + usage(argv[0]); + exit(1); + } + start_net_client(opt.sendlist, opt.publist); + } + else if (strcmp("test_net_sendandrecv_threads", opt.fun) == 0) { + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_sendandrecv_threads(opt.sendlist); + } + else if (strcmp("test_net_pub_threads", opt.fun) == 0) { + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub_threads(opt.publist); + } + + else { + usage(argv[0]); + exit(1); + + } + + printf("==========end========\n"); + shm_mm_wrapper_destroy(); + +} + + + +void usage(char *name) +{ + fprintf(stderr, "Usage: %s [OPTIONS] [ARG...]\n\n", name); + fprintf(stderr, "Test net mod socket\n\n"); + fprintf(stderr, "Options:\n\n"); + #define fpe(str) fprintf(stderr, " %s", str); + fpe("-f, --funciton Function name\n"); + fpe("-p, --port TCP/IP Port\n"); + fpe("-k, --key SHM Key\n"); + fpe("--sendlist format锛�--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n"); + fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n"); + fpe("\n"); +} + + + +argument_t parse_args (int argc, char *argv[]) +{ + int c; + + if(argc < 2) { + usage(argv[0]); + exit(1); + } + + if(argc == 2 && strcmp(argv[1], "--help") == 0) { + usage(argv[0]); + exit(0); + } + + argument_t mopt = {}; + + // mopt.volume_list_size = 0; + + opterr = 0; + + + static struct option long_options[] = + { + /* These options set a flag. */ + + {"fun", required_argument, 0, 'f'}, + {"key", required_argument, 0, 'k'}, + {"port", required_argument, 0, 'p'}, + {"sendlist", required_argument, (int *)mopt.sendlist, 0}, + {"publist", required_argument, (int *)mopt.publist, 0}, + {0, 0, 0, 0} + }; + /* getopt_long stores the option index here. */ + int option_index = 0; + while (1) + { + + + c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index); + + /* Detect the end of the options. */ + if (c == -1) + break; + + switch (c) + { + case 0: + /* If this option set a flag, do nothing else now. */ + if (long_options[option_index].flag != 0) + break; + + if(strcmp(long_options[option_index].name, "sendlist") == 0) { + mopt.sendlist = optarg; + } + else if(strcmp(long_options[option_index].name, "publist") == 0) { + mopt.publist = optarg; + } + else { + printf ("option %s", long_options[option_index].name); + if (optarg) + printf (" with arg %s", optarg); + printf ("\n"); + } + + break; + + case 'f': + mopt.fun = optarg; + break; + + case 'k': + mopt.key = atoi(optarg); + break; + + case 'p': + // printf ("==name with value `%s'\n", optarg); + mopt.port = atoi(optarg); + break; + + case '?': + printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind); + /* getopt_long already printed an error message. */ + usage(argv[0]); + exit(1); + break; + + default: + //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]); + break; + } + } + + // printf ("optind = %d, argc=%d \n", optind, argc); + /* Print any remaining command line arguments (not options). */ + if (optind < argc) + { + mopt.cmd_arr = &argv[optind]; + mopt.cmd_arr_len = argc - optind; + // printf ("non-option ARGV-elements: "); + // while (optind < argc) + // printf ("%d, %d, %s \n", optind, argc, argv[optind++]); + // putchar ('\n'); + } + return mopt; + +} + + +/** + * @str "192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11" + * @node_arr_addr 杩斿洖澶勭悊鍚庣殑缃戠粶鑺傜偣鏁扮粍 + * { + * {"192.168.5.22", 5000, 11}, + * {"192.168.20.10", 5000, 11}, + * {"192.168.20.104", 5000, 11} + * } + * @return 鏁扮粍鐨勯暱搴� + */ +int parse_node_list(char *str, net_node_t *node_arr_addr[]) { + int i, j; + char **property_arr; + int property_arr_len; + char **entry_arr; + int entry_arr_len = str_split(str, ",", &entry_arr); + + net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); + for(i = 0; i < entry_arr_len; i++) { + property_arr_len = str_split(entry_arr[i], ":", &property_arr); + // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; + + free(property_arr[1]); + if(property_arr_len == 3) { + node_arr[i].key = atoi(property_arr[2]); + free(property_arr[2]); + } + free(entry_arr[i]); + + } + *node_arr_addr = node_arr; + + + return entry_arr_len; +} + +void print_node_list(net_node_t *node_arr, int len) { + printf("============node list begin==========\n"); + for(int i = 0; i < len; i++) { + printf("%s,%d,%d,\n", node_arr[i].host, node_arr[i].port, node_arr[i].key); + } + printf("============node list end==========\n"); +} -- Gitblit v1.8.0