From 95349b79a5a646736c706fe19645181146ee9486 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 20 十月 2020 16:29:59 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 318 +++++++++++++++++++++++++--------------
lib/libusgcommon.a | 0
test_net_socket/test_net_mod_socket.c | 16 +
.gdbinit | 0
src/socket/net_mod_socket.h | 27 +-
test_net_socket/net_mod_socket.sh | 10 +
src/common | 1
test_socket/Makefile | 5
include/usgcommon/logger.h | 2
src/Makefile | 23 +-
test_net_socket/Makefile | 8
src/socket/net_mod_server_socket.c | 44 +++-
12 files changed, 284 insertions(+), 170 deletions(-)
diff --git a/.gdbinit b/.gdbinit
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/.gdbinit
diff --git a/include/usgcommon/logger.h b/include/usgcommon/logger.h
index a735086..52859ce 100644
--- a/include/usgcommon/logger.h
+++ b/include/usgcommon/logger.h
@@ -22,7 +22,7 @@
struct timeval tv;
struct tm *info;
gettimeofday(&tv, NULL);
- info = localtime(&tv.tv_sec);
+ info = localtime(&(tv.tv_sec));
strftime(buf, MAXBUF - 1, "%Y-%d-%m %H:%M:%S", info);
snprintf(buf + strlen(buf), MAXBUF - strlen(buf) - 1, ",%ld [%s] ", tv.tv_usec, strlevel(level));
vsnprintf(buf + strlen(buf), MAXBUF - strlen(buf) - 1, fmt, ap);
diff --git a/lib/libusgcommon.a b/lib/libusgcommon.a
index 20f0e45..6236f22 100644
--- a/lib/libusgcommon.a
+++ b/lib/libusgcommon.a
Binary files differ
diff --git a/src/Makefile b/src/Makefile
index 9ac1f43..4dc86a4 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -7,14 +7,16 @@
PREFIX = $(DEST)
-LIBSQUEUE = libshm_queue.a
-DLIBSQUEUE = libshm_queue.so
+LIBSQUEUE = $(DEST)/lib/libshm_queue.a
+DLIBSQUEUE = $(DEST)/lib/libshm_queue.so
# 寮�婧愬伐鍏峰寘
-LDLIBS += -lusgcommon
+#LDLIBS += -lusgcommon
-INCLUDES += -I./queue -I./socket -I$(ROOT)/include/usgcommon
+#-I$(ROOT)/include/usgcommon
+
+INCLUDES += -I./queue -I./socket -I./common/include
SOURCES := $(wildcard *.c ./**/*.c)
OBJS = $(patsubst %.c, $(DEST)/%.o, $(SOURCES))
@@ -38,15 +40,16 @@
.PHONY: build
build: prebuild $(MYLIBS)
- mkdir -p $(DEST)/lib
- cp $(MYLIBS) $(DEST)/lib
- mkdir -p $(DEST)/include/shmqueue
- cp ./*.h ./queue/*.h ./socket/*.h $(DEST)/include/shmqueue
- cp $(ROOT)/lib/* $(DEST)/lib
+ cp $(ROOT)/.gdbinit $(DEST)
+ # mkdir -p $(DEST)/lib
+ # cp $(MYLIBS) $(DEST)/lib
+ # mkdir -p $(DEST)/include/shmqueue
+ # cp ./*.h ./queue/*.h ./socket/*.h $(DEST)/include/shmqueue
+ # cp $(ROOT)/lib/* $(DEST)/lib
.PHONY: prebuild
prebuild:
- @test -d $(DEST) || mkdir $(DEST)
+ @test -d $(DEST)/lib || mkdir -p $(DEST)/lib
#static lib
$(LIBSQUEUE): $(OBJS)
diff --git a/src/common b/src/common
new file mode 120000
index 0000000..eefe618
--- /dev/null
+++ b/src/common
@@ -0,0 +1 @@
+/home/wzq/wk/Basic-Common/common
\ No newline at end of file
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 51e10e9..9448a39 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -43,6 +43,7 @@
socklen_t clientlen;
struct sockaddr_storage clientaddr;
char portstr[32];
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal");
//shmModSocket = new ShmModSocket;
sprintf(portstr, "%d", port);
@@ -58,7 +59,7 @@
/* Wait for listening/connected descriptor(s) to become ready */
pool.ready_set = pool.read_set;
pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
-
+ // LoggerFactory::getLogger()->debug("select return \n");
/* If listening descriptor ready, add new client to pool */
if (FD_ISSET(listenfd, &pool.ready_set))
{
@@ -135,11 +136,12 @@
request_head = NetModSocket::decode_request_head(request_head_bs);
if(request_head.content_length > max_buf) {
- buf = realloc(buf, request_head.content_length);
- max_buf = request_head.content_length;
- if(buf == NULL) {
- LoggerFactory::getLogger()->error(errno, "process_client realloc");
+
+ if( (buf = realloc(buf, request_head.content_length)) == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc buf");
exit(1);
+ } else {
+ max_buf = request_head.content_length;
}
}
@@ -148,7 +150,12 @@
}
if(request_head.mod == REQ_REP) {
+// printf("server response===========\n");
+ memcpy(response_head.host, request_head.host, NI_MAXHOST);
+ response_head.port = request_head.port;
+ response_head.key = request_head.key;
if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) {
+
response_head.code = 1;
response_head.content_length = 0;
if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
@@ -160,8 +167,14 @@
response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size;
if(max_response_buf < response_buf_size) {
- buf = (char *)realloc(response_buf, response_buf_size);
- max_response_buf = response_buf_size;
+ if( (response_buf = (char *)realloc(response_buf, response_buf_size)) == NULL ) {
+ LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc response_buf");
+ exit(1);
+ } else {
+ max_response_buf = response_buf_size;
+ }
+
+
}
memcpy(response_buf, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf, recv_size);
@@ -176,11 +189,11 @@
} else if(request_head.mod == BUS) {
if(request_head.topic_length > max_topic_buf) {
- topic_buf = realloc(topic_buf, request_head.topic_length);
- max_topic_buf = request_head.topic_length;
- if(topic_buf == NULL) {
- LoggerFactory::getLogger()->error(errno, "process_client realloc");
+ if( (topic_buf = realloc(topic_buf, request_head.topic_length)) == NULL ) {
+ LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc topic_buf");
exit(1);
+ } else {
+ max_topic_buf = request_head.topic_length;
}
}
@@ -200,7 +213,7 @@
{
int i, connfd;
//rio_t *rio;
-
+ Logger * logger = LoggerFactory::getLogger();
for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
{
@@ -211,10 +224,13 @@
{
pool.nready--;
if(process_client(connfd) != 0) {
- LoggerFactory::getLogger()->debug("===server close client %d\n", connfd);
- Close(connfd); //line:conc:echoservers:closeconnfd
+
+
+ Close(connfd);
FD_CLR(connfd, &pool.read_set);
pool.clientfd[i] = -1;
+ logger->debug("===server close client %d\n", connfd);
+ // printf("===server close client %d\n", connfd);
}
}
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 560f8be..df4e4a2 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -11,7 +11,7 @@
{
init_req_rep_req_resp_pool();
-
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err_msg(errno, "signal");
}
@@ -31,13 +31,11 @@
/* Initially, there are no connected descriptors */
int i;
req_resp_pool.maxi = -1; //line:conc:echoservers:beginempty
- for (i = 0; i < FD_SETSIZE; i++)
- req_resp_pool.connfd[i] = -1; //line:conc:echoservers:endempty
+ for (i = 0; i < OPEN_MAX; i++) {
+ req_resp_pool.conns[i].fd = -1;
+ req_resp_pool.conns[i].events = 0;
+ }
- /* Initially, listenfd is only member of select read set */
- FD_ZERO(&req_resp_pool.read_set);
- FD_ZERO(&req_resp_pool.write_set);
- FD_ZERO(&req_resp_pool.except_set);
}
int NetModSocket::connect( net_node_t *node) {
@@ -48,68 +46,75 @@
char portstr[32];
sprintf(mapKey, "%s:%d", node->host, node->port);
- if( ( mapIter = req_resp_pool.connectionMap.find(mapKey)) != req_resp_pool.connectionMap.end()) {
+ mapIter = req_resp_pool.connectionMap.find(mapKey);
+ if( mapIter != req_resp_pool.connectionMap.end()) {
connfd = mapIter->second;
-
+// printf("hit: %s\n", mapKey);
} else {
-
+// printf("mis: %s\n", mapKey);
sprintf(portstr, "%d", node->port);
+// printf("open before: %s\n", mapKey);
connfd = open_clientfd(node->host, portstr);
+// printf("open after: %s\n", mapKey);
if(connfd < 0) {
+ LoggerFactory::getLogger()->error(errno, "connect %s:%d ", node->host, node->port);
return -1;
}
req_resp_pool.connectionMap.insert({mapKey, connfd});
}
- for (i = 0; i < FD_SETSIZE; i++) { /* Find an available slot */
- if (req_resp_pool.connfd[i] < 0)
+ for (i = 0; i < OPEN_MAX; i++) { /* Find an available slot */
+ if (req_resp_pool.conns[i].fd < 0)
{
/* Add connected descriptor to the req_resp_pool */
- req_resp_pool.connfd[i] = connfd;
- req_resp_pool.connfdNodeMap.insert({connfd, node});
- // Rio_readinitb(&req_resp_pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
-
+ req_resp_pool.conns[i].fd = connfd;
+
+ req_resp_pool.conns[i].events = POLLIN;
/* Add the descriptor to descriptor set */
- FD_SET(connfd, &req_resp_pool.read_set); //line:conc:echoservers:addconnfd
- FD_SET(connfd, &req_resp_pool.write_set);
- FD_SET(connfd, &req_resp_pool.except_set);
- /* Update max descriptor and req_resp_pool highwater mark */
- if (connfd > req_resp_pool.maxfd)
- req_resp_pool.maxfd = connfd;
- if (i > req_resp_pool.maxi)
- req_resp_pool.maxi = i;
break;
}
}
- if (i == FD_SETSIZE) {
+ if (i > req_resp_pool.maxi)
+ req_resp_pool.maxi = i;
+
+ if (i == OPEN_MAX) {
/* Couldn't find an empty slot */
LoggerFactory::getLogger()->error(errno, "add_client error: Too many clients");
return -1;
-
}
+
return connfd;
}
void NetModSocket::close_connect(int connfd) {
-
- net_node_t *node = req_resp_pool.connfdNodeMap.find(connfd)->second;
-
- // std::map<std::string, int>::iterator mapIter;
- if(close(connfd) != 0) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect");
- }
- FD_CLR(connfd, &req_resp_pool.read_set);
- FD_CLR(connfd, &req_resp_pool.write_set);
- FD_CLR(connfd, &req_resp_pool.except_set);
-
+ int i;
char mapKey[256];
- // char portstr[32];
- sprintf(mapKey, "%s:%d", node->host, node->port);
- req_resp_pool.connectionMap.erase(mapKey);
-// LoggerFactory::getLogger()->debug("close_connect");
+ std::map<std::string, int>::iterator map_iter;
+ if(close(connfd) != 0) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect close");
+ }
+
+
+ for (i = 0; i <= req_resp_pool.maxi; i++) {
+ if(req_resp_pool.conns[i].fd == connfd) {
+ req_resp_pool.conns[i].fd = -1;
+ }
+ }
+
+ for ( map_iter = req_resp_pool.connectionMap.begin(); map_iter != req_resp_pool.connectionMap.end(); ) {
+ if(connfd == map_iter->second) {
+// std::cout << "map_iter->first==" << map_iter->first << std::endl;
+ map_iter = req_resp_pool.connectionMap.erase(map_iter);
+ } else {
+ ++map_iter;
+ }
+ }
+
+ LoggerFactory::getLogger()->debug( "closed %d\n", connfd);
+
}
@@ -119,9 +124,9 @@
int i, n, recv_size, connfd;
net_node_t *node;
void *recv_buf;
- struct timeval timeout = {5, 0};
+ int timeout = 5 * 1000;
- int n_conn_suc = 0, n_recv_suc = 0;
+ int n_req = 0, n_recv_suc = 0, n_resp;
net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
@@ -146,71 +151,78 @@
continue;
}
- n_conn_suc++;
+// printf("write_request %s:%d\n", node->host, node->port);
+ if(write_request(connfd, node->key, send_buf, send_size) != 0) {
+ LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
+ close_connect(connfd);
+ // req_resp_pool.conns[i].fd = -1;
+ } else {
+ n_req++;
+ }
+
}
-// printf("n_conn_suc =%d\n", n_conn_suc);
+// printf(" req_resp_pool.maxi = %d\n", req_resp_pool.maxi);
+// printf(" n_req = %d\n", n_req);
- while(n_recv_suc < n_conn_suc)
+// int tmp = 0;
+ while(n_resp < n_req)
{
+// printf(" while %d\n", tmp++);
/* Wait for listening/connected descriptor(s) to become ready */
- req_resp_pool.ready_read_set = req_resp_pool.read_set;
- req_resp_pool.ready_write_set = req_resp_pool.write_set;
- req_resp_pool.ready_except_set = req_resp_pool.except_set;
- if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1,
- &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set,
- &req_resp_pool.ready_except_set, &timeout)) <= 0) {
- // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
+ if( (req_resp_pool.nready = poll(req_resp_pool.conns, req_resp_pool.maxi + 1, timeout) ) <= 0) {
+ // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
break;
}
// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready);
for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) {
- if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+ if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
/* If the descriptor is ready, echo a text line from it */
- node = req_resp_pool.connfdNodeMap.find(connfd)->second;
- if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set))
+ if (req_resp_pool.conns[i].revents & POLLIN )
{
req_resp_pool.nready--;
+// printf("POLLIN %d\n", connfd);
if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
// 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
- FD_CLR(connfd, &req_resp_pool.read_set);
- req_resp_pool.connfd[i] = -1;
+ req_resp_pool.conns[i].fd = -1;
n_recv_suc++;
+
} else if(n == -1) {
+ req_resp_pool.conns[i].fd = -1;
close_connect(connfd);
+ } else {
+ req_resp_pool.conns[i].fd = -1;
+
}
+ n_resp++;
+// printf("read response %d\n", n);
}
- if (FD_ISSET(connfd, &req_resp_pool.ready_write_set))
- {
- req_resp_pool.nready--;
-// printf("write %d\n", connfd);
- if(write_request(connfd, node->key, send_buf, send_size) != 0) {
- close_connect(connfd);
- } else{
- // 涓�娆″啓鍏ュ畬鎴愬悗娓呯┖鍐欏叆浣�
- FD_CLR(connfd, &req_resp_pool.write_set);
- }
-
+ if (req_resp_pool.conns[i].revents & POLLOUT ) {
+ // printf("poll POLLOUT %d\n", connfd);
}
- if (FD_ISSET(connfd, &req_resp_pool.ready_except_set))
+
+ if (req_resp_pool.conns[i].revents & (POLLRDHUP | POLLHUP | POLLERR) )
{
+// printf("poll POLLERR %d\n", connfd);
req_resp_pool.nready--;
close_connect(connfd);
+ req_resp_pool.conns[i].fd = -1;
}
}
}
}
- FD_ZERO(&req_resp_pool.except_set);
for (i = 0; i <= req_resp_pool.maxi; i++) {
- if ( (connfd = req_resp_pool.connfd[i]) > 0 ) {
+ if ( (connfd = req_resp_pool.conns[i].fd) > 0 ) {
// 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺�
close_connect(connfd);
+ req_resp_pool.conns[i].fd = -1;
}
}
+
req_resp_pool.maxi = -1;
*recv_arr = ret_arr;
@@ -226,8 +238,7 @@
int buf_size;
char *buf;
int max_buf_size;
- buf = (char *)malloc(MAXBUF);
- if(buf == NULL) {
+ if((buf = (char *)malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc");
exit(1);
} else {
@@ -236,11 +247,12 @@
buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
if(max_buf_size < buf_size) {
- buf = (char *)realloc(buf, buf_size);
- max_buf_size = buf_size;
- if(buf == NULL) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request realloc");
+
+ if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request realloc buf");
exit(1);
+ } else {
+ max_buf_size = buf_size;
}
}
@@ -274,9 +286,8 @@
char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
net_mod_response_head_t response_head;
- net_node_t *node = req_resp_pool.connfdNodeMap.find(connfd)->second;
if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb response_head");
return -1;
}
@@ -293,15 +304,14 @@
exit(1);
}
if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response rio_readnb recv_buf");
return -1;
}
-
- strcpy( recv_msg->host, node->host);
- recv_msg->port = node->port;
- recv_msg->key = node->key;
+ strcpy( recv_msg->host, response_head.host);
+ recv_msg->port = response_head.port;
+ recv_msg->key = response_head.key;
recv_msg->content = recv_buf;
recv_msg->content_length = recv_size;
return 0;
@@ -349,8 +359,12 @@
buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
if(max_buf_size < buf_size) {
- buf = (char *)realloc(buf, buf_size);
- max_buf_size = buf_size;
+ if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
+ } else {
+ max_buf_size = buf_size;
+ }
+
}
@@ -366,7 +380,7 @@
if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_writen buf");
close(clientfd);
continue;
@@ -375,7 +389,7 @@
// setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_readnb response_head_bs");
close(clientfd);
continue;
@@ -392,7 +406,7 @@
exit(1);
}
if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) != response_head.content_length) {
- LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb");
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_readnb recv_buf");
close(clientfd);
continue;
@@ -465,8 +479,12 @@
buf_size = NET_MODE_REQUEST_HEAD_LENGTH + content_size + request_head.topic_length;
if(max_buf_size < buf_size) {
- buf = (char *)realloc(buf, buf_size);
- max_buf_size = buf_size;
+ if( ( buf = (char *)realloc(buf, buf_size)) == NULL) {
+ LoggerFactory::getLogger()->error(errno, "NetModSocket::pub realloc buf ");
+ } else {
+ max_buf_size = buf_size;
+ }
+
}
memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
@@ -495,42 +513,108 @@
free(arr);
}
-// ssize_t recv(void *buf, size_t len) {
-
-// return rio_readlineb(&rio, buf, MAXLINE);
-
-// }
+/**
+ uint32_t mod;
+ char host[NI_MAXHOST];
+ uint32_t port;
+ uint32_t key;
+ uint32_t content_length;
+ uint32_t topic_length;
+*/
void * NetModSocket::encode_request_head(net_mod_request_head_t & request) {
- char * head = (char *)malloc(NET_MODE_REQUEST_HEAD_LENGTH);
- PUT(head, htonl(request.mod));
- PUT(head + 4, htonl(request.key));
- PUT(head + 8, htonl(request.content_length));
- PUT(head + 12, htonl(request.topic_length));
- return head;
+ void * headbs = malloc(NET_MODE_REQUEST_HEAD_LENGTH);
+ char *tmp_ptr = (char *)headbs;
+ PUT(tmp_ptr, htonl(request.mod));
+
+ tmp_ptr += 4;
+ memcpy(tmp_ptr, request.host, NI_MAXHOST);
+
+ tmp_ptr += NI_MAXHOST;
+ PUT(tmp_ptr, htonl(request.port));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr, htonl(request.key));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr, htonl(request.content_length));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr, htonl(request.topic_length));
+
+
+ return headbs;
}
-net_mod_request_head_t NetModSocket::decode_request_head(void *_headbs) {
- char *headbs = (char *)_headbs;
+net_mod_request_head_t NetModSocket::decode_request_head(void *headbs) {
+ char *tmp_ptr = (char *)headbs;
net_mod_request_head_t head;
- head.mod = ntohl(GET(headbs));
- head.key = ntohl(GET(headbs + 4));
- head.content_length = ntohl(GET(headbs + 8));
- head.topic_length = ntohl(GET(headbs + 12));
+
+ head.mod = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += NI_MAXHOST;
+ memcpy(head.host, tmp_ptr, NI_MAXHOST);
+
+ tmp_ptr += 4;
+ head.port = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.key = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.content_length = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.topic_length = ntohl(GET(tmp_ptr));
+
return head;
}
+/**
+ char host[NI_MAXHOST];
+ uint32_t port;
+ uint32_t key;
+ uint32_t content_length;
+ uint32_t code;
+ */
void * NetModSocket::encode_response_head(net_mod_response_head_t & response) {
- char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
- PUT(head, htonl(response.code));
- PUT(head + 4, htonl(response.content_length));
- return head;
+ void * headbs = malloc(NET_MODE_RESPONSE_HEAD_LENGTH);
+ char *tmp_ptr = (char *)headbs;
+
+ memcpy(tmp_ptr, response.host, NI_MAXHOST);
+
+ tmp_ptr += NI_MAXHOST;
+ PUT(tmp_ptr, htonl(response.port));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr , htonl(response.key));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr, htonl(response.content_length));
+
+ tmp_ptr += 4;
+ PUT(tmp_ptr, htonl(response.code));
+
+ return headbs;
}
-net_mod_response_head_t NetModSocket::decode_response_head(void *_headbs) {
- char *headbs = (char *)_headbs;
+net_mod_response_head_t NetModSocket::decode_response_head(void *headbs) {
+ char *tmp_ptr = (char *)headbs;
net_mod_response_head_t head;
- head.code = ntohl(GET(headbs));
- head.content_length = ntohl(GET(headbs + 4));
+
+ memcpy(head.host, tmp_ptr, NI_MAXHOST);
+
+ tmp_ptr += NI_MAXHOST;
+ head.port = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.key = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.content_length = ntohl(GET(tmp_ptr));
+
+ tmp_ptr += 4;
+ head.code = ntohl(GET(tmp_ptr));
+
return head;
}
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index c133302..4e4394c 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -3,7 +3,9 @@
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
+#include <poll.h>
+#define OPEN_MAX 1024
#define GET(p) (*(uint32_t *)(p))
#define PUT(p, val) (*(uint32_t *)(p) = (val))
@@ -20,20 +22,24 @@
int key;
};
-#define NET_MODE_REQUEST_HEAD_LENGTH 16
+#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 5 * sizeof(uint32_t))
struct net_mod_request_head_t {
uint32_t mod;
+ char host[NI_MAXHOST];
+ uint32_t port;
uint32_t key;
uint32_t content_length;
uint32_t topic_length;
};
-#define NET_MODE_RESPONSE_HEAD_LENGTH 8
+#define NET_MODE_RESPONSE_HEAD_LENGTH (NI_MAXHOST + 4 * sizeof(uint32_t))
struct net_mod_response_head_t {
// socket_mod_t mod;
- // int key;
+ char host[NI_MAXHOST];
+ uint32_t port;
+ uint32_t key;
uint32_t code;
uint32_t content_length;
};
@@ -41,7 +47,7 @@
struct net_mod_recv_msg_t
{
- char host[128];
+ char host[NI_MAXHOST];
int port;
int key;
void *content;
@@ -51,22 +57,13 @@
class NetModSocket {
struct pool{ /* Represents a pool of connected descriptors */ //line:conc:echoservers:beginpool
- int maxfd; /* Largest descriptor in read_set */
- fd_set read_set; /* Set of all active descriptors */
- fd_set ready_read_set; /* Subset of descriptors ready for reading */
-
- fd_set write_set;
- fd_set ready_write_set;
-
- fd_set except_set;
- fd_set ready_except_set;
int nready; /* Number of ready descriptors from select */
int maxi; /* Highwater index into client array */
- int connfd[FD_SETSIZE]; /* Set of active descriptors */
+ struct pollfd conns[OPEN_MAX];
// net_node_t *nodes[FD_SETSIZE];
// rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
- std::map<int, net_node_t*> connfdNodeMap;
+ // std::map<int, net_node_t*> connfdNodeMap;
std::map<std::string, int> connectionMap;
} ;
diff --git a/test_net_socket/Makefile b/test_net_socket/Makefile
index 12ece03..01ccd9f 100644
--- a/test_net_socket/Makefile
+++ b/test_net_socket/Makefile
@@ -7,13 +7,15 @@
# 寮�婧愬伐鍏峰寘璺緞
LDDIR += -L${DEST}/lib
+#-lusgcommon
# 寮�婧愬伐鍏峰寘
-LDLIBS += -lshm_queue -lusgcommon -lpthread
+LDLIBS += -lshm_queue -lpthread
-INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
+#-I$(ROOT)/include/usgcommon
+INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include
-PROGS = ${DEST}/net_mod_socket
+PROGS = ${DEST}/test_net_mod_socket
DEPENDENCES = $(patsubst %, %.d, $(PROGS))
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index fdd8a5b..ea30ac3 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -9,12 +9,12 @@
./dgram_mod_bus server 8 & server_pid=$! && echo ${server_pid}
# 寮�鍚綉缁渟erver
- ./net_mod_socket server 5000 & server_pid=$! && echo ${server_pid}
+ ./test_net_mod_socket server 5000 & server_pid=$! && echo ${server_pid}
}
function client() {
- ./net_mod_socket client 5000
+ ./test_net_mod_socket client 5000
}
function close() {
@@ -22,6 +22,12 @@
ipcrm -a
}
+function scp() {
+
+ scp -P 100 -rp ../build basic@192.168.5.22:/data/disk2/test
+ scp -rp ../build basic@192.168.20.10:/data3/workspace/wzq
+}
+
case ${1} in
"server")
close
diff --git a/test_net_socket/net_mod_socket.c b/test_net_socket/test_net_mod_socket.c
similarity index 94%
rename from test_net_socket/net_mod_socket.c
rename to test_net_socket/test_net_mod_socket.c
index a763c5e..a65cd20 100644
--- a/test_net_socket/net_mod_socket.c
+++ b/test_net_socket/test_net_mod_socket.c
@@ -24,13 +24,14 @@
char topic[512];
int recv_arr_size, i, n;
- int node_arr_size = 3;
+
net_mod_recv_msg_t *recv_arr;
//192.168.20.104
+ int node_arr_size = 2;
net_node_t node_arr[] = {
- {"192.168.5.22", port, 11},
- {"192.168.20.104", port, 21},
+ // {"192.168.5.22", port, 11},
+ {"192.168.20.10", port, 11},
{"192.168.20.104", port, 11}
};
@@ -98,10 +99,12 @@
int i,j, n, recv_arr_size;
net_mod_recv_msg_t *recv_arr;
- int node_arr_size = 1;
+ int node_arr_size = 2;
//192.168.20.104
net_node_t node_arr[] = {
- {NULL, port, 11}
+ // {"192.168.5.22", port, 11},
+ {"192.168.20.10", port, 11},
+ {"192.168.20.104", port, 11}
};
void * client = net_mod_socket_open();
@@ -137,7 +140,7 @@
void mclient(int port) {
- int status, i = 0, processors = 4;
+ int status, i = 0, processors = 2;
void *res[processors];
// Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
Targ targs[processors];
@@ -146,6 +149,7 @@
struct timeval start, end;
long total = 0;
+printf("寮�濮嬫祴璇�...\n");
gettimeofday(&start, NULL);
for (i = 0; i < processors; i++) {
targs[i].port = port;
diff --git a/test_socket/Makefile b/test_socket/Makefile
index be0fa26..22a401f 100644
--- a/test_socket/Makefile
+++ b/test_socket/Makefile
@@ -8,9 +8,10 @@
LDDIR += -L${DEST}/lib
# 寮�婧愬伐鍏峰寘
-LDLIBS += -lshm_queue -lusgcommon -lpthread
+#-lusgcommon
+LDLIBS += -lshm_queue -lpthread
-INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
+INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include
PROGS = ${DEST}/dgram_mod_bus ${DEST}/dgram_mod_survey ${DEST}/dgram_mod_req_rep ${DEST}/test_timeout ${DEST}/test_open_close
--
Gitblit v1.8.0