解决sendandrecv发送到一个不存在key的情况
| | |
| | | * 创建 |
| | | */ |
// Creates a BusServerSocket with `new` and returns it as an opaque void* handle.
// A trace line is printed to stdout before the allocation.
// The handle is released by bus_server_socket_wrapper_close(), which deletes it.
| | | void * bus_server_socket_wrapper_open() { |
| | | |
| | | printf("===bus_server_socket_wrapper_open\n"); |
| | | BusServerSocket *sockt = new BusServerSocket; |
| | | return (void *)sockt; |
| | | } |
| | |
| | | * 关闭 |
| | | */ |
// Destroys a handle: casts _socket to BusServerSocket* and deletes it.
// _socket is not checked here; `delete` on a null pointer is a no-op.
// NOTE(review): two trace printf rows appear back to back. This text looks like
// a rendered diff, so presumably one is the removed line and the other its
// replacement - confirm which one is live.
| | | void bus_server_socket_wrapper_close(void *_socket) { |
| | | printf("bus_server_socket_wrapper_close\n"); |
| | | printf("===bus_server_socket_wrapper_close\n"); |
| | | BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | delete sockt; |
| | | } |
| | |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | |
// Allocates a net_mod_server_socket_t wrapper with malloc, stores a newly
// constructed NetModServerSocket(port) in its `sockt` member, and returns the
// wrapper as an opaque void* handle. A trace line is printed to stdout first.
// NOTE(review): the malloc result is dereferenced without a NULL check.
| | | void *net_mod_server_socket_open(int port) { |
| | | printf("====net_mod_server_socket_open\n"); |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)malloc(sizeof(net_mod_server_socket_t)); |
| | | sockt->sockt = new NetModServerSocket(port); |
| | | return (void *)sockt; |
| | | } |
| | | |
| | | void net_mod_server_socket_close(void *_sockt) { |
| | | printf("net_mod_server_socket_close\n"); |
| | | printf("====net_mod_server_socket_close\n"); |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; |
| | | delete sockt->sockt; |
| | | free(sockt); |
| | |
| | | |
| | | int i, n, recv_size, connfd; |
| | | net_node_t *node; |
| | | void *recv_buf; |
| | | void *recv_buf = NULL; |
| | | |
| | | net_mod_request_head_t request_head = {}; |
| | | |
| | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL || strcmp(node->host, "") == 0 ) { |
| | | // 本地发送 |
| | | shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | strcpy( ret_arr[n_recv_suc].host,""); |
| | | ret_arr[n_recv_suc].port = 0; |
| | | ret_arr[n_recv_suc].key = node->key; |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) { |
| | | strcpy( ret_arr[n_recv_suc].host,""); |
| | | ret_arr[n_recv_suc].port = 0; |
| | | ret_arr[n_recv_suc].key = node->key; |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | |
| | | return n_recv_suc; |
| | | |
| | | } |
| | | |
| | | |
// Releases a received-message array: frees the `content` pointer of each of the
// first `size` elements when it is non-NULL, then frees the array itself.
// `arr` is indexed without a NULL check when size > 0.
// NOTE(review): the loop index is an int compared against a size_t bound.
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | | for(int i =0; i< size; i++) { |
| | | if(arr[i].content != NULL) |
| | | free(arr[i].content); |
| | | } |
| | | free(arr); |
| | | } |
| | | |
| | | |
| | | int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); |
| | |
| | | return shmModSocket.get_key(); |
| | | } |
| | | |
| | | |
// Releases a received-message array: frees the `content` pointer of each of the
// first `size` elements, then frees the array itself.
// No NULL test is made on `content`; free(NULL) is defined to do nothing.
// `arr` is indexed without a NULL check when size > 0.
// NOTE(review): the loop index is an int compared against a size_t bound.
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | | for(int i =0; i< size; i++) { |
| | | free(arr[i].content); |
| | | } |
| | | free(arr); |
| | | } |
| | | |
| | | |
| | | |
| | |
| | | * 创建 |
| | | */ |
| | | void * net_mod_socket_open() { |
| | | printf("=====net_mod_socket_open\n"); |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)malloc(sizeof(net_mod_socket_t)); |
| | | sockt->sockt = new NetModSocket; |
| | | return (void *)sockt; |
| | |
| | | * 关闭 |
| | | */ |
| | | void net_mod_socket_close(void *_socket) { |
| | | printf("====net_mod_socket_close\n"); |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; |
| | | delete sockt->sockt; |
| | | free(sockt); |
| | |
| | | |
| | | |
| | | |
| | | void print_msg(char *head, shm_msg_t &msg) { |
| | | static void print_msg(char *head, shm_msg_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | | } |
| | | |
| | | static pthread_once_t _once_ = PTHREAD_ONCE_INIT; |
| | | static pthread_key_t _tmp_recv_socket_key_; |
| | | |
| | | static void *_server_run_msg_rev(void *_socket); |
| | | |
| | |
| | | static int _shm_close_dgram_socket(shm_socket_t *socket); |
| | | |
| | | static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); |
| | | |
| | | static void _destrory_tmp_recv_socket_(void *tmp_socket); |
| | | static void _create_tmp_recv_socket_key(void); |
| | | |
| | | // 检查key是否已经被使用,是返回0, 否返回1 |
| | | static inline int _shm_socket_check_key(shm_socket_t *socket) { |
| | |
| | | } |
| | | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { |
| | | |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | socket->socket_type = socket_type; |
| | | socket->key = -1; |
| | |
| | | socket->dispatch_thread = 0; |
| | | socket->status = SHM_CONN_CLOSED; |
| | | socket->mutex = SemUtil::get(IPC_PRIVATE, 1); |
| | | logger->debug("shm_open_socket\n"); |
| | | return socket; |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | static int _shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | int ret; |
| | | |
| | | switch (socket->socket_type) { |
| | | case SHM_SOCKET_STREAM: |
| | | ret = _shm_close_stream_socket(socket, true); |
| | |
| | | default: |
| | | break; |
| | | } |
| | | SemUtil::remove(socket->mutex); |
| | | free(socket); |
| | | SemUtil::remove(socket->mutex); |
| | | logger->debug("shm_close_socket\n"); |
| | | return ret; |
| | | } |
| | | |
// Public close entry point: forwards `socket` to _shm_close_socket() and
// returns that function's result unchanged.
// The commented-out line would have destroyed the calling thread's temporary
// receive socket; it is disabled here.
// The doubled ';' after the return is an empty statement with no effect.
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); |
| | | |
| | | return _shm_close_socket(socket);; |
| | | } |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int key) { |
| | |
| | | } |
| | | } |
| | | |
| | | static pthread_once_t _once_ = PTHREAD_ONCE_INIT; |
| | | static pthread_key_t _tmp_recv_socket_key_; |
| | | |
| | | /* Free thread-specific data buffer */ |
// Cleanup routine for the temporary receive socket stored under
// _tmp_recv_socket_key_. Returns immediately when tmp_socket is NULL.
// Otherwise it logs a debug line, closes the socket, and resets the
// thread-specific slot to NULL. If pthread_setspecific fails, the error is
// logged and the process exits with status 1.
// NOTE(review): shm_close_socket and _shm_close_socket are both called on the
// same pointer. This text looks like a rendered diff, so presumably one is the
// removed line and the other its replacement - confirm; running both would
// close the socket twice.
// NOTE(review): pthread_self() is printed with %d; pthread_t is not guaranteed
// to be an int - verify the format against the logger.
| | | static void _destrory_tmp_recv_socket_(void *tmp_socket) |
| | | { |
| | | int rv; |
| | | if(tmp_socket == NULL) |
| | | return; |
| | | logger->debug("%d destroy tmp socket\n", pthread_self()); |
| | | shm_close_socket((shm_socket_t *)tmp_socket); |
| | | _shm_close_socket((shm_socket_t *)tmp_socket); |
| | | rv = pthread_setspecific(_tmp_recv_socket_key_, NULL); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | | } |
| | | |
| | | /* One-time key creation function */ |
| | |
| | | /* Allocate a unique thread-specific data key and save the address |
| | | of the destructor for thread-specific data buffers */ |
| | | s = pthread_key_create(&_tmp_recv_socket_key_, _destrory_tmp_recv_socket_); |
| | | //s = pthread_key_create(&_tmp_recv_socket_key_, NULL); |
| | | if (s != 0) { |
| | | logger->error(s, "pthread_key_create"); |
| | | abort(); /* dump core and terminate */ |
| | |
| | | logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | |
| | | rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket); |
| | | if (rv != 0) { |
| | | rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | |
| | | }Targ; |
| | | |
| | | struct argument_t { |
| | | bool interactive; |
| | | char *fun; |
| | | int port; |
| | | int key; |
| | |
| | | } |
| | | } |
| | | |
| | | void start_net_proxy(int port) { |
| | | void start_net_proxy(argument_t &arg) { |
| | | pthread_t tid; |
| | | printf("Start net proxy\n"); |
| | | void *serverSocket = net_mod_server_socket_open(port); |
| | | void *serverSocket = net_mod_server_socket_open(arg.port); |
| | | |
| | | // 创建一个线程,可以关闭server |
| | | pthread_create(&tid, NULL, proxy_server_handler, serverSocket); |
| | | if(arg.interactive) { |
| | | pthread_create(&tid, NULL, proxy_server_handler, serverSocket); |
| | | } |
| | | |
| | | if(net_mod_server_socket_start(serverSocket) != 0) { |
| | | err_exit(errno, "net_mod_server_socket_start"); |
| | | } |
| | |
| | | |
| | | |
| | | |
| | | void start_bus_server() { |
| | | void start_bus_server(argument_t &arg) { |
| | | printf("Start bus server\n"); |
| | | void * server_socket = bus_server_socket_wrapper_open(); |
| | | pthread_t tid; |
| | | // 创建一个线程,可以关闭bus |
| | | // pthread_create(&tid, NULL, bus_handler, server_socket); |
| | | if(arg.interactive) |
| | | pthread_create(&tid, NULL, bus_handler, server_socket); |
| | | |
| | | if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { |
| | | printf("start bus failed\n"); |
| | | exit(1); |
| | |
| | | //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11 |
| | | net_node_t *node_arr; |
| | | int node_arr_size = parse_node_list(sendlist, &node_arr); |
| | | // print_node_list(node_arr, node_arr_size); |
| | | print_node_list(node_arr, node_arr_size); |
| | | |
| | | //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8 |
| | | net_node_t *pub_node_arr; |
| | | int pub_node_arr_size = parse_node_list(publist, &pub_node_arr); |
| | | // print_node_list(pub_node_arr, pub_node_arr_size); |
| | | print_node_list(pub_node_arr, pub_node_arr_size); |
| | | |
| | | while (true) { |
| | | //printf("Usage: pub <topic> [content] or sub <topic>\n"); |
| | |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_proxy(opt.port); |
| | | start_net_proxy(opt); |
| | | |
| | | } |
| | | else if (strcmp("start_bus_server", opt.fun) == 0) { |
| | | |
| | | start_bus_server(); |
| | | start_bus_server(opt); |
| | | } |
| | | else if (strcmp("start_reply", opt.fun) == 0) { |
| | | if(opt.key == 0) { |
| | |
| | | argument_t mopt = {}; |
| | | |
| | | // mopt.volume_list_size = 0; |
| | | mopt.interactive = false; |
| | | |
| | | opterr = 0; |
| | | |
| | |
| | | {"fun", required_argument, 0, 'f'}, |
| | | {"key", required_argument, 0, 'k'}, |
| | | {"port", required_argument, 0, 'p'}, |
| | | {"interactive", no_argument, 0, 'i'}, |
| | | {"sendlist", required_argument, (int *)mopt.sendlist, 0}, |
| | | {"publist", required_argument, (int *)mopt.publist, 0}, |
| | | {0, 0, 0, 0} |
| | |
| | | { |
| | | |
| | | |
| | | c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index); |
| | | c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index); |
| | | |
| | | /* Detect the end of the options. */ |
| | | if (c == -1) |
| | |
| | | |
| | | case 'k': |
| | | mopt.key = atoi(optarg); |
| | | break; |
| | | |
| | | case 'i': |
| | | mopt.interactive = true; |
| | | break; |
| | | |
| | | case 'p': |
| | |
| | | net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t)); |
| | | for(i = 0; i < entry_arr_len; i++) { |
| | | property_arr_len = str_split(entry_arr[i], ":", &property_arr); |
| | | // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); |
| | | printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); |
| | | |
| | | node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; |
| | | |
| | | free(property_arr[1]); |
| | |
// Debug helper: prints a begin banner, then the host, port and key of each of
// the first `len` entries of node_arr, then an end banner, all to stdout.
// `host` is passed to %s as-is, with no NULL check.
// NOTE(review): two printf rows appear per node. This text looks like a
// rendered diff, so presumably one is the removed line and the other its
// replacement - confirm which one is live.
| | | void print_node_list(net_node_t *node_arr, int len) { |
| | | printf("============node list begin==========\n"); |
| | | for(int i = 0; i < len; i++) { |
| | | printf("%s,%d,%d,\n", node_arr[i].host, node_arr[i].port, node_arr[i].key); |
| | | printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key); |
| | | } |
| | | printf("============node list end==========\n"); |
| | | } |