| | |
| | | { |
| | | init_req_rep_req_resp_pool(); |
| | | |
| | | if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal"); |
| | | if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal"); |
| | | } |
| | | |
| | | |
| | |
| | | for (auto map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); map_iter++) { |
| | | clientfd = map_iter->second; |
| | | Close(clientfd); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | net_node_t *node; |
| | | void *recv_buf; |
| | | int timeout = 5 * 1000; |
| | | net_mod_request_head_t request_head = {}; |
| | | |
| | | int n_req = 0, n_recv_suc = 0, n_resp; |
| | | |
| | |
| | | continue; |
| | | } |
| | | |
| | | // printf("write_request %s:%d\n", node->host, node->port); |
| | | if(write_request(connfd, node->key, send_buf, send_size) != 0) { |
| | | |
| | | request_head.mod = REQ_REP; |
| | | memcpy(request_head.host, node->host, sizeof(request_head.host)); |
| | | request_head.port = node->port; |
| | | request_head.key = node->key; |
| | | request_head.content_length = send_size; |
| | | |
| | | |
| | | printf("write_request %s:%d\n", request_head.host, request_head.port); |
| | | if(write_request(connfd, request_head, send_buf, send_size) != 0) { |
| | | LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port); |
| | | close_connect(connfd); |
| | | // req_resp_pool.conns[i].fd = -1; |
| | |
| | | |
| | | } |
| | | |
| | | int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) { |
| | | net_mod_request_head_t request_head = {}; |
| | | int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, void *send_buf, int send_size) { |
| | | |
| | | int buf_size; |
| | | char *buf; |
| | | int max_buf_size; |
| | |
| | | } |
| | | } |
| | | |
| | | request_head.mod = REQ_REP; |
| | | request_head.key = key; |
| | | request_head.content_length = send_size; |
| | | request_head.topic_length = 0; |
| | | |
| | | // optval = 1; |
| | | // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); |
| | | memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH); |
| | | memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size); |
| | | |
| | | |
| | | if(rio_writen(clientfd, buf, buf_size) != buf_size ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen"); |
| | |
| | | } |
| | | |
| | | response_head = NetModSocket::decode_response_head(response_head_bs); |
| | | printf(">>>> read_response %s\n", response_head.host); |
| | | if(response_head.code != 0) { |
| | | // The peer has no entry for the corresponding key |
| | | return 1; |
| | |
| | | uint32_t topic_length; |
| | | */ |
| | | |
| | | void * NetModSocket::encode_request_head(net_mod_request_head_t & request) { |
| | | void * NetModSocket::encode_request_head(net_mod_request_head_t & head) { |
| | | void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH); |
| | | char *tmp_ptr = (char *)headbs; |
| | | PUT(tmp_ptr, htonl(request.mod)); |
| | | |
| | | PUT(tmp_ptr, htonl(head.mod)); |
| | | |
| | | tmp_ptr += 4; |
| | | memcpy(tmp_ptr, request.host, NI_MAXHOST); |
| | | memcpy(tmp_ptr, head.host, sizeof(head.host)); |
| | | |
| | | tmp_ptr += NI_MAXHOST; |
| | | PUT(tmp_ptr, htonl(request.port)); |
| | | tmp_ptr += sizeof(head.host); |
| | | PUT(tmp_ptr, htonl(head.port)); |
| | | |
| | | tmp_ptr += 4; |
| | | PUT(tmp_ptr, htonl(request.key)); |
| | | PUT(tmp_ptr, htonl(head.key)); |
| | | |
| | | tmp_ptr += 4; |
| | | PUT(tmp_ptr, htonl(request.content_length)); |
| | | PUT(tmp_ptr, htonl(head.content_length)); |
| | | |
| | | tmp_ptr += 4; |
| | | PUT(tmp_ptr, htonl(request.topic_length)); |
| | | PUT(tmp_ptr, htonl(head.topic_length)); |
| | | |
| | | |
| | | return headbs; |
| | |
| | | |
| | | head.mod = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += NI_MAXHOST; |
| | | memcpy(head.host, tmp_ptr, NI_MAXHOST); |
| | | |
| | | tmp_ptr += 4; |
| | | memcpy(head.host, tmp_ptr, sizeof(head.host)); |
| | | |
| | | |
| | | tmp_ptr += sizeof(head.host); |
| | | head.port = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |