#include #include "net_mod_server_socket_wrapper.h" #include "net_mod_socket_wrapper.h" #include "bus_server_socket_wrapper.h" #include "shm_mm_wrapper.h" #include "usg_common.h" #include #include "logger_factory.h" #define SCALE 100000 static Logger *logger = LoggerFactory::getLogger(); typedef struct Targ { net_node_t *node; char *nodelist; long id; }Targ; struct argument_t { bool interactive; int port; int key; char *sendlist; char *publist; char **cmd_arr; int cmd_arr_len; }; argument_t parse_args (int argc, char *argv[]); void usage(char *name); int parse_node_list(const char *str, net_node_t *node_arr_addr[]) ; void print_node_list(net_node_t *node_arr, int len); void * client; void *proxy_server_handler(void *sockt) { pthread_detach(pthread_self()); char action[512]; while ( true) { printf("Input action: Close?\n"); if(scanf("%s",action) < 1) { printf("Invalide action\n"); continue; } if(strcmp(action, "close") == 0) { net_mod_server_socket_close(sockt); shm_mm_wrapper_destroy(); break; } else { printf("Invalide action\n"); } } } void start_net_proxy(argument_t &arg) { pthread_t tid; printf("Start net proxy\n"); void *serverSocket = net_mod_server_socket_open(arg.port); // 创建一个线程,可以关闭server if(arg.interactive) { pthread_create(&tid, NULL, proxy_server_handler, serverSocket); } if(net_mod_server_socket_start(serverSocket) != 0) { err_exit(errno, "net_mod_server_socket_start"); } } void start_resycle() { shm_mm_wrapper_start_resycle(); } // 打印接受到的订阅消息 void *print_sub_msg(void *sockt) { pthread_detach(pthread_self()); void *recvbuf; int size; int key; int rv; while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { printf("收到订阅消息:%s\n", (char *)recvbuf); free(recvbuf); } printf("print_sub_msg return . rv = %d\n", rv); } void * bus_server; static void stop_bus_handler(int sig) { bus_server_socket_wrapper_stop(bus_server); } void start_bus_server(argument_t &arg) { printf("Start bus server\n"); bus_server = bus_server_socket_wrapper_open(); signal(SIGINT, stop_bus_handler); signal(SIGTERM, stop_bus_handler); if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { printf("start bus failed\n"); exit(1); } bus_server_socket_wrapper_close(bus_server); } void *serverSockt; static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { char sendbuf[512]; printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf); // buf 和 size是返回值 *sendbuf_ptr = sendbuf; *sendsize_ptr = strlen(sendbuf) + 1; //recvbuf是分配到堆里的,使用完后不要忘记释放掉 free(recvbuf); return; } bool stop = false; static void stop_replyserver_handler(int sig) { printf("stop_handler\n"); int rv = net_mod_socket_stop(serverSockt); if(rv ==0) { logger->debug("send stop suc"); return; } else { logger->debug("send stop fail.%s\n", bus_strerror(rv)); } } void start_reply(int mkey) { logger->debug("start reply\n"); signal(SIGINT, stop_replyserver_handler); signal(SIGTERM, stop_replyserver_handler); serverSockt = net_mod_socket_open(); net_mod_socket_bind(serverSockt, mkey); int rv = 0 ; while( true) { rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); if (rv == 0) continue; if(rv == EBUS_STOPED) { logger->debug("Stopping\n"); break; } logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); } // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); net_mod_socket_close(serverSockt); logger->debug("stopted\n"); // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); // net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, key); // free(recvbuf); // } } // 交互式客户端 void start_net_client(char *sendlist, char*publist ){ client = net_mod_socket_open(); char content[MAXLINE]; char action[512]; char topic[512]; int buskey; int recv_arr_size, i, n; net_mod_recv_msg_t *recv_arr; pthread_t tid; // 创建一个线程接受订阅消息 pthread_create(&tid, NULL, print_sub_msg, client); //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 net_node_t *node_arr; int node_arr_size = parse_node_list(sendlist, &node_arr); print_node_list(node_arr, node_arr_size); //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 net_node_t *pub_node_arr; int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); print_node_list(pub_node_arr, pub_node_arr_size); while (true) { //printf("Usage: pub [content] or sub \n"); printf("Can I help you? pub,sub,desub,send or quit\n"); scanf("%s",action); if(strcmp(action, "pub") == 0) { printf("Please input topic and content\n"); scanf("%s %s", topic, content); n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); printf("pub %d nodes\n", n); } else if(strcmp(action, "send") == 0) { getc(stdin); printf("Please input content\n"); if (fgets(content, MAXLINE, stdin) != NULL) { // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待 // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); printf(" %d nodes reply\n", n); for(i=0; i> %s\n", recv_arr[i].host, recv_arr[i].port, recv_arr[i].key, (char *)recv_arr[i].content ); } // 使用完后,不要忘记释放掉 net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); } } else if(strcmp(action, "desub") == 0) { printf("Please input topic!\n"); scanf("%s", topic); if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { printf("%d Desub success!\n", net_mod_socket_get_key(client)); } else { printf("Desub failture!\n"); exit(0); } } else if(strcmp(action, "sub") == 0) { printf("Please input topic!\n"); scanf("%s",topic); if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { printf("%d Sub success!\n", net_mod_socket_get_key(client)); } else { printf("Sub failture!\n"); exit(0); } } else if(strcmp(action, "quit") == 0) { break; } else { printf("error input argument\n"); continue; } } net_mod_socket_close(client); } void *_run_one_sendto_many_(void *arg) { Targ *targ = (Targ *)arg; char sendbuf[128]; int j, n; int recv_arr_size; net_mod_recv_msg_t *recv_arr; int total = 0; int rkey, lkey; unsigned int l = 0 , rl; const char *hello_format = "%d say Hello %d"; const char *reply_format = "%d RECEIVED %d say Hello %d"; char filename[128]; sprintf(filename, "test%d.tmp", targ->node->key); FILE *fp = NULL; fp = fopen(filename, "w+"); // fp = stdout; int recvsize; void *recvbuf; for (l = 0; l < SCALE; l++) { sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); // fprintf(fp, "requst:%s\n", sendbuf); // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); printf("%d: send %d nodes\n", l, n); for(j=0; j < recv_arr_size; j++) { fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", net_mod_socket_get_key(client), sendbuf, targ->node->key, recv_arr[j].host, recv_arr[j].port, recv_arr[j].key, (char *)recv_arr[j].content ); printf("key == %d\n", net_mod_socket_get_key(client)); assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); assert(targ->node->key == rkey); assert(net_mod_socket_get_key(client) == lkey); assert(rl == l); } // 使用完后,不要忘记释放掉 net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); total += n; } if(fp != NULL) fclose(fp); // net_mod_socket_close(client); return (void *)total; } //多线程send void one_sendto_many(char *nodelist) { int status, i = 0; // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); char sendbuf[512]; struct timeval start, end; long total = 0; client = net_mod_socket_open(); net_mod_socket_bind(client, shm_mm_wrapper_alloc_key()); net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); Targ targs[node_arr_size]; pthread_t tids[node_arr_size]; void *res[node_arr_size]; printf("开始测试...\n"); gettimeofday(&start, NULL); for (i = 0; i < node_arr_size; i++) { targs[i].node = node_arr + i; targs[i].id = i; pthread_create(&tids[i], NULL, _run_one_sendto_many_, (void *)&targs[i]); } for (i = 0; i < node_arr_size; i++) { if (pthread_join(tids[i], &res[i]) != 0) { perror("multyThreadClient pthread_join"); } else { total += (long)res[i]; //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); } } gettimeofday(&end, NULL); double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; fprintf(stderr,"发送数目:%ld, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); // fflush(stdout); } // 无限循环send void test_net_sendandrecv(char *nodelist) { int n, j; void * client; int recv_arr_size; net_mod_recv_msg_t *recv_arr; net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); char buf[128]; pid_t pid, retPid ; unsigned int l , retl; int remoteKey; const char *hello_format = "%d say Hello %u "; const char *reply_format = "%d RECEIVED %d say Hello %d"; pid = getpid(); l = 0; client = net_mod_socket_open(); while(true) { sprintf(buf, hello_format, pid, l); n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, &recv_arr, &recv_arr_size, 1000); printf(" %d nodes reply\n", n); for(j = 0; j < recv_arr_size; j++) { printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", (long)pid, buf, (char *)recv_arr[j].content, recv_arr[j].host, recv_arr[j].port, recv_arr[j].key ); assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); assert(retPid == pid); assert(retl == l); assert(remoteKey == recv_arr[j].key); } // 使用完后,不要忘记释放掉 net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); l++; } net_mod_socket_close(client); } void *_run_pub_(void *arg) { Targ *targ = (Targ *)arg; char sendbuf[128]; int i,j, n; int total = 0; net_node_t *node_arr; int node_arr_size = parse_node_list(targ->nodelist, &node_arr); const char *topic = "news"; // char filename[512]; // sprintf(filename, "test%d.tmp", targ->id); // FILE *fp = NULL; // fp = fopen(filename, "w+"); // fp = stdout; for (i = 0; i < SCALE; i++) { sprintf(sendbuf, "thread(%ld) %d", targ->id, i); n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n); total += n; } // fclose(fp); return (void *)total; } //多线程pub void test_net_pub_threads(char *nodelist) { int status, i = 0, processors = 4; void *res[processors]; // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); Targ targs[processors]; pthread_t tids[processors]; char sendbuf[512]; struct timeval start, end; long total = 0; client = net_mod_socket_open(); printf("开始测试...\n"); gettimeofday(&start, NULL); for (i = 0; i < processors; i++) { targs[i].nodelist = nodelist; targs[i].id = i; pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]); } for (i = 0; i < processors; i++) { if (pthread_join(tids[i], &res[i]) != 0) { perror("multyThreadClient pthread_join"); } else { total += (long)res[i]; //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); } } gettimeofday(&end, NULL); double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffusec = difftime - diffsec*1000000; fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total ); // fflush(stdout); net_mod_socket_close(client); } // 无限循环pub void test_net_pub(char *nodelist) { int n; char sendbuf[512]; net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); char *topic = "news"; sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); void * client = net_mod_socket_open(); while (true) { n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n); } net_mod_socket_close(client); } void list () { hashtable_t *hashtable = mm_get_hashtable(); hashtable_foreach(hashtable, [&](int key, void * value){ printf("%d\n", key); }); } void remove(int key) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue * mqueue = (LockFreeQueue *)hashtable_get(hashtable, key); if(mqueue != NULL) { delete mqueue; hashtable_remove(hashtable, key); } } void do_sendandrecv(int key, char *sendbuf) { int n, j; int recv_arr_size; net_mod_recv_msg_t *recv_arr; net_node_t node_arr[] = {NULL, 0, key}; void * client = net_mod_socket_open(); n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5); if(n == 0) { printf("send failed\n"); return; } printf(" %d nodes reply\n", n); for(j=0; j < recv_arr_size; j++) { fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n", net_mod_socket_get_key(client), sendbuf, key, recv_arr[j].host, recv_arr[j].port, recv_arr[j].key, (char *)recv_arr[j].content ); } net_mod_socket_close(client); } void usage(char *name) { #define fpe(str) fprintf(stderr, " %s", str); fprintf(stderr, "Usage: %s {function} [OPTIONS] [ARG...]\n\n", name); fprintf(stderr, "Test shmsocket\n\n"); fprintf(stderr, "Options:\n\n"); fpe("-p, --port TCP/IP Port\n"); fpe("-k, --key SHM Key\n"); fpe("--sendlist format:--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n"); fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n"); fpe("\n"); fprintf(stderr, "Examples:\n\n"); fpe("# sendandrecv to socket which has key 100\n"); fpe("./shm_util sendandrecv 100 \"hello\"\n"); fpe("# list all key\n"); fpe("./shm_util list\n"); fpe("# remove key 1001\n"); fpe("./shm_util rm 1001\n"); fpe("\n"); } argument_t parse_args (int argc, char *argv[]) { int c; if(argc < 2) { usage(argv[0]); exit(1); } argument_t mopt = {}; // mopt.volume_list_size = 0; mopt.interactive = false; opterr = 0; static struct option long_options[] = { /* These options set a flag. */ {"key", required_argument, 0, 'k'}, {"port", required_argument, 0, 'p'}, {"interactive", no_argument, 0, 'i'}, {"sendlist", required_argument, (int *)mopt.sendlist, 0}, {"publist", required_argument, (int *)mopt.publist, 0}, {0, 0, 0, 0} }; /* getopt_long stores the option index here. */ int option_index = 0; while (1) { c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index); /* Detect the end of the options. */ if (c == -1) break; switch (c) { case 0: /* If this option set a flag, do nothing else now. */ if (long_options[option_index].flag != 0) break; if(strcmp(long_options[option_index].name, "sendlist") == 0) { mopt.sendlist = optarg; } else if(strcmp(long_options[option_index].name, "publist") == 0) { mopt.publist = optarg; } else { printf ("option %s", long_options[option_index].name); if (optarg) printf (" with arg %s", optarg); printf ("\n"); } break; case 'k': mopt.key = atoi(optarg); break; case 'i': mopt.interactive = true; break; case 'p': // printf ("==name with value `%s'\n", optarg); mopt.port = atoi(optarg); break; case '?': printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind); /* getopt_long already printed an error message. */ usage(argv[0]); exit(1); break; default: //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]); break; } } // printf ("optind = %d, argc=%d \n", optind, argc); /* Print any remaining command line arguments (not options). */ if (optind < argc) { mopt.cmd_arr = &argv[optind]; mopt.cmd_arr_len = argc - optind; // printf ("non-option ARGV-elements: "); // while (optind < argc) // printf ("%d, %d, %s \n", optind, argc, argv[optind++]); // putchar ('\n'); } return mopt; } /** * @str "192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11" * @node_arr_addr 返回处理后的网络节点数组 * { * {"192.168.5.22", 5000, 11}, * {"192.168.20.10", 5000, 11}, * {"192.168.20.104", 5000, 11} * } * @return 数组的长度 */ int parse_node_list(const char *str, net_node_t *node_arr_addr[]) { int i, j; char **property_arr; int property_arr_len; char **entry_arr; int entry_arr_len = str_split(str, ",", &entry_arr); net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); for(i = 0; i < entry_arr_len; i++) { property_arr_len = str_split(entry_arr[i], ":", &property_arr); printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; free(property_arr[1]); if(property_arr_len == 3) { node_arr[i].key = atoi(property_arr[2]); free(property_arr[2]); } free(entry_arr[i]); } *node_arr_addr = node_arr; return entry_arr_len; } void print_node_list(net_node_t *node_arr, int len) { printf("============node list begin==========\n"); for(int i = 0; i < len; i++) { printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key); } printf("============node list end==========\n"); } int main(int argc, char *argv[]) { int i; char *prog; char * fun; argument_t opt; shm_mm_wrapper_init(512); if(argc < 2) { usage(argv[0]); exit(1); } prog = argv[0]; fun = argv[1]; argc--; argv++; if (strcmp("help", fun) == 0 ) { usage(prog); } else if (strcmp("list", fun) == 0 ) { list(); } else if (strcmp("rm", fun) == 0 ) { if(argc < 2) { usage(prog); exit(1); } for(i = 1; i < argc; i++) { int key = atoi(argv[i]); remove(key); } } else if (strcmp("sendandrecv", fun) == 0 ) { if(argc < 3) { usage(prog); exit(1); } int key = atoi(argv[1]); char *content = argv[2]; do_sendandrecv(key, content); } else if (strcmp("start_bus_server", fun) == 0) { start_bus_server(opt); } else if (strcmp("start_resycle", fun) == 0) { start_resycle(); } else if (strcmp("start_net_proxy", fun) == 0 ) { opt = parse_args(argc, argv); if(opt.port == 0) { usage(prog); exit(1); } start_net_proxy(opt); } else if (strcmp("start_reply", fun) == 0) { opt = parse_args(argc, argv); opt = parse_args(argc, argv); if(opt.key == 0) { usage(argv[0]); exit(1); } start_reply(opt.key); } else if (strcmp("start_net_client", fun) == 0) { opt = parse_args(argc, argv); if(opt.sendlist == 0) { fprintf(stderr, "Missing sendlist .\n"); usage(argv[0]); exit(1); } if(opt.publist == 0) { fprintf(stderr, "Missing publist.\n"); usage(argv[0]); exit(1); } start_net_client(opt.sendlist, opt.publist); } else if (strcmp("one_sendto_many", fun) == 0) { opt = parse_args(argc, argv); if(opt.sendlist == 0) { fprintf(stderr, "Missing sendlist .\n"); usage(argv[0]); exit(1); } one_sendto_many(opt.sendlist); } else if (strcmp("test_net_sendandrecv", fun) == 0) { opt = parse_args(argc, argv); if(opt.sendlist == 0) { fprintf(stderr, "Missing sendlist .\n"); usage(argv[0]); exit(1); } test_net_sendandrecv(opt.sendlist); } else if (strcmp("test_net_pub_threads", fun) == 0) { opt = parse_args(argc, argv); if(opt.publist == 0) { fprintf(stderr, "Missing publist .\n"); usage(argv[0]); exit(1); } test_net_pub_threads(opt.publist); } else if (strcmp("test_net_pub", fun) == 0) { opt = parse_args(argc, argv); if(opt.publist == 0) { fprintf(stderr, "Missing publist .\n"); usage(argv[0]); exit(1); } test_net_pub(opt.publist); } else { printf("%Invalid funciton name\n"); usage(argv[0]); exit(1); } printf("==========end========\n"); // shm_mm_wrapper_destroy(); }