| | |
| | | # debug "make --just-print" |
| | | DIRS = src test_net_socket |
| | | DIRS = src test_net_socket test_socket |
| | | TAR_NAME = shm_queue.tar.gz |
| | | |
| | | all: |
| | |
| | | #include "net_mod_socket_io.h" |
| | | #include "net_mod_socket.h" |
| | | |
| | | NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256) |
| | | NetModServerSocket::NetModServerSocket(int _port): listenfd(0), port(_port), max_buf(1024), max_topic_buf(256), max_response_buf(1024) |
| | | { |
| | | char portstr[32]; |
| | | |
| | | //shmModSocket = new ShmModSocket; |
| | | sprintf(portstr, "%d", port); |
| | | listenfd = Open_listenfd(portstr); |
| | | init_pool(listenfd); |
| | | |
| | | |
| | | buf = malloc(max_buf); |
| | | if(buf == NULL) { |
| | | err_exit(errno, "process_client malloc"); |
| | | err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); |
| | | } |
| | | |
| | | topic_buf = malloc(max_topic_buf); |
| | | if(topic_buf == NULL) { |
| | | err_exit(errno, "process_client malloc"); |
| | | err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); |
| | | } |
| | | |
| | | response_buf = (char *) malloc(max_response_buf); |
| | | if(response_buf == NULL) { |
| | | err_exit(errno, "NetModServerSocket::NetModServerSocket malloc"); |
| | | } |
| | | } |
| | | |
| | | |
| | | NetModServerSocket::~NetModServerSocket() { |
| | | Close(listenfd); |
| | | free(buf); |
| | | if(listenfd != 0) { |
| | | Close(listenfd); |
| | | } |
| | | |
| | | if(buf != NULL) |
| | | free(buf); |
| | | if(topic_buf != NULL) |
| | | free(topic_buf); |
| | | if(response_buf != NULL) |
| | | free(response_buf); |
| | | } |
| | | |
| | | void NetModServerSocket::start() { |
| | | int NetModServerSocket::start() { |
| | | int connfd; |
| | | socklen_t clientlen; |
| | | struct sockaddr_storage clientaddr; |
| | | char portstr[32]; |
| | | |
| | | //shmModSocket = new ShmModSocket; |
| | | sprintf(portstr, "%d", port); |
| | | listenfd = open_listenfd(portstr); |
| | | if(listenfd < 0) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start"); |
| | | return -1; |
| | | } |
| | | init_pool(listenfd); |
| | | |
| | | while (1) |
| | | { |
| | | /* Wait for listening/connected descriptor(s) to become ready */ |
| | |
| | | /* Echo a text line from each ready connected descriptor */ |
| | | check_clients(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | void NetModServerSocket::init_pool(int listenfd) |
| | |
| | | net_mod_response_head_t response_head; |
| | | char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH]; |
| | | void *recv_buf; |
| | | char tmp[8196]; |
| | | int recv_size; |
| | | // char tmp[8196]; |
| | | int recv_size, response_buf_size; |
| | | |
| | | if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) |
| | | { |
| | |
| | | } |
| | | |
| | | if(request_head.mod == REQ_REP) { |
| | | // TODO: shmModSocket.sendandrecv_unsafe |
| | | shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size); |
| | | response_head.content_length = recv_size; |
| | | Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH); |
| | | Rio_writen(connfd, recv_buf, recv_size); |
| | | if(shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size) != 0) { |
| | | response_head.code = 1; |
| | | response_head.content_length = 0; |
| | | if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH ) |
| | | return -1; |
| | | //Rio_writen(connfd, recv_buf, recv_size); |
| | | } else { |
| | | response_head.code = 0; |
| | | response_head.content_length = recv_size; |
| | | |
| | | response_buf_size = NET_MODE_RESPONSE_HEAD_LENGTH + recv_size; |
| | | if(max_response_buf < response_buf_size) { |
| | | buf = (char *)realloc(response_buf, response_buf_size); |
| | | max_response_buf = response_buf_size; |
| | | } |
| | | memcpy(response_buf, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH); |
| | | memcpy(response_buf + NET_MODE_RESPONSE_HEAD_LENGTH, recv_buf, recv_size); |
| | | |
| | | if(rio_writen(connfd, response_buf, response_buf_size) != response_buf_size) { |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | return 0; |
| | | |
| | | |
| | | } else if(request_head.mod == BUS) { |
| | | if(request_head.topic_length > max_topic_buf) { |
| | | topic_buf = realloc(topic_buf, request_head.topic_length); |
| | |
| | | if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) { |
| | | return -1; |
| | | } |
| | | LoggerFactory::getLogger()->debug("====server pub %s===\n", buf); |
| | | LoggerFactory::getLogger()->debug("====server pub %s===\n", buf); |
| | | shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key); |
| | | } |
| | | |
| | |
| | | |
| | | private: |
| | | int listenfd; |
| | | int port; |
| | | ShmModSocket shmModSocket; |
| | | pool pool; |
| | | |
| | | void *buf; |
| | | void *topic_buf; |
| | | char *response_buf; |
| | | |
| | | size_t max_buf; |
| | | size_t max_topic_buf; |
| | | size_t max_response_buf; |
| | | |
| | | |
| | | void init_pool(int listenfd); |
| | | void add_client(int connfd); |
| | |
| | | |
| | | /* |
| | | * 启动 server |
| | | * @return 0 success, 其他 failture |
| | | */ |
| | | void start(); |
| | | int start(); |
| | | ~NetModServerSocket(); |
| | | |
| | | }; |
| | |
| | | |
| | | } |
| | | |
| | | void net_mod_server_socket_start(void *_sockt) { |
| | | int net_mod_server_socket_start(void *_sockt) { |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; |
| | | sockt->sockt->start(); |
| | | return sockt->sockt->start(); |
| | | } |
| | |
| | | /** |
| | | * 启动 |
| | | */ |
| | | void net_mod_server_socket_start(void *_sockt); |
| | | int net_mod_server_socket_start(void *_sockt); |
| | | |
| | | |
| | | |
| | |
| | | NetModSocket::NetModSocket() |
| | | { |
| | | init_req_rep_req_resp_pool(); |
| | | |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | Close(clientfd); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | net_node_t *node = req_resp_pool.connfdNodeMap.find(connfd)->second; |
| | | |
| | | // std::map<std::string, int>::iterator mapIter; |
| | | Close(connfd); //line:conc:echoservers:closeconnfd |
| | | if(close(connfd) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::close_connect"); |
| | | } |
| | | FD_CLR(connfd, &req_resp_pool.read_set); |
| | | FD_CLR(connfd, &req_resp_pool.write_set); |
| | | FD_CLR(connfd, &req_resp_pool.except_set); |
| | |
| | | // char portstr[32]; |
| | | sprintf(mapKey, "%s:%d", node->host, node->port); |
| | | req_resp_pool.connectionMap.erase(mapKey); |
| | | LoggerFactory::getLogger()->debug("close_connect"); |
| | | // LoggerFactory::getLogger()->debug("close_connect"); |
| | | } |
| | | |
| | | |
| | | int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | |
| | | int i, recv_size, connfd; |
| | | int i, n, recv_size, connfd; |
| | | net_node_t *node; |
| | | void *recv_buf; |
| | | |
| | | struct timeval timeout = {5, 0}; |
| | | |
| | | int n_conn_suc = 0, n_recv_suc = 0; |
| | | |
| | | net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); |
| | | |
| | | init_req_rep_req_resp_pool(); |
| | | //init_req_rep_req_resp_pool(); |
| | | |
| | | for (i = 0; i< arrlen; i++) { |
| | | |
| | |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | continue; |
| | | } |
| | | |
| | | if( (connfd = connect(node)) < 0 ) { |
| | | continue; |
| | | } |
| | | |
| | | |
| | | // if(write_request(connfd, node->key, send_buf, send_size) != 0) { |
| | | // close_connect(connfd); |
| | | // } |
| | | |
| | | n_conn_suc++; |
| | | // optval = 0; |
| | | // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); |
| | | } |
| | | |
| | | printf("n_conn_suc =%d\n", n_conn_suc); |
| | | // printf("n_conn_suc =%d\n", n_conn_suc); |
| | | |
| | | while(n_recv_suc < n_conn_suc) |
| | | { |
| | |
| | | req_resp_pool.ready_read_set = req_resp_pool.read_set; |
| | | req_resp_pool.ready_write_set = req_resp_pool.write_set; |
| | | req_resp_pool.ready_except_set = req_resp_pool.except_set; |
| | | req_resp_pool.nready = select(req_resp_pool.maxfd + 1, &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, &req_resp_pool.ready_except_set, NULL); |
| | | printf("req_resp_pool.nready =%d\n", req_resp_pool.nready); |
| | | if( (req_resp_pool.nready = select(req_resp_pool.maxfd + 1, |
| | | &req_resp_pool.ready_read_set, &req_resp_pool.ready_write_set, |
| | | &req_resp_pool.ready_except_set, &timeout)) <= 0) { |
| | | // wirite_set 和 read_set 在指定时间内都没准备好 |
| | | break; |
| | | } |
| | | // printf("req_resp_pool.nready =%d\n", req_resp_pool.nready); |
| | | for (i = 0; (i <= req_resp_pool.maxi) && (req_resp_pool.nready > 0); i++) { |
| | | if ( (connfd = req_resp_pool.connfd[i]) > 0 ) { |
| | | /* If the descriptor is ready, echo a text line from it */ |
| | |
| | | if ( FD_ISSET(connfd, &req_resp_pool.ready_read_set)) |
| | | { |
| | | req_resp_pool.nready--; |
| | | if(read_response(connfd, ret_arr+n_recv_suc) == 0) { |
| | | if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) { |
| | | |
| | | // 成功收到返回消息,清空读入位 |
| | | FD_CLR(connfd, &req_resp_pool.read_set); |
| | | req_resp_pool.connfd[i] = -1; |
| | | n_recv_suc++; |
| | | } else { |
| | | } else if(n == -1) { |
| | | close_connect(connfd); |
| | | } |
| | | |
| | | } |
| | | |
| | | if (FD_ISSET(connfd, &req_resp_pool.ready_write_set)) |
| | | { |
| | | req_resp_pool.nready--; |
| | | printf("write %d\n", connfd); |
| | | // printf("write %d\n", connfd); |
| | | if(write_request(connfd, node->key, send_buf, send_size) != 0) { |
| | | close_connect(connfd); |
| | | } else{ |
| | |
| | | } |
| | | } |
| | | |
| | | FD_ZERO(&req_resp_pool.except_set); |
| | | for (i = 0; i <= req_resp_pool.maxi; i++) { |
| | | if ( (connfd = req_resp_pool.connfd[i]) > 0 ) { |
| | | // 关闭并清除写入或读取失败的连接 |
| | | close_connect(connfd); |
| | | } |
| | | } |
| | | req_resp_pool.maxi = -1; |
| | | |
| | | *recv_arr = ret_arr; |
| | | if(recv_arr_size != NULL) { |
| | | *recv_arr_size = n_recv_suc; |
| | |
| | | |
| | | int NetModSocket::write_request(int clientfd, int key, void *send_buf, int send_size) { |
| | | net_mod_request_head_t request_head = {}; |
| | | static char *buf; |
| | | static int buf_size, max_buf_size; |
| | | |
| | | |
| | | int buf_size; |
| | | char *buf; |
| | | int max_buf_size; |
| | | buf = (char *)malloc(MAXBUF); |
| | | if(buf == NULL) { |
| | | buf = (char *)malloc(MAXBUF); |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::NetModSocket malloc"); |
| | | exit(1); |
| | | } else { |
| | | max_buf_size = MAXBUF; |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc"); |
| | | |
| | | } |
| | | |
| | | buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH; |
| | | if(max_buf_size < buf_size) { |
| | | buf = (char *)realloc(buf, buf_size); |
| | | max_buf_size = buf_size; |
| | | if(buf == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request realloc"); |
| | | exit(1); |
| | | } |
| | | } |
| | | |
| | | request_head.mod = REQ_REP; |
| | |
| | | |
| | | |
| | | if(rio_writen(clientfd, buf, buf_size) != buf_size ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen"); |
| | | |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::write_request rio_writen"); |
| | | free(buf); |
| | | return -1; |
| | | } |
| | | free(buf); |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * @return 0 成功, 1 对方没有对应的key, -1 网络错误 |
| | | * |
| | | */ |
| | | int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) { |
| | | int recv_size; |
| | | void *recv_buf; |
| | |
| | | } |
| | | |
| | | response_head = NetModSocket::decode_response_head(response_head_bs); |
| | | if(response_head.code != 0) { |
| | | // 对方没有对应的key |
| | | return 1; |
| | | } |
| | | |
| | | recv_buf = malloc(response_head.content_length); |
| | | if(recv_buf == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc"); |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc recv_buf"); |
| | | exit(1); |
| | | } |
| | | if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) != response_head.content_length) { |
| | |
| | | } |
| | | |
| | | response_head = NetModSocket::decode_response_head(response_head_bs); |
| | | if(response_head.code != 0) { |
| | | continue; |
| | | } |
| | | |
| | | recv_buf = malloc(response_head.content_length); |
| | | if(recv_buf == NULL) { |
| | |
| | | free(buf); |
| | | return nsuc; |
| | | } |
| | | |
| | | |
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | | for(int i =0; i< size; i++) { |
| | |
| | | |
| | | void * NetModSocket::encode_response_head(net_mod_response_head_t & response) { |
| | | char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH); |
| | | PUT(head, htonl(response.content_length)); |
| | | PUT(head, htonl(response.code)); |
| | | PUT(head + 4, htonl(response.content_length)); |
| | | return head; |
| | | } |
| | | |
| | | net_mod_response_head_t NetModSocket::decode_response_head(void *_headbs) { |
| | | char *headbs = (char *)_headbs; |
| | | net_mod_response_head_t head; |
| | | head.content_length = ntohl(GET(headbs)); |
| | | head.code = ntohl(GET(headbs)); |
| | | head.content_length = ntohl(GET(headbs + 4)); |
| | | return head; |
| | | } |
| | |
| | | #define GET(p) (*(uint32_t *)(p)) |
| | | #define PUT(p, val) (*(uint32_t *)(p) = (val)) |
| | | |
| | | #define NET_MODE_REQUEST_HEAD_LENGTH 16 |
| | | #define NET_MODE_RESPONSE_HEAD_LENGTH 4 |
| | | |
| | | |
| | | |
| | | |
| | |
| | | int key; |
| | | }; |
| | | |
| | | #define NET_MODE_REQUEST_HEAD_LENGTH 16 |
| | | |
| | | struct net_mod_request_head_t { |
| | | uint32_t mod; |
| | |
| | | uint32_t topic_length; |
| | | }; |
| | | |
| | | #define NET_MODE_RESPONSE_HEAD_LENGTH 8 |
| | | |
| | | struct net_mod_response_head_t { |
| | | // socket_mod_t mod; |
| | | // int key; |
| | | uint32_t code; |
| | | uint32_t content_length; |
| | | }; |
| | | |
| | |
| | | |
| | | ShmModSocket shmModSocket; |
| | | pool req_resp_pool; |
| | | |
| | | |
| | | |
| | | static void * encode_request_head(net_mod_request_head_t & request); |
| | | static net_mod_request_head_t decode_request_head(void *headbs); |
| | |
| | | * @send_buf 发送的消息,@send_size 该消息体的长度 |
| | | * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度 |
| | | * @return 成功发送的节点的个数 |
| | | * 优点:无阻塞,性能好 |
| | | * 缺点:不是线程安全的 |
| | | */ |
| | | int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | |
| | | |
| | | /** |
| | | * 功能同sendandrecv |
| | | * 优点:线程安全 |
| | | * 缺点:阻塞的,性能不如sendandrecv |
| | | * |
| | | */ |
| | | int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | |
| | |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 启动bus |
| | |
| | | int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; |
| | | int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; |
| | | int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | /** |
| | | * 启动bus |
| | |
| | | return -1; |
| | | } |
| | | |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_port, void **recv_buf, |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | socket->socket_type); |
| | | } |
| | | int recv_port; |
| | | int rv; |
| | | |
| | | |
| | | if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags); |
| | | return rv; |
| | | } else { |
| | | return rv; |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | // ============================================================================================================ |
| | | |
| | | /** |
| | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, |
| | | struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | /** |
| | | * 功能同shm_sendandrecv, 但是不是线程安全的 |
| | | */ |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, |
| | | struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | |
| | | |
| | | |
| | | #endif |
| | |
| | | #include "dgram_mod_socket.h" |
| | | #include "usg_common.h" |
| | | |
| | | typedef struct Targ { |
| | | int port; |
| | | int id; |
| | | |
| | | }Targ; |
| | | |
| | | void server(int port) { |
| | | void *serverSocket = net_mod_server_socket_open(port); |
| | | net_mod_server_socket_start(serverSocket); |
| | | if(net_mod_server_socket_start(serverSocket) != 0) { |
| | | err_exit(errno, "net_mod_server_socket_start"); |
| | | } |
| | | } |
| | | |
| | | void client(int port ){ |
| | |
| | | char content[MAXLINE]; |
| | | char action[512]; |
| | | char topic[512]; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | |
| | | int recv_arr_size, i, n; |
| | | int node_arr_size = 3; |
| | | |
| | | net_mod_recv_msg_t *recv_arr; |
| | | //192.168.20.104 |
| | | net_node_t node_arr[] = { |
| | | {"192.168.5.22", port, 11}, |
| | | {"192.168.20.10", port, 11}, |
| | | {"192.168.20.104", port, 21}, |
| | | {"192.168.20.104", port, 11} |
| | | }; |
| | | |
| | |
| | | |
| | | |
| | | } |
| | | |
| | | |
| | | #define SCALE 100000 |
| | | |
| | | void *runclient(void *arg) { |
| | | Targ *targ = (Targ *)arg; |
| | | int port = targ->port; |
| | | char sendbuf[512]; |
| | | |
| | | int i,j, n, recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | |
| | | int node_arr_size = 1; |
| | | //192.168.20.104 |
| | | net_node_t node_arr[] = { |
| | | {NULL, port, 11} |
| | | }; |
| | | |
| | | void * client = net_mod_socket_open(); |
| | | |
| | | char filename[512]; |
| | | sprintf(filename, "test%d.tmp", targ->id); |
| | | FILE *fp = NULL; |
| | | fp = fopen(filename, "w+"); |
| | | // fp = stdout; |
| | | |
| | | int recvsize; |
| | | void *recvbuf; |
| | | for (i = 0; i < SCALE; i++) { |
| | | sprintf(sendbuf, "thread(%d) %d", targ->id, i); |
| | | fprintf(fp, "requst:%s\n", sendbuf); |
| | | n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | //printf("send %d nodes\n", n); |
| | | for(j=0; j < recv_arr_size; j++) { |
| | | fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", |
| | | recv_arr[j].host, |
| | | recv_arr[j].port, |
| | | recv_arr[j].key, |
| | | recv_arr[j].content |
| | | ); |
| | | } |
| | | // 使用完后,不要忘记释放掉 |
| | | net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | } |
| | | fclose(fp); |
| | | net_mod_socket_close(client); |
| | | return (void *)i; |
| | | } |
| | | |
| | | void mclient(int port) { |
| | | |
| | | int status, i = 0, processors = 4; |
| | | void *res[processors]; |
| | | // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); |
| | | Targ targs[processors]; |
| | | pthread_t tids[processors]; |
| | | char sendbuf[512]; |
| | | struct timeval start, end; |
| | | long total = 0; |
| | | |
| | | gettimeofday(&start, NULL); |
| | | for (i = 0; i < processors; i++) { |
| | | targs[i].port = port; |
| | | targs[i].id = i; |
| | | pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); |
| | | } |
| | | |
| | | for (i = 0; i < processors; i++) { |
| | | if (pthread_join(tids[i], &res[i]) != 0) { |
| | | perror("multyThreadClient pthread_join"); |
| | | } else { |
| | | total += (long)res[i]; |
| | | //fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); |
| | | } |
| | | } |
| | | |
| | | gettimeofday(&end, NULL); |
| | | |
| | | double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); |
| | | long diffsec = (long) (difftime/1000000); |
| | | long diffusec = difftime - diffsec*1000000; |
| | | fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total ); |
| | | // fflush(stdout); |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | |
| | | if (strcmp("mclient", argv[1]) == 0) |
| | | mclient(port); |
| | | } |
| | | |
| | | |