From 8c42a659c0cc9178d1f1305acb41dfbf4a8697ef Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 22 十月 2020 16:20:26 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 75 ++++++++++++++++++++-----------------
1 files changed, 40 insertions(+), 35 deletions(-)
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index df4e4a2..9058272 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -11,7 +11,7 @@
{
init_req_rep_req_resp_pool();
- if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal");
+ if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal");
}
@@ -20,9 +20,7 @@
for (auto map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); map_iter++) {
clientfd = map_iter->second;
Close(clientfd);
-
}
-
}
@@ -125,6 +123,7 @@
net_node_t *node;
void *recv_buf;
int timeout = 5 * 1000;
+ net_mod_request_head_t request_head = {};
int n_req = 0, n_recv_suc = 0, n_resp;
@@ -151,8 +150,16 @@
continue;
}
-// printf("write_request %s:%d\n", node->host, node->port);
- if(write_request(connfd, node->key, send_buf, send_size) != 0) {
+
+ request_head.mod = REQ_REP;
+ memcpy(request_head.host, node->host, sizeof(request_head.host));
+ request_head.port = node->port;
+ request_head.key = node->key;
+ request_head.content_length = send_size;
+
+
+ // printf("write_request %s:%d\n", request_head.host, request_head.port);
+ if(write_request(connfd, request_head, send_buf, send_size) != 0) {
LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
close_connect(connfd);
// req_resp_pool.conns[i].fd = -1;
@@ -233,8 +240,8 @@
}
-int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) {
- net_mod_request_head_t request_head = {};
+int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, void *send_buf, int send_size) {
+
int buf_size;
char *buf;
int max_buf_size;
@@ -256,16 +263,8 @@
}
}
- request_head.mod = REQ_REP;
- request_head.key = key;
- request_head.content_length = send_size;
- request_head.topic_length = 0;
-
- // optval = 1;
- // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size);
-
if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen");
@@ -293,6 +292,7 @@
}
response_head = NetModSocket::decode_response_head(response_head_bs);
+// printf(">>>> read_response %s\n", response_head.host);
if(response_head.code != 0) {
// 瀵规柟娌℃湁瀵瑰簲鐨刱ey
return 1;
@@ -447,22 +447,24 @@
int max_buf_size, buf_size;
net_mod_request_head_t request_head;
-
char portstr[32];
+ int nsuc = 0;
- buf = (char *)malloc(MAXBUF);
- max_buf_size = MAXBUF;
- if(buf == NULL) {
+ if((buf = (char *)malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
exit(1);
+ } else {
+ max_buf_size = MAXBUF;
}
- int nsuc = 0;
+
for (i = 0; i< arrlen; i++) {
node = &node_arr[i];
if(node->host == NULL) {
// 鏈湴鍙戦��
- shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+ if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
+ nsuc++;
+ }
} else {
sprintf(portstr, "%d", node->port);
@@ -493,12 +495,13 @@
if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
- close(clientfd);
- continue;
+ } else {
+ nsuc++;
}
+ close(clientfd);
}
- nsuc++;
+
}
free(buf);
@@ -522,25 +525,26 @@
uint32_t topic_length;
*/
-void * NetModSocket::encode_request_head(net_mod_request_head_t & request) {
+void * NetModSocket::encode_request_head(net_mod_request_head_t & head) {
void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
char *tmp_ptr = (char *)headbs;
- PUT(tmp_ptr, htonl(request.mod));
+
+ PUT(tmp_ptr, htonl(head.mod));
tmp_ptr += 4;
- memcpy(tmp_ptr, request.host, NI_MAXHOST);
+ memcpy(tmp_ptr, head.host, sizeof(head.host));
- tmp_ptr += NI_MAXHOST;
- PUT(tmp_ptr, htonl(request.port));
+ tmp_ptr += sizeof(head.host);
+ PUT(tmp_ptr, htonl(head.port));
tmp_ptr += 4;
- PUT(tmp_ptr, htonl(request.key));
+ PUT(tmp_ptr, htonl(head.key));
tmp_ptr += 4;
- PUT(tmp_ptr, htonl(request.content_length));
+ PUT(tmp_ptr, htonl(head.content_length));
tmp_ptr += 4;
- PUT(tmp_ptr, htonl(request.topic_length));
+ PUT(tmp_ptr, htonl(head.topic_length));
return headbs;
@@ -552,10 +556,11 @@
head.mod = ntohl(GET(tmp_ptr));
- tmp_ptr += NI_MAXHOST;
- memcpy(head.host, tmp_ptr, NI_MAXHOST);
-
tmp_ptr += 4;
+ memcpy(head.host, tmp_ptr, sizeof(head.host));
+
+
+ tmp_ptr += sizeof(head.host);
head.port = ntohl(GET(tmp_ptr));
tmp_ptr += 4;
--
Gitblit v1.8.0