From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@aiot.com> Date: 星期三, 11 八月 2021 09:50:20 +0800 Subject: [PATCH] fix from 3.1 first commit --- doc/product-consume-model.png | 0 src/shm/hashtable.cpp.2 | 0 demo/pub_sub | 0 src/shm/hashtable.h | 4 shm_util/shm_util.cpp | 953 ++++++++++++++++++++++++++++++++++++++++++++++++++ doc/malloc_node.png | 0 src/socket/shm_mod_socket.cpp | 10 test_socket/heart_beat.sh | 0 doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf | 0 doc/network_req_rep.png | 0 src/shm/hashtable.cpp | 31 + src/shm/mm.cpp | 1 build.sh | 0 src/socket/shm_socket.cpp | 77 ++- test_queue/test_lostdata.sh | 0 /dev/null | 1 src/socket/shm_mod_socket.h | 2 doc/bus_service.png | 0 src/queue/shm_queue.h | 4 src/socket/bus_server_socket.cpp | 26 src/shm/shm_mm.h | 8 demo/dgram_mod_req_rep.sh | 0 doc/malloc_list.png | 0 test_net_socket/net_mod_socket.sh | 0 systype.sh | 0 25 files changed, 1,044 insertions(+), 73 deletions(-) diff --git a/build.sh b/build.sh old mode 100755 new mode 100644 diff --git a/demo/dgram_mod_req_rep.sh b/demo/dgram_mod_req_rep.sh old mode 100755 new mode 100644 diff --git a/demo/pub_sub b/demo/pub_sub old mode 100755 new mode 100644 Binary files differ diff --git a/doc/bus_service.png b/doc/bus_service.png old mode 100755 new mode 100644 Binary files differ diff --git a/doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf b/doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf old mode 100755 new mode 100644 Binary files differ diff --git a/doc/malloc_list.png b/doc/malloc_list.png old mode 100755 new mode 100644 Binary files differ diff --git a/doc/malloc_node.png b/doc/malloc_node.png old mode 100755 new mode 100644 Binary files differ diff --git a/doc/network_req_rep.png b/doc/network_req_rep.png old mode 100755 new mode 100644 Binary files differ diff --git a/doc/product-consume-model.png b/doc/product-consume-model.png old mode 100755 new mode 100644 Binary files differ diff --git a/shm_util/shm_util.cpp b/shm_util/shm_util.cpp deleted file mode 120000 index 5878b94..0000000 --- a/shm_util/shm_util.cpp +++ /dev/null @@ -1 +0,0 @@ -../test_net_socket/shm_util.cpp \ No newline at end of file diff --git a/shm_util/shm_util.cpp b/shm_util/shm_util.cpp new file mode 100644 index 0000000..4eb03f8 --- /dev/null +++ b/shm_util/shm_util.cpp @@ -0,0 +1,953 @@ +#include <assert.h> +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include <getopt.h> +#include "logger_factory.h" + +#define SCALE 100000 + +static Logger *logger = LoggerFactory::getLogger(); + +typedef struct Targ { + net_node_t *node; + char *nodelist; + long id; + +}Targ; + +struct argument_t { + bool interactive; + bool force; + int bind; + int port; + int key; + char *sendlist; + char *publist; + char **cmd_arr; + int cmd_arr_len; +}; + +argument_t parse_args (int argc, char *argv[]); +void usage(char *name); +int parse_node_list(const char *str, net_node_t *node_arr_addr[]) ; +void print_node_list(net_node_t *node_arr, int len); + + + +void * client; + +void *proxy_server_handler(void *sockt) { + pthread_detach(pthread_self()); + + char action[512]; + while ( true ) { + printf("Input action: Close?\n"); + if(scanf("%s",action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + net_mod_server_socket_close(sockt); + shm_mm_wrapper_destroy(); + break; + } else { + printf("Invalide action\n"); + } + } +} + +void start_net_proxy(argument_t &arg) { + pthread_t tid; + printf("Start net proxy\n"); + void *serverSocket = net_mod_server_socket_open(arg.port); + + // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server + if(arg.interactive) { + pthread_create(&tid, NULL, proxy_server_handler, serverSocket); + } + + if(net_mod_server_socket_start(serverSocket) != 0) { + err_exit(errno, "net_mod_server_socket_start"); + } +} + +void start_resycle() { + shm_mm_wrapper_start_resycle(); +} + + +// 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅 +void *print_sub_msg(void *sockt) { + pthread_detach(pthread_self()); + void *recvbuf; + int size; + int key; + int rv; + while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", (char *)recvbuf); + free(recvbuf); + } + + printf("print_sub_msg return . rv = %d\n", rv); + +} + + +void * bus_server; + +static void stop_bus_handler(int sig) { + bus_server_socket_wrapper_stop(bus_server); +} + + +void start_bus_server(argument_t &arg) { + printf("Start bus server\n"); + bus_server = bus_server_socket_wrapper_open(); + + signal(SIGINT, stop_bus_handler); + signal(SIGTERM, stop_bus_handler); + + if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { + printf("start bus failed\n"); + exit(1); + } + + bus_server_socket_wrapper_close(bus_server); +} + +void *serverSockt; + + +static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { + char sendbuf[512]; + printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); + sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf); + // buf 鍜� size鏄繑鍥炲�� + *sendbuf_ptr = sendbuf; + *sendsize_ptr = strlen(sendbuf) + 1; + //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺� + free(recvbuf); + return; +} + +bool stop = false; + +static void stop_replyserver_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); + } +} + +void start_recvfrom(int mkey, bool force) { + logger->debug("start reply\n"); + signal(SIGINT, stop_replyserver_handler); + signal(SIGTERM, stop_replyserver_handler); + + serverSockt = net_mod_socket_open(); + if(force) { + net_mod_socket_force_bind(serverSockt, mkey); + } else { + net_mod_socket_bind(serverSockt, mkey); + } + + + int rv = 0 ; + while( true) { + rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); + if (rv == 0) + continue; + if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; + } else if(rv == EBUS_KEY_INUSED){ + printf("key宸茬粡琚崰鐢╘n"); + exit(1); + } + logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); + + } + + // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + net_mod_socket_close(serverSockt); + logger->debug("stopted\n"); + + // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { + // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); + // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); + // net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, key); + // free(recvbuf); + // } +} + +// 浜や簰寮忓鎴风 +void start_net_client(char *sendlist, char*publist ){ + client = net_mod_socket_open(); + char content[MAXLINE]; + char action[512]; + char topic[512]; + int buskey; + + int recv_arr_size, i, n; + net_mod_recv_msg_t *recv_arr; + + pthread_t tid; + // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭� + pthread_create(&tid, NULL, print_sub_msg, client); + + //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 + net_node_t *node_arr; + int node_arr_size = parse_node_list(sendlist, &node_arr); + print_node_list(node_arr, node_arr_size); + + //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 + net_node_t *pub_node_arr; + int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); + print_node_list(pub_node_arr, pub_node_arr_size); + + while (true) { + //printf("Usage: pub <topic> [content] or sub <topic>\n"); + printf("Can I help you? pub,sub,desub,send or quit\n"); + scanf("%s",action); + + if(strcmp(action, "pub") == 0) { + printf("Please input topic and content\n"); + scanf("%s %s", topic, content); + + n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); + printf("pub %d nodes\n", n); + } + else if(strcmp(action, "send") == 0) { + getc(stdin); + printf("Please input content\n"); + + if (fgets(content, MAXLINE, stdin) != NULL) { + // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰� + // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); + printf(" %d nodes reply\n", n); + for(i=0; i<recv_arr_size; i++) { + printf("reply from (host:%s, port: %d, key:%d) >> %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + (char *)recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } + } + else if(strcmp(action, "desub") == 0) { + printf("Please input topic!\n"); + + scanf("%s", topic); + if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { + printf("%d Desub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Desub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "sub") == 0) { + printf("Please input topic!\n"); + scanf("%s",topic); + + if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { + printf("%d Sub success!\n", net_mod_socket_get_key(client)); + } else { + printf("Sub failture!\n"); + exit(0); + } + + } + else if(strcmp(action, "quit") == 0) { + break; + } else { + printf("error input argument\n"); + continue; + } + + } + net_mod_socket_close(client); + + +} + +void *_run_one_sendto_many_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[128]; + + int j, n; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + int total = 0; + + int rkey, lkey; + unsigned int l = 0 , rl; + const char *hello_format = "%d say Hello %d"; + const char *reply_format = "%d RECEIVED %d say Hello %d"; + + char filename[128]; + sprintf(filename, "test%d.tmp", targ->node->key); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + // fp = stdout; + + int recvsize; + void *recvbuf; + for (l = 0; l < SCALE; l++) { + sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); + // fprintf(fp, "requst:%s\n", sendbuf); + // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); + printf("%d: send %d nodes\n", l, n); + for(j=0; j < recv_arr_size; j++) { + + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", + net_mod_socket_get_key(client), + sendbuf, + targ->node->key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + + printf("key == %d\n", net_mod_socket_get_key(client)); + assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); + assert(targ->node->key == rkey); + assert(net_mod_socket_get_key(client) == lkey); + assert(rl == l); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + total += n; + } + if(fp != NULL) + fclose(fp); + // net_mod_socket_close(client); + return (void *)total; +} + +//澶氱嚎绋媠end +void one_sendto_many(char *nodelist) { + + int status, i = 0; + + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + + char sendbuf[512]; + struct timeval start, end; + long total = 0; + + client = net_mod_socket_open(); + net_mod_socket_bind(client, shm_mm_wrapper_alloc_key()); + + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + Targ targs[node_arr_size]; + pthread_t tids[node_arr_size]; + void *res[node_arr_size]; + + printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < node_arr_size; i++) { + targs[i].node = node_arr + i; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_one_sendto_many_, (void *)&targs[i]); + } + + for (i = 0; i < node_arr_size; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", + SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + +} + +// 鏃犻檺寰幆send +void test_net_sendandrecv(char *nodelist) { + + int n, j; + void * client; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + char buf[128]; + pid_t pid, retPid ; + unsigned int l , retl; + int remoteKey; + const char *hello_format = "%d say Hello %u "; + const char *reply_format = "%d RECEIVED %d say Hello %d"; + + pid = getpid(); + l = 0; + + client = net_mod_socket_open(); + while(true) { + sprintf(buf, hello_format, pid, l); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, + &recv_arr, &recv_arr_size, 1000); + printf(" %d nodes reply\n", n); + for(j = 0; j < recv_arr_size; j++) { + + printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", + (long)pid, + buf, + (char *)recv_arr[j].content, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key + + ); + + + + assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); + assert(retPid == pid); + assert(retl == l); + assert(remoteKey == recv_arr[j].key); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + l++; + } + + net_mod_socket_close(client); + +} + +void *_run_pub_(void *arg) { + Targ *targ = (Targ *)arg; + char sendbuf[128]; + + int i,j, n; + int total = 0; + + net_node_t *node_arr; + int node_arr_size = parse_node_list(targ->nodelist, &node_arr); + + const char *topic = "news"; + // char filename[512]; + // sprintf(filename, "test%d.tmp", targ->id); + // FILE *fp = NULL; + // fp = fopen(filename, "w+"); + // fp = stdout; + + + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%ld) %d", targ->id, i); + + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n); + total += n; + } + // fclose(fp); + + return (void *)total; +} + +//澶氱嚎绋媝ub +void test_net_pub_threads(char *nodelist) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + client = net_mod_socket_open(); + +printf("寮�濮嬫祴璇�...\n"); + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].nodelist = nodelist; + targs[i].id = i; + pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); + net_mod_socket_close(client); +} + +// 鏃犻檺寰幆pub +void test_net_pub(char *nodelist) { + + int n; + char sendbuf[512]; + net_node_t *node_arr; + int node_arr_size = parse_node_list(nodelist, &node_arr); + + char *topic = "news"; + sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); + + void * client = net_mod_socket_open(); + while (true) { + n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); + // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n); + } + net_mod_socket_close(client); +} + +void list () { + LockFreeQueue<shm_packet_t> * mqueue; + hashtable_t *hashtable = mm_get_hashtable(); + printf("%10s \t %-10s \t %10s\n", "KEY", "LENGTH", "STATUS"); + hashtable_foreach(hashtable, [&](int key, void * value){ + if(key >= 100 ) { + mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); + if((long)mqueue == 0x1) { + printf("%10d \t %-10s\n", key, "Not In Used"); + } else { + printf("%10d \t %-10d\n", key, mqueue->size()); + } + + } else { + printf("%10d\n", key); + } + + }); +} + +void info(int key) { + LockFreeQueue<shm_packet_t> * mqueue; + hashtable_t *hashtable = mm_get_hashtable(); + mqueue = (LockFreeQueue<shm_packet_t> *) hashtable_get(hashtable, key); + printf("%10s: %-10p\n", "PTR", mqueue); + printf("%10s: %-10d\n", "KEY", key); + printf("%10s: %-10d\n", "LENGTH", mqueue->size()); + + +} + + +void remove(int key) { + hashtable_t *hashtable = mm_get_hashtable(); + + LockFreeQueue<shm_packet_t> * mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); + if(mqueue != NULL) { + delete mqueue; + hashtable_remove(hashtable, key); + } +} + +void do_sendandrecv(int key, char *sendbuf) { + int n, j; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + + net_node_t node_arr[] = {NULL, 0, key}; + + void * client = net_mod_socket_open(); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000); + if(n == 0) { + printf("send failed\n"); + return; + } + printf(" %d nodes reply\n", n); + for(j=0; j < recv_arr_size; j++) { + + fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n", + net_mod_socket_get_key(client), + sendbuf, + key, + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + (char *)recv_arr[j].content + ); + } + + net_mod_socket_close(client); +} + + + +void usage(char *name) +{ + #define fpe(str) fprintf(stderr, " %s", str); + + fprintf(stderr, "Usage: %s {function} [OPTIONS] [ARG...]\n\n", name); + fprintf(stderr, "Test shmsocket\n\n"); + + fprintf(stderr, "Options:\n\n"); + fpe("-p, --port TCP/IP Port\n"); + fpe("-k, --key SHM Key\n"); + fpe("--sendlist format锛�--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n"); + fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n"); + fpe("\n"); + + fprintf(stderr, "Examples:\n\n"); + fpe("# sendandrecv to socket which has key 100\n"); + fpe("./shm_util sendandrecv 100 \"hello\"\n"); + fpe("# list all key\n"); + fpe("./shm_util list\n"); + fpe("# remove key 1001\n"); + fpe("./shm_util rm 1001\n"); + fpe("./shm_util info 1002\n"); + fpe("./shm_util recvfrom --bind 1002 [--force]\n") + fpe("\n"); +} + + + +argument_t parse_args (int argc, char *argv[]) +{ + int c; + + if(argc < 2) { + usage(argv[0]); + exit(1); + } + + + + argument_t mopt = {}; + + // mopt.volume_list_size = 0; + mopt.interactive = false; + + opterr = 0; + + + static struct option long_options[] = + { + /* These options set a flag. */ + + {"key", required_argument, 0, 'k'}, + {"port", required_argument, 0, 'p'}, + {"interactive", no_argument, 0, 'i'}, + {"force", no_argument, 0, 'f'}, + {"bind", required_argument, (int *)mopt.bind, 0}, + {"sendlist", required_argument, (int *)mopt.sendlist, 0}, + {"publist", required_argument, (int *)mopt.publist, 0}, + {0, 0, 0, 0} + }; + /* getopt_long stores the option index here. */ + int option_index = 0; + while (1) + { + + + c = getopt_long (argc, argv, "+fk:p:i", long_options, &option_index); + + /* Detect the end of the options. */ + if (c == -1) + break; + + switch (c) + { + case 0: + /* If this option set a flag, do nothing else now. */ + if (long_options[option_index].flag != 0) + break; + + if(strcmp(long_options[option_index].name, "sendlist") == 0) { + mopt.sendlist = optarg; + } + else if(strcmp(long_options[option_index].name, "publist") == 0) { + mopt.publist = optarg; + } + else if(strcmp(long_options[option_index].name, "bind") == 0) { + mopt.bind = atoi(optarg); + } + else { + printf ("option %s", long_options[option_index].name); + if (optarg) + printf (" with arg %s", optarg); + printf ("\n"); + } + + break; + + case 'k': + mopt.key = atoi(optarg); + break; + + case 'i': + mopt.interactive = true; + break; + + case 'f': + mopt.force = true; + break; + + case 'p': + // printf ("==name with value `%s'\n", optarg); + mopt.port = atoi(optarg); + break; + + case '?': + printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind); + /* getopt_long already printed an error message. */ + usage(argv[0]); + exit(1); + break; + + default: + //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]); + break; + } + } + + // printf ("optind = %d, argc=%d \n", optind, argc); + /* Print any remaining command line arguments (not options). */ + if (optind < argc) + { + mopt.cmd_arr = &argv[optind]; + mopt.cmd_arr_len = argc - optind; + // printf ("non-option ARGV-elements: "); + // while (optind < argc) + // printf ("%d, %d, %s \n", optind, argc, argv[optind++]); + // putchar ('\n'); + } + return mopt; + +} + + +/** + * @str "192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11" + * @node_arr_addr 杩斿洖澶勭悊鍚庣殑缃戠粶鑺傜偣鏁扮粍 + * { + * {"192.168.5.22", 5000, 11}, + * {"192.168.20.10", 5000, 11}, + * {"192.168.20.104", 5000, 11} + * } + * @return 鏁扮粍鐨勯暱搴� + */ +int parse_node_list(const char *str, net_node_t *node_arr_addr[]) { + int i, j; + char **property_arr; + int property_arr_len; + char **entry_arr; + int entry_arr_len = str_split(str, ",", &entry_arr); + + net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); + for(i = 0; i < entry_arr_len; i++) { + property_arr_len = str_split(entry_arr[i], ":", &property_arr); + printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; + + free(property_arr[1]); + if(property_arr_len == 3) { + node_arr[i].key = atoi(property_arr[2]); + free(property_arr[2]); + } + free(entry_arr[i]); + + } + *node_arr_addr = node_arr; + + + return entry_arr_len; +} + +void print_node_list(net_node_t *node_arr, int len) { + printf("============node list begin==========\n"); + for(int i = 0; i < len; i++) { + printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key); + } + printf("============node list end==========\n"); +} + + + + +int main(int argc, char *argv[]) { + int i; + char *prog; + char * fun; + argument_t opt = {}; + + shm_mm_wrapper_init(512); + + if(argc < 2) { + usage(argv[0]); + exit(1); + } + prog = argv[0]; + fun = argv[1]; + argc--; + argv++; + + + if (strcmp("help", fun) == 0 ) { + usage(prog); + } + else if (strcmp("list", fun) == 0 ) { + list(); + } + else if (strcmp("info", fun) == 0 ) { + if(argc < 2) { + + usage(prog); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + info(key); + } + } + } + else if (strcmp("rm", fun) == 0 ) { + if(argc < 2) { + usage(prog); + + } else { + for(i = 1; i < argc; i++) { + int key = atoi(argv[i]); + remove(key); + } + } + + } + else if (strcmp("sendandrecv", fun) == 0 ) { + if(argc < 3) { + usage(prog); + exit(1); + } + int key = atoi(argv[1]); + char *content = argv[2]; + do_sendandrecv(key, content); + } + else if (strcmp("start_bus_server", fun) == 0) { + + start_bus_server(opt); + } + else if (strcmp("start_resycle", fun) == 0) { + + start_resycle(); + } + + else if (strcmp("start_net_proxy", fun) == 0 ) { + opt = parse_args(argc, argv); + if(opt.port == 0) { + usage(prog); + exit(1); + } + start_net_proxy(opt); + + } + + else if (strcmp("recvfrom", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.bind == 0) { + usage(argv[0]); + } else { + start_recvfrom(opt.bind, opt.force); + } + + } + else if (strcmp("start_net_client", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + if(opt.publist == 0) { + fprintf(stderr, "Missing publist.\n"); + usage(argv[0]); + exit(1); + } + start_net_client(opt.sendlist, opt.publist); + } + else if (strcmp("one_sendto_many", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + one_sendto_many(opt.sendlist); + } + else if (strcmp("test_net_sendandrecv", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.sendlist == 0) { + fprintf(stderr, "Missing sendlist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_sendandrecv(opt.sendlist); + } + else if (strcmp("test_net_pub_threads", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub_threads(opt.publist); + } + else if (strcmp("test_net_pub", fun) == 0) { + opt = parse_args(argc, argv); + if(opt.publist == 0) { + fprintf(stderr, "Missing publist .\n"); + usage(argv[0]); + exit(1); + } + + test_net_pub(opt.publist); + } + + else { + printf("%Invalid funciton name\n"); + usage(argv[0]); + exit(1); + + } + + shm_mm_wrapper_destroy(); + +} diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 0921af3..74b9b33 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -87,19 +87,15 @@ template <typename ELEM_T> bool SHMQueue<ELEM_T>::bind(int key, bool force) { - - hashtable_lock(hashtable); void *tmp_ptr = hashtable_get(hashtable, key); if (tmp_ptr == NULL || tmp_ptr == (void *)1 || force) { queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(mqsize); hashtable_put(hashtable, key, (void *)queue); mkey = key; owner = true; - hashtable_unlock(hashtable); return true; } - hashtable_unlock(hashtable); return false; } diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp old mode 100755 new mode 100644 index e435172..a223c0c --- a/src/shm/hashtable.cpp +++ b/src/shm/hashtable.cpp @@ -138,13 +138,32 @@ } void *hashtable_get(hashtable_t *hashtable, int key) { + int rv; + + if((rv = svsem_wait(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(errno, "hashtable_get\n"); + } void * res = _hashtable_get(hashtable, key); + + if((rv = svsem_post(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(errno, "hashtable_get\n"); + } return res; } void hashtable_put(hashtable_t *hashtable, int key, void *value) { + int rv; + + if((rv = svsem_wait(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(errno, "hashtable_put\n"); + } _hashtable_put(hashtable, key, value); hashtable->queueCount++; + + if((rv = svsem_post(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(errno, "hashtable_put\n"); + } + } bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) { @@ -249,17 +268,6 @@ return keyset; } - - -int hashtable_lock(hashtable_t *hashtable) { - return svsem_wait(hashtable->mutex); -} - -int hashtable_unlock(hashtable_t *hashtable) { - return svsem_post(hashtable->mutex); -} - - void hashtable_removeall(hashtable_t *hashtable) { tailq_entry_t *item; @@ -294,7 +302,6 @@ { return key % MAPSIZE; - /*printf("hashfun = %ld\n", code);*/ } /** diff --git a/src/shm/hashtable.cpp.2 b/src/shm/hashtable.cpp.2 old mode 100755 new mode 100644 diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h old mode 100755 new mode 100644 index 90043c3..6c3cd27 --- a/src/shm/hashtable.h +++ b/src/shm/hashtable.h @@ -33,10 +33,6 @@ void *hashtable_remove(hashtable_t *hashtable, int key); void hashtable_removeall(hashtable_t *hashtable); - -int hashtable_lock(hashtable_t *hashtable); -int hashtable_unlock(hashtable_t *hashtable); - int hashtable_get_queue_count(hashtable_t *hashtable) ; /** * 閬嶅巻hash_table diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp index 8fff5c2..e55192c 100644 --- a/src/shm/mm.cpp +++ b/src/shm/mm.cpp @@ -113,7 +113,6 @@ newsize = ALIGN(size + (SIZE_T_SIZE << 1) + (PTR_SIZE << 1) ); - //fprintf(stderr, "mm_malloc : size=%u\n", size); /* Search the free list for a fit */ SemUtil::dec(mutex); if ((ptr = find_fit(newsize)) != NULL) diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h index 18c5370..63a9e06 100644 --- a/src/shm/shm_mm.h +++ b/src/shm/shm_mm.h @@ -30,16 +30,14 @@ template <typename T> T* shm_mm_attach(int key) { - void *ptr; - // T* tptr; - hashtable_t *hashtable = mm_get_hashtable(); + void *ptr; + // T* tptr; + hashtable_t *hashtable = mm_get_hashtable(); ptr = hashtable_get(hashtable, key); -// printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr); if(ptr == NULL || ptr == (void *)1 ) { ptr = mm_malloc(sizeof(T)); hashtable_put(hashtable, key, ptr); new(ptr) T; -// printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr); } return (T*)ptr; } diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 657941b..7a45696 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -39,7 +39,6 @@ subscripter_set = map_iter->second; if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { subscripter_set->erase(set_iter); -// printf("remove_subscripter %s, %d\n", map_iter->first, key); count++; } } @@ -201,7 +200,6 @@ subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { send_key = *set_iter; -// logger->debug("_proxy_pub send before %d \n", send_key); rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); if(rv == 0) { continue; @@ -232,26 +230,28 @@ char resp_buf[128]; bus_head_t head; + int rv; + char send_buf[512] = { 0x00 }; + const char *topic_delim = ","; -// logger.debug("_run_proxy_ server receive before\n"); + while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { -// logger.debug("_run_proxy_ server recvfrom %d after: %s \n", key, buf); head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; -// logger.debug("_run_proxy_ : %s\n", action); + if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); -// logger.debug("_run_proxy_ topic = %s\n", topic); while(topic) { + _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { -// logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); + if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� _proxy_desub_all(key); @@ -259,6 +259,7 @@ topic = strtok(topics, topic_delim); while(topic) { + _proxy_desub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } @@ -270,7 +271,16 @@ _proxy_pub(topics, content, head.content_size, key); } - else if(strcmp(action, "stop") == 0) { + else if (strncmp(buf, "request", strlen("request")) == 0) { + sprintf(send_buf, "%4d", key); + strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); + + rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); + if(rv != 0) { + logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); + } + } + else if(strcmp(action, "stop") == 0) { logger->info( "Stopping Bus..."); free(buf); break; diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 466d0b5..abd9477 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -10,7 +10,6 @@ } ShmModSocket::~ShmModSocket() { - // logger->debug("Close ShmModSocket...\n"); struct timespec timeout = {1, 0}; if(bus_set != NULL) { for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { @@ -216,6 +215,7 @@ int buf_size; char *buf; int max_buf_size; + void *buf_ptr; if((buf = (char *) malloc(MAXBUF)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); exit(1); @@ -234,13 +234,15 @@ } } - memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE); + buf_ptr = ShmModSocket::encode_bus_head(request_head); + memcpy(buf, buf_ptr, BUS_HEAD_SIZE); if(topic_size != 0 ) memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); if(content_size != 0) memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); *retbuf = buf; + free(buf_ptr); return buf_size; } @@ -259,7 +261,7 @@ tmp_ptr += sizeof(head.action); PUT(tmp_ptr, htonl(head.topic_size)); - tmp_ptr += 4; + tmp_ptr += sizeof(head.topic_size); PUT(tmp_ptr, htonl(head.content_size)); return headbs; @@ -274,7 +276,7 @@ tmp_ptr += sizeof(head.action); head.topic_size = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(head.topic_size); head.content_size = ntohl(GET(tmp_ptr)); return head; diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index 0c65f52..9890aef 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -11,7 +11,7 @@ #include <set> #include "socket_def.h" -#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t)) +#define BUS_HEAD_SIZE sizeof(bus_head_t) class BusServerSocket; struct bus_head_t diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 94b3fdd..978eda9 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -41,21 +41,16 @@ static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue<shm_packet_t> *queue; - hashtable_lock(hashtable); void *tmp_ptr = hashtable_get(hashtable, key); - if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { - queue = new LockFreeQueue<shm_packet_t>(16); + queue = new LockFreeQueue<shm_packet_t>(32); hashtable_put(hashtable, key, (void *)queue); - hashtable_unlock(hashtable); return queue; } else if(force) { - hashtable_unlock(hashtable); return (LockFreeQueue<shm_packet_t> *) tmp_ptr; } - hashtable_unlock(hashtable); return NULL; } @@ -67,7 +62,6 @@ hashtable_t *hashtable = mm_get_hashtable(); void *tmp_ptr = hashtable_get(hashtable, key); if (tmp_ptr == NULL || tmp_ptr == (void *)1) { - //logger->error("shm_socket._remote_queue_attach锛歝onnet at key %d failed!", key); return NULL; } @@ -112,23 +106,33 @@ static int _shm_socket_close_(shm_socket_t *sockt) { - int rv; + int rv, i; + hashtable_t *hashtable = mm_get_hashtable(); logger->debug("shm_socket_close\n"); - // hashtable_remove(hashtable, mkey); - // if(sockt->queue != NULL) { - // sockt->queue = NULL; - // } - - if(sockt->key != 0) { - auto it = shmQueueStMap->find(sockt->key); - if(it != shmQueueStMap->end()) { - it->second.status = SHM_QUEUE_ST_CLOSED; - it->second.closeTime = time(NULL); - } - } - + // if(sockt->key != 0) { + // auto it = shmQueueStMap->find(sockt->key); + // if(it != shmQueueStMap->end()) { + // it->second.status = SHM_QUEUE_ST_CLOSED; + // it->second.closeTime = time(NULL); + // } + // } + + + + if(sockt->queue != NULL) { + sockt->queue->close(); + for( i = 0; i < sockt->queue->size(); i++) { + mm_free((*(sockt->queue))[i].buf); + logger->info("======= %d free queue element buf\n", sockt->key); + } + sleep(1); + + hashtable_remove(hashtable, sockt->key); + // sockt->queue = NULL; + } + pthread_mutex_destroy(&(sockt->mutex) ); free(sockt); return 0; @@ -168,8 +172,6 @@ int shm_socket_get_key(shm_socket_t *sockt){ return sockt->key; } - - // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, @@ -234,7 +236,7 @@ shm_packet_t sendpak = {0}; sendpak.key = sockt->key; sendpak.size = sendsize; - memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid); + memcpy(sendpak.uuid, recvpak.uuid, sizeof(sendpak.uuid)); if(sendbuf !=NULL && sendsize > 0) { sendpak.buf = mm_malloc(sendsize); memcpy(sendpak.buf, sendbuf, sendsize); @@ -264,11 +266,14 @@ } - - if(buf != NULL && recvpak.buf != NULL) { - void *_buf = malloc(recvpak.size); - memcpy(_buf, recvpak.buf, recvpak.size); - *buf = _buf; + if(recvpak.buf != NULL) { + if (buf == NULL) { + logger->warn("!!!Alert: buf should be not NULL!\n"); + } else { + void *_buf = malloc(recvpak.size); + memcpy(_buf, recvpak.buf, recvpak.size); + *buf = _buf; + } } if(size != NULL) @@ -372,7 +377,7 @@ logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); if(strlen(recvpak.uuid) == 0) { continue; - } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) { + } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) { // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 goto LABLE_SUC; } else { @@ -406,6 +411,8 @@ int rv = 0, tryn = 16; + static int Counter_suc = 0; + static int Counter_fail = 0; shm_packet_t sendpak; shm_packet_t recvpak; std::map<int, shm_packet_t>::iterator recvbufIter; @@ -422,7 +429,6 @@ if (tmp_socket == NULL) { /* If first call from this thread, allocate buffer for thread, and save its location */ - logger->debug("%lu create threadlocal socket\n", (long)pthread_self() ); tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); @@ -449,7 +455,6 @@ recvbufIter = tmp_socket->recvbuf2.find(key); if(recvbufIter != tmp_socket->recvbuf2.end()) { // 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨� - // logger->info("get from recvbuf: %d", key); recvpak = recvbufIter->second; tmp_socket->recvbuf2.erase(recvbufIter); goto LABLE_SUC; @@ -462,7 +467,7 @@ return rv; } - if (key == recvpak.key) { + if (key == recvpak.key) { // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛 goto LABLE_SUC; } else { @@ -596,10 +601,16 @@ if (remoteQueue == NULL ) { goto ERR_CLOSED; + } else if(remoteQueue->isClosed()) { + goto ERR_CLOSED; } sendpak->key = sockt->key; rv = remoteQueue->push(*sendpak, timeout, flag); + + if(rv != 0) { + mm_free(sendpak->buf); + } if(rv == ETIMEDOUT) { return EBUS_TIMEOUT; } diff --git a/systype.sh b/systype.sh old mode 100755 new mode 100644 diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh old mode 100755 new mode 100644 diff --git a/test_queue/test_lostdata.sh b/test_queue/test_lostdata.sh old mode 100755 new mode 100644 diff --git a/test_socket/heart_beat.sh b/test_socket/heart_beat.sh old mode 100755 new mode 100644 -- Gitblit v1.8.0