From 26ed48c4e616014ee760fd13d13dbdc8539c34e3 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 22 十二月 2020 19:21:55 +0800 Subject: [PATCH] 解决sendandrecv发送到一个不存在key的情况 --- src/socket/net_mod_socket.c | 37 +++++++----- src/socket/shm_socket.c | 40 ++++++++++-- test_net_socket/test_net_mod_socket.c | 37 ++++++++---- src/socket/bus_server_socket_wrapper.c | 4 src/socket/net_mod_socket_wrapper.c | 2 src/socket/net_mod_server_socket_wrapper.c | 3 6 files changed, 84 insertions(+), 39 deletions(-) diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c index 124b1a7..5c793c1 100644 --- a/src/socket/bus_server_socket_wrapper.c +++ b/src/socket/bus_server_socket_wrapper.c @@ -7,7 +7,7 @@ * 鍒涘缓 */ void * bus_server_socket_wrapper_open() { - + printf("===bus_server_socket_wrapper_open\n"); BusServerSocket *sockt = new BusServerSocket; return (void *)sockt; } @@ -16,7 +16,7 @@ * 鍏抽棴 */ void bus_server_socket_wrapper_close(void *_socket) { - printf("bus_server_socket_wrapper_close\n"); + printf("===bus_server_socket_wrapper_close\n"); BusServerSocket *sockt = (BusServerSocket *)_socket; delete sockt; } diff --git a/src/socket/net_mod_server_socket_wrapper.c b/src/socket/net_mod_server_socket_wrapper.c index 6f8be04..c05832c 100644 --- a/src/socket/net_mod_server_socket_wrapper.c +++ b/src/socket/net_mod_server_socket_wrapper.c @@ -2,13 +2,14 @@ #include "net_mod_server_socket_wrapper.h" void *net_mod_server_socket_open(int port) { + printf("====net_mod_server_socket_open\n"); net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)malloc(sizeof(net_mod_server_socket_t)); sockt->sockt = new NetModServerSocket(port); return (void *)sockt; } void net_mod_server_socket_close(void *_sockt) { - printf("net_mod_server_socket_close\n"); + printf("====net_mod_server_socket_close\n"); net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; delete sockt->sockt; free(sockt); diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index 1e1fc27..48b0e7c 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -85,7 +85,7 @@ int i, n, recv_size, connfd; net_node_t *node; - void *recv_buf; + void *recv_buf = NULL; net_mod_request_head_t request_head = {}; @@ -131,13 +131,15 @@ node = &node_arr[i]; if(node->host == NULL || strcmp(node->host, "") == 0 ) { // 鏈湴鍙戦�� - shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); - strcpy( ret_arr[n_recv_suc].host,""); - ret_arr[n_recv_suc].port = 0; - ret_arr[n_recv_suc].key = node->key; - ret_arr[n_recv_suc].content = recv_buf; - ret_arr[n_recv_suc].content_length = recv_size; - n_recv_suc++; + if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) { + strcpy( ret_arr[n_recv_suc].host,""); + ret_arr[n_recv_suc].port = 0; + ret_arr[n_recv_suc].key = node->key; + ret_arr[n_recv_suc].content = recv_buf; + ret_arr[n_recv_suc].content_length = recv_size; + n_recv_suc++; + } + continue; } @@ -234,6 +236,17 @@ return n_recv_suc; } + + +void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { + + for(int i =0; i< size; i++) { + if(arr[i].content != NULL) + free(arr[i].content); + } + free(arr); +} + int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); @@ -631,14 +644,6 @@ return shmModSocket.get_key(); } - -void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { - - for(int i =0; i< size; i++) { - free(arr[i].content); - } - free(arr); -} diff --git a/src/socket/net_mod_socket_wrapper.c b/src/socket/net_mod_socket_wrapper.c index f464487..61373dd 100644 --- a/src/socket/net_mod_socket_wrapper.c +++ b/src/socket/net_mod_socket_wrapper.c @@ -7,6 +7,7 @@ * 鍒涘缓 */ void * net_mod_socket_open() { + printf("=====net_mod_socket_open\n"); net_mod_socket_t *sockt = (net_mod_socket_t *)malloc(sizeof(net_mod_socket_t)); sockt->sockt = new NetModSocket; return (void *)sockt; @@ -16,6 +17,7 @@ * 鍏抽棴 */ void net_mod_socket_close(void *_socket) { + printf("====net_mod_socket_close\n"); net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; delete sockt->sockt; free(sockt); diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 9581b69..efb3ef7 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -7,9 +7,12 @@ -void print_msg(char *head, shm_msg_t &msg) { +static void print_msg(char *head, shm_msg_t &msg) { // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); } + +static pthread_once_t _once_ = PTHREAD_ONCE_INIT; +static pthread_key_t _tmp_recv_socket_key_; static void *_server_run_msg_rev(void *_socket); @@ -18,6 +21,9 @@ static int _shm_close_dgram_socket(shm_socket_t *socket); static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); + +static void _destrory_tmp_recv_socket_(void *tmp_socket); +static void _create_tmp_recv_socket_key(void); // 妫�鏌ey鏄惁宸茬粡琚娇鐢紝鏄繑鍥�0, 鍚﹁繑鍥�1 static inline int _shm_socket_check_key(shm_socket_t *socket) { @@ -38,6 +44,7 @@ } shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { + shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->socket_type = socket_type; socket->key = -1; @@ -45,12 +52,14 @@ socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; socket->mutex = SemUtil::get(IPC_PRIVATE, 1); + logger->debug("shm_open_socket\n"); return socket; } -int shm_close_socket(shm_socket_t *socket) { +static int _shm_close_socket(shm_socket_t *socket) { int ret; + switch (socket->socket_type) { case SHM_SOCKET_STREAM: ret = _shm_close_stream_socket(socket, true); @@ -61,9 +70,17 @@ default: break; } - SemUtil::remove(socket->mutex); free(socket); + SemUtil::remove(socket->mutex); + logger->debug("shm_close_socket\n"); return ret; +} + +int shm_close_socket(shm_socket_t *socket) { + + // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); + + return _shm_close_socket(socket);; } int shm_socket_bind(shm_socket_t *socket, int key) { @@ -387,14 +404,20 @@ } } -static pthread_once_t _once_ = PTHREAD_ONCE_INIT; -static pthread_key_t _tmp_recv_socket_key_; /* Free thread-specific data buffer */ static void _destrory_tmp_recv_socket_(void *tmp_socket) { + int rv; + if(tmp_socket == NULL) + return; logger->debug("%d destroy tmp socket\n", pthread_self()); - shm_close_socket((shm_socket_t *)tmp_socket); + _shm_close_socket((shm_socket_t *)tmp_socket); + rv = pthread_setspecific(_tmp_recv_socket_key_, NULL); + if ( rv != 0) { + logger->error(rv, "shm_sendandrecv : pthread_setspecific"); + exit(1); + } } /* One-time key creation function */ @@ -405,6 +428,7 @@ /* Allocate a unique thread-specific data key and save the address of the destructor for thread-specific data buffers */ s = pthread_key_create(&_tmp_recv_socket_key_, _destrory_tmp_recv_socket_); + //s = pthread_key_create(&_tmp_recv_socket_key_, NULL); if (s != 0) { logger->error(s, "pthread_key_create"); abort(); /* dump core and terminate */ @@ -444,8 +468,8 @@ logger->debug("%d create tmp socket\n", pthread_self() ); tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); - rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket); - if (rv != 0) { + rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket); + if ( rv != 0) { logger->error(rv, "shm_sendandrecv : pthread_setspecific"); exit(1); } diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index 1ca04d0..f509e2c 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -15,6 +15,7 @@ }Targ; struct argument_t { + bool interactive; char *fun; int port; int key; @@ -54,13 +55,16 @@ } } -void start_net_proxy(int port) { +void start_net_proxy(argument_t &arg) { pthread_t tid; printf("Start net proxy\n"); - void *serverSocket = net_mod_server_socket_open(port); + void *serverSocket = net_mod_server_socket_open(arg.port); // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server - pthread_create(&tid, NULL, proxy_server_handler, serverSocket); + if(arg.interactive) { + pthread_create(&tid, NULL, proxy_server_handler, serverSocket); + } + if(net_mod_server_socket_start(serverSocket) != 0) { err_exit(errno, "net_mod_server_socket_start"); } @@ -103,12 +107,14 @@ -void start_bus_server() { +void start_bus_server(argument_t &arg) { printf("Start bus server\n"); void * server_socket = bus_server_socket_wrapper_open(); pthread_t tid; // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus - // pthread_create(&tid, NULL, bus_handler, server_socket); + if(arg.interactive) + pthread_create(&tid, NULL, bus_handler, server_socket); + if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { printf("start bus failed\n"); exit(1); @@ -152,12 +158,12 @@ //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 net_node_t *node_arr; int node_arr_size = parse_node_list(sendlist, &node_arr); - // print_node_list(node_arr, node_arr_size); + print_node_list(node_arr, node_arr_size); //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 net_node_t *pub_node_arr; int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); - // print_node_list(pub_node_arr, pub_node_arr_size); + print_node_list(pub_node_arr, pub_node_arr_size); while (true) { //printf("Usage: pub <topic> [content] or sub <topic>\n"); @@ -405,12 +411,12 @@ usage(argv[0]); exit(1); } - start_net_proxy(opt.port); + start_net_proxy(opt); } else if (strcmp("start_bus_server", opt.fun) == 0) { - start_bus_server(); + start_bus_server(opt); } else if (strcmp("start_reply", opt.fun) == 0) { if(opt.key == 0) { @@ -497,6 +503,7 @@ argument_t mopt = {}; // mopt.volume_list_size = 0; + mopt.interactive = false; opterr = 0; @@ -508,6 +515,7 @@ {"fun", required_argument, 0, 'f'}, {"key", required_argument, 0, 'k'}, {"port", required_argument, 0, 'p'}, + {"interactive", no_argument, 0, 'i'}, {"sendlist", required_argument, (int *)mopt.sendlist, 0}, {"publist", required_argument, (int *)mopt.publist, 0}, {0, 0, 0, 0} @@ -518,7 +526,7 @@ { - c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index); + c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index); /* Detect the end of the options. */ if (c == -1) @@ -552,6 +560,10 @@ case 'k': mopt.key = atoi(optarg); + break; + + case 'i': + mopt.interactive = true; break; case 'p': @@ -608,7 +620,8 @@ net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); for(i = 0; i < entry_arr_len; i++) { property_arr_len = str_split(entry_arr[i], ":", &property_arr); - // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; free(property_arr[1]); @@ -628,7 +641,7 @@ void print_node_list(net_node_t *node_arr, int len) { printf("============node list begin==========\n"); for(int i = 0; i < len; i++) { - printf("%s,%d,%d,\n", node_arr[i].host, node_arr[i].port, node_arr[i].key); + printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key); } printf("============node list end==========\n"); } -- Gitblit v1.8.0