From af85260254bacac40a68d4f5f61950523beb3a27 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 19 十月 2020 17:02:41 +0800 Subject: [PATCH] update --- src/socket/net_mod_socket.c | 98 +++++++++++----- src/socket/shm_mod_socket.h | 5 src/socket/shm_socket.c | 21 +++ src/socket/net_mod_server_socket.h | 8 + src/socket/net_mod_socket.h | 18 ++ Makefile | 2 src/socket/shm_socket.h | 7 + test_net_socket/net_mod_socket.c | 103 ++++++++++++++++ src/socket/net_mod_server_socket_wrapper.h | 2 src/socket/net_mod_server_socket.c | 80 ++++++++++--- src/socket/shm_mod_socket.c | 12 ++ src/socket/net_mod_server_socket_wrapper.c | 4 12 files changed, 298 insertions(+), 62 deletions(-) diff --git a/Makefile b/Makefile index f55f829..d471709 100755 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # debug "make --just-print" -DIRS = src test_net_socket +DIRS = src test_net_socket test_socket TAR_NAME = shm_queue.tar.gz all: diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c index ccc17e2..51e10e9 100644 --- a/src/socket/net_mod_server_socket.c +++ b/src/socket/net_mod_server_socket.c @@ -4,37 +4,55 @@ #include "net_mod_socket_io.h" #include "net_mod_socket.h" -NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256) +NetModServerSocket::NetModServerSocket(int _port): listenfd(0), port(_port), max_buf(1024), max_topic_buf(256), max_response_buf(1024) { - char portstr[32]; - - //shmModSocket = new ShmModSocket; - sprintf(portstr, "%d", port); - listenfd = Open_listenfd(portstr); - init_pool(listenfd); + buf = malloc(max_buf); if(buf == NULL) { - err_exit(errno, "process_client malloc"); + err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); } topic_buf = malloc(max_topic_buf); if(topic_buf == NULL) { - err_exit(errno, "process_client malloc"); + err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); + } + + response_buf = (char *) malloc(max_response_buf); + if(response_buf == NULL) { + err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); } } NetModServerSocket::~NetModServerSocket() { - Close(listenfd); - free(buf); + if(listenfd != 0) { + Close(listenfd); + } + + if(buf != NULL) + free(buf); + if(topic_buf != NULL) free(topic_buf); + if(response_buf != NULL) + free(response_buf); } -void NetModServerSocket::start() { +int NetModServerSocket::start() { int connfd; socklen_t clientlen; struct sockaddr_storage clientaddr; + char portstr[32]; + + //shmModSocket = new ShmModSocket; + sprintf(portstr, "%d", port); + listenfd = open_listenfd(portstr); + if(listenfd < 0) { + LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start"); + return -1; + } + init_pool(listenfd); + while (1) { /* Wait for listening/connected descriptor(s) to become ready */ @@ -52,6 +70,7 @@ /* Echo a text line from each ready connected descriptor */ check_clients(); } + return 0; } void NetModServerSocket::init_pool(int listenfd) @@ -105,8 +124,8 @@ net_mod_response_head_t response_head; char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH]; void *recv_buf; -char tmp[8196]; - int recv_size; +// char tmp[8196]; + int recv_size, response_buf_size; if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { @@ -129,11 +148,32 @@ } if(request_head.mod == REQ_REP) { - // TODO: shmModSocket.sendandrecv_unsafe - shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size); - response_head.content_length = recv_size; - Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH); - Rio_writen(connfd, recv_buf, recv_size); + if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) { + response_head.code = 1; + response_head.content_length = 0; + if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH ) + return -1; + //Rio_writen(connfd, recv_buf, recv_size); + } else { + response_head.code = 0; + response_head.content_length = recv_size; + + response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size; + if(max_response_buf < response_buf_size) { + buf = (char *)realloc(response_buf, response_buf_size); + max_response_buf = response_buf_size; + } + memcpy(response_buf, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH); + memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf, recv_size); + + if(rio_writen(connfd, response_buf, response_buf_size) != response_buf_size) { + return -1; + } + + } + return 0; + + } else if(request_head.mod == BUS) { if(request_head.topic_length > max_topic_buf) { topic_buf = realloc(topic_buf, request_head.topic_length); @@ -147,7 +187,7 @@ if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) { return -1; } - LoggerFactory::getLogger()->debug("====server pub %s===\n", buf); +LoggerFactory::getLogger()->debug("====server pub %s===\n", buf); shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key); } diff --git a/src/socket/net_mod_server_socket.h b/src/socket/net_mod_server_socket.h index 40219e1..3f80931 100644 --- a/src/socket/net_mod_server_socket.h +++ b/src/socket/net_mod_server_socket.h @@ -28,13 +28,18 @@ private: int listenfd; + int port; ShmModSocket shmModSocket; pool pool; void *buf; void *topic_buf; + char *response_buf; + size_t max_buf; size_t max_topic_buf; + size_t max_response_buf; + void init_pool(int listenfd); void add_client(int connfd); @@ -47,8 +52,9 @@ /* * 鍚姩 server + * @return 0 success, 鍏朵粬 failture */ - void start(); + int start(); ~NetModServerSocket(); }; diff --git a/src/socket/net_mod_server_socket_wrapper.c b/src/socket/net_mod_server_socket_wrapper.c index 2aed1cf..cdd4c49 100644 --- a/src/socket/net_mod_server_socket_wrapper.c +++ b/src/socket/net_mod_server_socket_wrapper.c @@ -14,7 +14,7 @@ } -void net_mod_server_socket_start(void *_sockt) { +int net_mod_server_socket_start(void *_sockt) { net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; - sockt->sockt->start(); + return sockt->sockt->start(); } diff --git a/src/socket/net_mod_server_socket_wrapper.h b/src/socket/net_mod_server_socket_wrapper.h index 0bb7571..1a6d143 100644 --- a/src/socket/net_mod_server_socket_wrapper.h +++ b/src/socket/net_mod_server_socket_wrapper.h @@ -25,7 +25,7 @@ /** * 鍚姩 */ -void net_mod_server_socket_start(void *_sockt); +int net_mod_server_socket_start(void *_sockt); diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index c7b3cc7..560f8be 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -10,6 +10,8 @@ NetModSocket::NetModSocket() { init_req_rep_req_resp_pool(); + + } @@ -20,6 +22,7 @@ Close(clientfd); } + } @@ -95,7 +98,9 @@ net_node_t *node = req_resp_pool.connfdNodeMap.find(connfd)->second; // std::map<std::string, int>::iterator mapIter; - Close(connfd); //line:conc:echoservers:closeconnfd + if(close(connfd) != 0) { + LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect"); + } FD_CLR(connfd, &req_resp_pool.read_set); FD_CLR(connfd, &req_resp_pool.write_set); FD_CLR(connfd, &req_resp_pool.except_set); @@ -104,23 +109,23 @@ // char portstr[32]; sprintf(mapKey, "%s:%d", node->host, node->port); req_resp_pool.connectionMap.erase(mapKey); -LoggerFactory::getLogger()->debug("close_connect"); +// LoggerFactory::getLogger()->debug("close_connect"); } int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { - int i, recv_size, connfd; + int i, n, recv_size, connfd; net_node_t *node; void *recv_buf; - + struct timeval timeout = {5, 0}; int n_conn_suc = 0, n_recv_suc = 0; net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); - init_req_rep_req_resp_pool(); + //init_req_rep_req_resp_pool(); for (i = 0; i< arrlen; i++) { @@ -134,23 +139,17 @@ ret_arr[n_recv_suc].content = recv_buf; ret_arr[n_recv_suc].content_length = recv_size; n_recv_suc++; + continue; } if( (connfd = connect(node)) < 0 ) { continue; } - - // if(write_request(connfd, node->key, send_buf, send_size) != 0) { - // close_connect(connfd); - // } - n_conn_suc++; - // optval = 0; - // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); } -printf("n_conn_suc =%d\n", n_conn_suc); +// printf("n_conn_suc =%d\n", n_conn_suc); while(n_recv_suc < n_conn_suc) { @@ -158,8 +157,13 @@ req_resp_pool.ready_read_set = req_resp_pool.read_set; req_resp_pool.ready_write_set = req_resp_pool.write_set; req_resp_pool.ready_except_set = req_resp_pool.except_set; - req_resp_pool.nready = select(req_resp_pool.maxfd + 1, &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, &req_resp_pool.ready_except_set, NULL); -printf("req_resp_pool.nready =%d\n", req_resp_pool.nready); + if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1, + &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, + &req_resp_pool.ready_except_set, &timeout)) <= 0) { + // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂� + break; + } +// printf("req_resp_pool.nready =%d\n", req_resp_pool.nready); for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) { if ( (connfd = req_resp_pool.connfd[i]) > 0 ) { /* If the descriptor is ready, echo a text line from it */ @@ -167,17 +171,22 @@ if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set)) { req_resp_pool.nready--; - if(read_response(connfd, ret_arr+n_recv_suc) == 0) { + if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) { + + // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅 + FD_CLR(connfd, &req_resp_pool.read_set); + req_resp_pool.connfd[i] = -1; n_recv_suc++; - } else { + } else if(n == -1) { close_connect(connfd); } } + if (FD_ISSET(connfd, &req_resp_pool.ready_write_set)) { req_resp_pool.nready--; -printf("write %d\n", connfd); +// printf("write %d\n", connfd); if(write_request(connfd, node->key, send_buf, send_size) != 0) { close_connect(connfd); } else{ @@ -195,6 +204,15 @@ } } + FD_ZERO(&req_resp_pool.except_set); + for (i = 0; i <= req_resp_pool.maxi; i++) { + if ( (connfd = req_resp_pool.connfd[i]) > 0 ) { + // 鍏抽棴骞舵竻闄ゅ啓鍏ユ垨璇诲彇澶辫触鐨勮繛鎺� + close_connect(connfd); + } + } + req_resp_pool.maxi = -1; + *recv_arr = ret_arr; if(recv_arr_size != NULL) { *recv_arr_size = n_recv_suc; @@ -205,21 +223,25 @@ int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) { net_mod_request_head_t request_head = {}; - static char *buf; - static int buf_size, max_buf_size; - - + int buf_size; + char *buf; + int max_buf_size; + buf = (char *)malloc(MAXBUF); if(buf == NULL) { - buf = (char *)malloc(MAXBUF); + LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc"); + exit(1); + } else { max_buf_size = MAXBUF; - LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc"); - } buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH; if(max_buf_size < buf_size) { buf = (char *)realloc(buf, buf_size); max_buf_size = buf_size; + if(buf == NULL) { + LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request realloc"); + exit(1); + } } request_head.mod = REQ_REP; @@ -234,13 +256,18 @@ if(rio_writen(clientfd, buf, buf_size) != buf_size ) { - LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen"); - + LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen"); + free(buf); return -1; } + free(buf); return 0; } +/** + * @return 0 鎴愬姛, 1 瀵规柟娌℃湁瀵瑰簲鐨刱ey, -1 缃戠粶閿欒 + * + */ int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) { int recv_size; void *recv_buf; @@ -255,10 +282,14 @@ } response_head = NetModSocket::decode_response_head(response_head_bs); + if(response_head.code != 0) { + // 瀵规柟娌℃湁瀵瑰簲鐨刱ey + return 1; + } recv_buf = malloc(response_head.content_length); if(recv_buf == NULL) { - LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc"); + LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf"); exit(1); } if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) { @@ -351,6 +382,9 @@ } response_head = NetModSocket::decode_response_head(response_head_bs); + if(response_head.code != 0) { + continue; + } recv_buf = malloc(response_head.content_length); if(recv_buf == NULL) { @@ -452,7 +486,7 @@ free(buf); return nsuc; } - + void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { for(int i =0; i< size; i++) { @@ -488,13 +522,15 @@ void * NetModSocket::encode_response_head(net_mod_response_head_t & response) { char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH); - PUT(head, htonl(response.content_length)); + PUT(head, htonl(response.code)); + PUT(head + 4, htonl(response.content_length)); return head; } net_mod_response_head_t NetModSocket::decode_response_head(void *_headbs) { char *headbs = (char *)_headbs; net_mod_response_head_t head; - head.content_length = ntohl(GET(headbs)); + head.code = ntohl(GET(headbs)); + head.content_length = ntohl(GET(headbs + 4)); return head; } diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h index a68c8cf..c133302 100644 --- a/src/socket/net_mod_socket.h +++ b/src/socket/net_mod_socket.h @@ -7,8 +7,7 @@ #define GET(p) (*(uint32_t *)(p)) #define PUT(p, val) (*(uint32_t *)(p) = (val)) -#define NET_MODE_REQUEST_HEAD_LENGTH 16 -#define NET_MODE_RESPONSE_HEAD_LENGTH 4 + @@ -21,6 +20,7 @@ int key; }; +#define NET_MODE_REQUEST_HEAD_LENGTH 16 struct net_mod_request_head_t { uint32_t mod; @@ -29,9 +29,12 @@ uint32_t topic_length; }; +#define NET_MODE_RESPONSE_HEAD_LENGTH 8 + struct net_mod_response_head_t { // socket_mod_t mod; // int key; + uint32_t code; uint32_t content_length; }; @@ -72,6 +75,8 @@ ShmModSocket shmModSocket; pool req_resp_pool; + + static void * encode_request_head(net_mod_request_head_t & request); static net_mod_request_head_t decode_request_head(void *headbs); @@ -97,11 +102,18 @@ * @send_buf 鍙戦�佺殑娑堟伅锛孈send_size 璇ユ秷鎭綋鐨勯暱搴� * @recv_arr 杩斿洖鐨勫簲绛旀秷鎭粍锛孈recv_arr_size 璇ユ暟缁勯暱搴� * @return 鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� + * 浼樼偣锛氭棤闃诲锛屾�ц兘濂� + * 缂虹偣锛氫笉鏄嚎绋嬪畨鍏ㄧ殑 */ int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); - + /** + * 鍔熻兘鍚宻endandrecv + * 浼樼偣锛氱嚎绋嬪畨鍏� + * 缂虹偣锛氶樆濉炵殑锛屾�ц兘涓嶅sendandrecv + * + */ int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c index bd60993..e890721 100644 --- a/src/socket/shm_mod_socket.c +++ b/src/socket/shm_mod_socket.c @@ -182,6 +182,18 @@ return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); } +int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); +} +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); +} +int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); +} + + /** * 鍚姩bus diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index 3365804..b6dadee 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -104,6 +104,11 @@ int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; + int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; + // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 + int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; + int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; + /** * 鍚姩bus diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index ab34bca..f07d8a6 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -386,6 +386,27 @@ return -1; } +int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, + const int send_size, const int send_port, void **recv_buf, + int *recv_size, struct timespec *timeout, int flags) { + if (socket->socket_type != SHM_SOCKET_DGRAM) { + err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " + "which is not a SHM_SOCKET_DGRAM socket ", + socket->socket_type); + } + int recv_port; + int rv; + + + if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) { + rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags); + return rv; + } else { + return rv; + } + return -1; +} + // ============================================================================================================ /** diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index e38fd0e..4f1efc2 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -100,6 +100,13 @@ int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec * timeout = NULL, int flags=0); +/** + * 鍔熻兘鍚宻hm_sendandrecv, 浣嗘槸涓嶆槸绾跨▼瀹夊叏鐨� + */ +int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, + struct timespec * timeout = NULL, int flags=0); + + #endif \ No newline at end of file diff --git a/test_net_socket/net_mod_socket.c b/test_net_socket/net_mod_socket.c index 9f1bfa1..a763c5e 100644 --- a/test_net_socket/net_mod_socket.c +++ b/test_net_socket/net_mod_socket.c @@ -4,9 +4,17 @@ #include "dgram_mod_socket.h" #include "usg_common.h" +typedef struct Targ { + int port; + int id; + +}Targ; + void server(int port) { void *serverSocket = net_mod_server_socket_open(port); - net_mod_server_socket_start(serverSocket); + if(net_mod_server_socket_start(serverSocket) != 0) { + err_exit(errno, "net_mod_server_socket_start"); + } } void client(int port ){ @@ -14,13 +22,15 @@ char content[MAXLINE]; char action[512]; char topic[512]; - net_mod_recv_msg_t *recv_arr; + int recv_arr_size, i, n; int node_arr_size = 3; + + net_mod_recv_msg_t *recv_arr; //192.168.20.104 net_node_t node_arr[] = { {"192.168.5.22", port, 11}, - {"192.168.20.10", port, 11}, + {"192.168.20.104", port, 21}, {"192.168.20.104", port, 11} }; @@ -76,6 +86,90 @@ } + + +#define SCALE 100000 + +void *runclient(void *arg) { + Targ *targ = (Targ *)arg; + int port = targ->port; + char sendbuf[512]; + + int i,j, n, recv_arr_size; + net_mod_recv_msg_t *recv_arr; + + int node_arr_size = 1; + //192.168.20.104 + net_node_t node_arr[] = { + {NULL, port, 11} + }; + + void * client = net_mod_socket_open(); + + char filename[512]; + sprintf(filename, "test%d.tmp", targ->id); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + // fp = stdout; + + int recvsize; + void *recvbuf; + for (i = 0; i < SCALE; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + fprintf(fp, "requst:%s\n", sendbuf); + n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + //printf("send %d nodes\n", n); + for(j=0; j < recv_arr_size; j++) { + fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", + recv_arr[j].host, + recv_arr[j].port, + recv_arr[j].key, + recv_arr[j].content + ); + } + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } + fclose(fp); + net_mod_socket_close(client); + return (void *)i; +} + +void mclient(int port) { + + int status, i = 0, processors = 4; + void *res[processors]; + // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + Targ targs[processors]; + pthread_t tids[processors]; + char sendbuf[512]; + struct timeval start, end; + long total = 0; + + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].port = port; + targs[i].id = i; + pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + total += (long)res[i]; + //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffusec = difftime - diffsec*1000000; + fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); + // fflush(stdout); +} int main(int argc, char *argv[]) { shm_init(512); @@ -95,6 +189,9 @@ if (strcmp("client", argv[1]) == 0) client(port); + + if (strcmp("mclient", argv[1]) == 0) + mclient(port); } -- Gitblit v1.8.0