| | |
| | | |
| | | int i, n, recv_size, connfd; |
| | | net_node_t *node; |
| | | void *recv_buf; |
| | | void *recv_buf = NULL; |
| | | struct timespec timeout; |
| | | int ret; |
| | | int n_req = 0, n_recv_suc = 0, n_resp =0; |
| | | |
| | | net_mod_request_head_t request_head = {}; |
| | | |
| | | int n_req = 0, n_recv_suc = 0, n_resp =0; |
| | | |
| | | |
| | | net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); |
| | | |
| | | int ret; |
| | | |
| | | NetConnPool *mpool; |
| | | |
| | | /* Make first caller allocate key for thread-specific data */ |
| | |
| | | mpool = (NetConnPool *)pthread_getspecific(poolKey); |
| | | if (mpool == NULL) |
| | | { |
| | | /* If first call from this thread, allocate |
| | | buffer for thread, and save its location */ |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | logger->debug("Create connPool"); |
| | | mpool = new NetConnPool(); |
| | | if (mpool == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc"); |
| | |
| | | for (i = 0; i< arrlen; i++) { |
| | | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL) { |
| | | if(node->host == NULL || strcmp(node->host, "") == 0 ) { |
| | | // 本地发送 |
| | | shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | strcpy( ret_arr[n_recv_suc].host,"localshm"); |
| | | ret_arr[n_recv_suc].port = 0; |
| | | ret_arr[n_recv_suc].key = node->key; |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | |
| | | if(msec == 0) { |
| | | ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } else if(msec > 0){ |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); |
| | | } else { |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } |
| | | if( ret == 0) { |
| | | strcpy( ret_arr[n_recv_suc].host,""); |
| | | ret_arr[n_recv_suc].port = 0; |
| | | ret_arr[n_recv_suc].key = node->key; |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | |
| | | |
| | | mpool->maxi = -1; |
| | | |
| | | *recv_arr = ret_arr; |
| | | if(recv_arr != NULL) { |
| | | *recv_arr = ret_arr; |
| | | } else { |
| | | free_recv_msg_arr(ret_arr, n_recv_suc); |
| | | } |
| | | |
| | | if(recv_arr_size != NULL) { |
| | | *recv_arr_size = n_recv_suc; |
| | | } |
| | | return n_recv_suc; |
| | | |
| | | } |
| | | |
| | | |
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | | for(int i =0; i< size; i++) { |
| | | if(arr[i].content != NULL) |
| | | free(arr[i].content); |
| | | } |
| | | free(arr); |
| | | } |
| | | |
| | | |
| | | int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); |
| | |
| | | // int pub(char *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, |
| | | int content_size, int timeout) { |
| | | int content_size, int msec) { |
| | | int i, connfd; |
| | | net_node_t *node; |
| | | struct timespec timeout; |
| | | |
| | | net_mod_request_head_t request_head; |
| | | net_mod_recv_msg_t recv_msg; |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | } |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | | } |
| | | } |
| | | |
| | | for (i = 0; i < arrlen; i++) { |
| | | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { |
| | | n_pub_suc++; |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | } |
| | | |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | | } |
| | | |
| | | |
| | | } else { |
| | | sprintf(portstr, "%d", node->port); |
| | |
| | | request_head.key = node->key; |
| | | request_head.content_length = content_size; |
| | | request_head.topic_length = strlen(topic) + 1; |
| | | request_head.timeout = timeout; |
| | | request_head.timeout = msec; |
| | | |
| | | if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) { |
| | | LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port); |
| | |
| | | while(n_resp < n_req) |
| | | { |
| | | /* Wait for listening/connected descriptor(s) to become ready */ |
| | | if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) { |
| | | if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) { |
| | | // wirite_set 和 read_set 在指定时间内都没准备好 |
| | | break; |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::start_bus() { |
| | | return shmModSocket.start_bus(); |
| | | } |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | |
| | | return shmModSocket.get_key(); |
| | | } |
| | | |
| | | |
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | | for(int i =0; i< size; i++) { |
| | | free(arr[i].content); |
| | | } |
| | | free(arr); |
| | | } |
| | | |
| | | |
| | | |