From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@aiot.com> Date: 星期三, 11 八月 2021 09:50:20 +0800 Subject: [PATCH] fix from 3.1 first commit --- shm_util/shm_util.cpp | 953 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 953 insertions(+), 0 deletions(-) diff --git a/shm_util/shm_util.cpp b/shm_util/shm_util.cpp new file mode 100644 index 0000000..4eb03f8 --- /dev/null +++ b/shm_util/shm_util.cpp @@ -0,0 +1,953 @@ +#include <assert.h> +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include <getopt.h> +#include "logger_factory.h" + +#define SCALE 100000 + +static Logger *logger = LoggerFactory::getLogger(); + +typedef struct Targ { + net_node_t *node; + char *nodelist; + long id; + +}Targ; + +struct argument_t { + bool interactive; + bool force; + int bind; + int port; + int key; + char *sendlist; + char *publist; + char **cmd_arr; + int cmd_arr_len; +}; + +argument_t parse_args (int argc, char *argv[]); +void usage(char *name); +int parse_node_list(const char *str, net_node_t *node_arr_addr[]) ; +void print_node_list(net_node_t *node_arr, int len); + + + +void * client; + +void *proxy_server_handler(void *sockt) { + pthread_detach(pthread_self()); + + char action[512]; + while ( true ) { + printf("Input action: Close?\n"); + if(scanf("%s",action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + net_mod_server_socket_close(sockt); + shm_mm_wrapper_destroy(); + break; + } else { + printf("Invalide action\n"); + } + } +} + +void start_net_proxy(argument_t &arg) { + pthread_t tid; + printf("Start net proxy\n"); + void *serverSocket = net_mod_server_socket_open(arg.port); + + // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server + if(arg.interactive) { + pthread_create(&tid, NULL, proxy_server_handler, serverSocket); + } + + if(net_mod_server_socket_start(serverSocket) != 0) { + err_exit(errno, "net_mod_server_socket_start"); + } +} + +void start_resycle() { + shm_mm_wrapper_start_resycle(); +} + + +// 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅 +void *print_sub_msg(void *sockt) { + pthread_detach(pthread_self()); + void *recvbuf; + int size; + int key; + int rv; + while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", (char *)recvbuf); + free(recvbuf); + } + + printf("print_sub_msg return . rv = %d\n", rv); + +} + + +void * bus_server; + +static void stop_bus_handler(int sig) { + bus_server_socket_wrapper_stop(bus_server); +} + + +void start_bus_server(argument_t &arg) { + printf("Start bus server\n"); + bus_server = bus_server_socket_wrapper_open(); + + signal(SIGINT, stop_bus_handler); + signal(SIGTERM, stop_bus_handler); + + if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { + printf("start bus failed\n"); + exit(1); + } + + bus_server_socket_wrapper_close(bus_server); +} + +void *serverSockt; + + +static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { + char sendbuf[512]; + printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); + sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf); + // buf 鍜� size鏄繑鍥炲�� + *sendbuf_ptr = sendbuf; + *sendsize_ptr = strlen(sendbuf) + 1; + //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� + free(recvbuf); + return; +} + +bool stop = false; + +static void stop_replyserver_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); + } +} + +void start_recvfrom(int mkey, bool force) { + logger->debug("start reply\n"); + signal(SIGINT, stop_replyserver_handler); + signal(SIGTERM, stop_replyserver_handler); + + serverSockt = net_mod_socket_open(); + if(force) { + net_mod_socket_force_bind(serverSockt, mkey); + } else { + net_mod_socket_bind(serverSockt, mkey); + } + + + int rv = 0 ; + while( true) { + rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); + if (rv == 0) + continue; + if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; + } else if(rv == EBUS_KEY_INUSED){ + printf("key宸茬粡琚崰鐢╘n"); + exit(1); + } + logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); + + } + + // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + net_mod_socket_close(serverSockt); + logger->debug("stopted\n"); + + // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { + // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); + // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); + // net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, key); + // free(recvbuf); + // } +} + +// 浜や簰寮忓鎴风 +void start_net_client(char *sendlist, char*publist ){ + client = net_mod_socket_open(); + char content[MAXLINE]; + char action[512]; + char topic[512]; + int buskey; + + int recv_arr_size, i, n; + net_mod_recv_msg_t *recv_arr; + + pthread_t tid; + // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭� + pthread_create(&tid, NULL, print_sub_msg, client); + + //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 + net_node_t *node_arr; + int node_arr_size = parse_node_list(sendlist, &node_arr); + print_node_list(node_arr, node_arr_size); + + //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 + net_node_t *pub_node_arr; + int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); + print_node_list(pub_node_arr, pub_node_arr_size); + + while (true) { + //printf("Usage: pub <topic> [content] or sub <topic>\n"); + printf("Can I help you? pub,sub,desub,send or quit\n"); + scanf("%s",action); + + if(strcmp(action, "pub") == 0) { + printf("Please input topic and content\n"); + scanf("%s %s", topic, content); + + n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); + printf("pub %d nodes\n", n); + } + else if(strcmp(action, "send") == 0) { + getc(stdin); + printf("Please input content\n"); + + if (fgets(content, MAXLINE, stdin) != NULL) { + // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� + // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); + printf(" %d nodes reply\n", n); + for(i=0; i<recv_arr_size; i++) { + printf("reply from (host:%s, port: %d, key:%d) >> %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + (char *)recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } + } + else if(strcmp(action, "desub") == 0) { + printf("Please input topic!\n"); + + scanf("%s", topic); + if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { + printf("%d Desub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Desub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "sub") == 0) { + printf("Please input topic!\n"); + scanf("%s",topic); + + if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { + printf("%d Sub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Sub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "quit") == 0) { + break; + } else { + printf("error input argument\n"); + continue; + } + + } + net_mod_socket_close(client); + + +} + +void *_run_one_sendto_many_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[128]; + + int j, n; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + int total = 0; + + int rkey, lkey; + unsigned int l = 0 , rl; + const char *hello_format = "%d say Hello %d"; + const char *reply_format = "%d RECEIVED %d say Hello %d"; + + char filename[128]; + sprintf(filename, "test%d.tmp", targ->node->key); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + // fp = stdout; + + int recvsize; + void *recvbuf; + for (l = 0; l < SCALE; l++) { + sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); + // fprintf(fp, "requst:%s\n", sendbuf); + // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); + printf("%d: send %d nodes\n", l, n); + for(j=0; j < recv_arr_size; j++) { + + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", + net_mod_socket_get_key(client), + sendbuf, + targ->node->key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + + printf("key == %d\n", net_mod_socket_get_key(client)); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); + assert(targ->node->key == rkey); + assert(net_mod_socket_get_key(client) == lkey); + assert(rl == l); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + total += n; + } + if(fp != NULL) + fclose(fp); + // net_mod_socket_close(client); + return (void *)total; +} + +//澶氱嚎绋媠end +void one_sendto_many(char *nodelist) { + + int status, i = 0; + + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + + char sendbuf[512]; + struct timeval start, end; + long total = 0; + + client = net_mod_socket_open(); + net_mod_socket_bind(client, shm_mm_wrapper_alloc_key()); + + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + Targ targs[node_arr_size]; + pthread_t tids[node_arr_size]; + void *res[node_arr_size]; + + printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < node_arr_size; i++) { + targs[i].node = node_arr + i; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_one_sendto_many_, (void *)&targs[i]); + } + + for (i = 0; i < node_arr_size; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + +} + +// 鏃犻檺寰幆send +void test_net_sendandrecv(char *nodelist) { + + int n, j; + void * client; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + char buf[128]; + pid_t pid, retPid ; + unsigned int l , retl; + int remoteKey; + const char *hello_format = "%d say Hello %u "; + const char *reply_format = "%d RECEIVED %d say Hello %d"; + + pid = getpid(); + l = 0; + + client = net_mod_socket_open(); + while(true) { + sprintf(buf, hello_format, pid, l); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, + &recv_arr, &recv_arr_size, 1000); + printf(" %d nodes reply\n", n); + for(j = 0; j < recv_arr_size; j++) { + + printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", + (long)pid, + buf, + (char *)recv_arr[j].content, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key + + ); + + + + assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); + assert(retPid == pid); + assert(retl == l); + assert(remoteKey == recv_arr[j].key); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + l++; + } + + net_mod_socket_close(client); + +} + +void *_run_pub_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[128]; + + int i,j, n; + int total = 0; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); + + const char *topic = "news"; + // char filename[512]; + // sprintf(filename, "test%d.tmp", targ->id); + // FILE *fp = NULL; + // fp = fopen(filename, "w+"); + // fp = stdout; + + + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%ld) %d", targ->id, i); + + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n); + total += n; + } + // fclose(fp); + + return (void *)total; +} + +//澶氱嚎绋媝ub +void test_net_pub_threads(char *nodelist) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + client = net_mod_socket_open(); + +printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].nodelist = nodelist; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + net_mod_socket_close(client); +} + +// 鏃犻檺寰幆pub +void test_net_pub(char *nodelist) { + + int n; + char sendbuf[512]; + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + + char *topic = "news"; + sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); + + void * client = net_mod_socket_open(); + while (true) { + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n); + } + net_mod_socket_close(client); +} + +void list () { + LockFreeQueue<shm_packet_t> * mqueue; + hashtable_t *hashtable = mm_get_hashtable(); + printf("%10s \t %-10s \t %10s\n", "KEY", "LENGTH", "STATUS"); + hashtable_foreach(hashtable, [&](int key, void * value){ + if(key >= 100 ) { + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); + if((long)mqueue == 0x1) { + printf("%10d \t %-10s\n", key, "Not In Used"); + } else { + printf("%10d \t %-10d\n", key, mqueue->size()); + } + + } else { + printf("%10d\n", key); + } + + }); +} + +void info(int key) { + LockFreeQueue<shm_packet_t> * mqueue; + hashtable_t *hashtable = mm_get_hashtable(); + mqueue = (LockFreeQueue<shm_packet_t> *) hashtable_get(hashtable, key); + printf("%10s: %-10p\n", "PTR", mqueue); + printf("%10s: %-10d\n", "KEY", key); + printf("%10s: %-10d\n", "LENGTH", mqueue->size()); + + +} + + +void remove(int key) { + hashtable_t *hashtable = mm_get_hashtable(); + + LockFreeQueue<shm_packet_t> * mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); + if(mqueue != NULL) { + delete mqueue; + hashtable_remove(hashtable, key); + } +} + +void do_sendandrecv(int key, char *sendbuf) { + int n, j; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + + net_node_t node_arr[] = {NULL, 0, key}; + + void * client = net_mod_socket_open(); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000); + if(n == 0) { + printf("send failed\n"); + return; + } + printf(" %d nodes reply\n", n); + for(j=0; j < recv_arr_size; j++) { + + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n", + net_mod_socket_get_key(client), + sendbuf, + key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + } + + net_mod_socket_close(client); +} + + + +void usage(char *name) +{ + #define fpe(str) fprintf(stderr, " %s", str); + + fprintf(stderr, "Usage: %s {function} [OPTIONS] [ARG...]\n\n", name); + fprintf(stderr, "Test shmsocket\n\n"); + + fprintf(stderr, "Options:\n\n"); + fpe("-p, --port TCP/IP Port\n"); + fpe("-k, --key SHM Key\n"); + fpe("--sendlist format锛�--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n"); + fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n"); + fpe("\n"); + + fprintf(stderr, "Examples:\n\n"); + fpe("# sendandrecv to socket which has key 100\n"); + fpe("./shm_util sendandrecv 100 \"hello\"\n"); + fpe("# list all key\n"); + fpe("./shm_util list\n"); + fpe("# remove key 1001\n"); + fpe("./shm_util rm 1001\n"); + fpe("./shm_util info 1002\n"); + fpe("./shm_util recvfrom --bind 1002 [--force]\n") + fpe("\n"); +} + + + +argument_t parse_args (int argc, char *argv[]) +{ + int c; + + if(argc < 2) { + usage(argv[0]); + exit(1); + } + + + + argument_t mopt = {}; + + // mopt.volume_list_size = 0; + mopt.interactive = false; + + opterr = 0; + + + static struct option long_options[] = + { + /* These options set a flag. */ + + {"key", required_argument, 0, 'k'}, + {"port", required_argument, 0, 'p'}, + {"interactive", no_argument, 0, 'i'}, + {"force", no_argument, 0, 'f'}, + {"bind", required_argument, (int *)mopt.bind, 0}, + {"sendlist", required_argument, (int *)mopt.sendlist, 0}, + {"publist", required_argument, (int *)mopt.publist, 0}, + {0, 0, 0, 0} + }; + /* getopt_long stores the option index here. */ + int option_index = 0; + while (1) + { + + + c = getopt_long (argc, argv, "+fk:p:i", long_options, &option_index); + + /* Detect the end of the options. */ + if (c == -1) + break; + + switch (c) + { + case 0: + /* If this option set a flag, do nothing else now. */ + if (long_options[option_index].flag != 0) + break; + + if(strcmp(long_options[option_index].name, "sendlist") == 0) { + mopt.sendlist = optarg; + } + else if(strcmp(long_options[option_index].name, "publist") == 0) { + mopt.publist = optarg; + } + else if(strcmp(long_options[option_index].name, "bind") == 0) { + mopt.bind = atoi(optarg); + } + else { + printf ("option %s", long_options[option_index].name); + if (optarg) + printf (" with arg %s", optarg); + printf ("\n"); + } + + break; + + case 'k': + mopt.key = atoi(optarg); + break; + + case 'i': + mopt.interactive = true; + break; + + case 'f': + mopt.force = true; + break; + + case 'p': + // printf ("==name with value `%s'\n", optarg); + mopt.port = atoi(optarg); + break; + + case '?': + printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind); + /* getopt_long already printed an error message. */ + usage(argv[0]); + exit(1); + break; + + default: + //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]); + break; + } + } + + // printf ("optind = %d, argc=%d \n", optind, argc); + /* Print any remaining command line arguments (not options). */ + if (optind < argc) + { + mopt.cmd_arr = &argv[optind]; + mopt.cmd_arr_len = argc - optind; + // printf ("non-option ARGV-elements: "); + // while (optind < argc) + // printf ("%d, %d, %s \n", optind, argc, argv[optind++]); + // putchar ('\n'); + } + return mopt; + +} + + +/** + * @str "192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11" + * @node_arr_addr 杩斿洖澶勭悊鍚庣殑缃戠粶鑺傜偣鏁扮粍 + * { + * {"192.168.5.22", 5000, 11}, + * {"192.168.20.10", 5000, 11}, + * {"192.168.20.104", 5000, 11} + * } + * @return 鏁扮粍鐨勯暱搴� + */ +int parse_node_list(const char *str, net_node_t *node_arr_addr[]) { + int i, j; + char **property_arr; + int property_arr_len; + char **entry_arr; + int entry_arr_len = str_split(str, ",", &entry_arr); + + net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); + for(i = 0; i < entry_arr_len; i++) { + property_arr_len = str_split(entry_arr[i], ":", &property_arr); + printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; + + free(property_arr[1]); + if(property_arr_len == 3) { + node_arr[i].key = atoi(property_arr[2]); + free(property_arr[2]); + } + free(entry_arr[i]); + + } + *node_arr_addr = node_arr; + + + return entry_arr_len; +} + +void print_node_list(net_node_t *node_arr, int len) { + printf("============node list begin==========\n"); + for(int i = 0; i < len; i++) { + printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key); + } + printf("============node list end==========\n"); +} + + + + +int main(int argc, char *argv[]) { + int i; + char *prog; + char * fun; + argument_t opt = {}; + + shm_mm_wrapper_init(512); + + if(argc < 2) { + usage(argv[0]); + exit(1); + } + prog = argv[0]; + fun = argv[1]; + argc--; + argv++; + + + if (strcmp("help", fun) == 0 ) { + usage(prog); + } + else if (strcmp("list", fun) == 0 ) { + list(); + } + else if (strcmp("info", fun) == 0 ) { + if(argc < 2) { + + usage(prog); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + info(key); + } + } + } + else if (strcmp("rm", fun) == 0 ) { + if(argc < 2) { + usage(prog); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + remove(key); + } + } + + } + else if (strcmp("sendandrecv", fun) == 0 ) { + if(argc < 3) { + usage(prog); + exit(1); + } + int key = atoi(argv[1]); + char *content = argv[2]; + do_sendandrecv(key, content); + } + else if (strcmp("start_bus_server", fun) == 0) { + + start_bus_server(opt); + } + else if (strcmp("start_resycle", fun) == 0) { + + start_resycle(); + } + + else if (strcmp("start_net_proxy", fun) == 0 ) { + opt = parse_args(argc, argv); + if(opt.port == 0) { + usage(prog); + exit(1); + } + start_net_proxy(opt); + + } + + else if (strcmp("recvfrom", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.bind == 0) { + usage(argv[0]); + } else { + start_recvfrom(opt.bind, opt.force); + } + + } + else if (strcmp("start_net_client", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + if(opt.publist == 0) { + fprintf(stderr, "Missing publist.\n"); + usage(argv[0]); + exit(1); + } + start_net_client(opt.sendlist, opt.publist); + } + else if (strcmp("one_sendto_many", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + one_sendto_many(opt.sendlist); + } + else if (strcmp("test_net_sendandrecv", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_sendandrecv(opt.sendlist); + } + else if (strcmp("test_net_pub_threads", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub_threads(opt.publist); + } + else if (strcmp("test_net_pub", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub(opt.publist); + } + + else { + printf("%Invalid funciton name\n"); + usage(argv[0]); + exit(1); + + } + + shm_mm_wrapper_destroy(); + +} -- Gitblit v1.8.0