src/libshm_queue.a | 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/net_mod_server_socket.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/net_mod_server_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/net_mod_socket.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/net_mod_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_net_socket/net_mod_req_rep.sh | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/libshm_queue.a — Binary files differ
src/socket/net_mod_server_socket.c
@@ -4,7 +4,7 @@ #include "net_mod_socket_io.h" #include "net_mod_socket.h" NetModServerSocket::NetModServerSocket(int port) NetModServerSocket::NetModServerSocket(int port):max_buf(1024) { char portstr[32]; @@ -12,11 +12,17 @@ sprintf(portstr, "%d", port); listenfd = Open_listenfd(portstr); init_pool(listenfd); buf = malloc(max_buf); if(buf == NULL) { err_exit(errno, "process_client malloc"); } } NetModServerSocket::~NetModServerSocket() { Close(listenfd); fee(buf); } void NetModServerSocket::start() { @@ -88,11 +94,11 @@ int n; net_mod_request_head_t request_head; net_mod_response_head_t response_head; char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH]; void *recv_buf; static void *buf = NULL; int recv_size; static size_t max_buf = 1024; if(buf == NULL) { buf = malloc(max_buf); if(buf == NULL) { @@ -101,10 +107,12 @@ } if ((n = rio_readnb(rio, &request_head, sizeof(net_mod_request_head_t))) != sizeof(net_mod_request_head_t)) if (rio_readnb(rio, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { return -1; } request_head = NetModSocket::decode_request_head(request_head_bs); if(request_head.content_length > max_buf) { buf = realloc(buf, request_head.content_length); @@ -121,7 +129,7 @@ if(request_head.mod == REQ_REP) { shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size); response_head.content_length = recv_size; Rio_writen(connfd, &response_head, sizeof(response_head)); Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH); Rio_writen(connfd, recv_buf, recv_size); } src/socket/net_mod_server_socket.h
@@ -31,6 +31,9 @@ ShmModSocket shmModSocket; pool pool; void *buf = NULL; size_t max_buf; void init_pool(int listenfd); void add_client(int connfd); void check_clients(); src/socket/net_mod_socket.c
@@ -11,6 +11,10 @@ } NetModSocket::~NetModSocket() { } int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { @@ -20,6 +24,7 @@ char mapKey[256]; void *recv_buf; int recv_size; char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; net_mod_request_head_t request_head; net_mod_response_head_t response_head; std::map<std::string, rio_t*>::iterator mapIter; @@ -49,18 +54,21 @@ request_head.mod = REQ_REP; request_head.key = node->key; request_head.content_length = send_size; if( (n = rio_writen(rio->rio_fd, &request_head, sizeof(request_head))) != sizeof(request_head)) { if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { err_exit(errno, "NetModSocket::send head rio_writen"); } if( (n = rio_writen(rio->rio_fd, send_buf, send_size)) != send_size ) { if(rio_writen(rio->rio_fd, send_buf, send_size) != send_size ) { err_exit(errno, "NetModSocket::send conent rio_writen"); } if ((n = rio_readnb(rio, &response_head, sizeof(response_head))) != sizeof(response_head)) { if ( rio_readnb(rio, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) { err_exit(errno, "NetModSocket::send rio_readnb"); } response_head = NetModSocket::decode_response_head(response_head_bs); recv_buf = malloc(response_head.content_length); if(recv_buf == NULL) { @@ -99,7 +107,32 @@ // } NetModSocket::~NetModSocket() { void * NetModSocket::encode_request_head(net_mod_request_head_t & request) { char * head = (char *)malloc(NET_MODE_REQUEST_HEAD_LENGTH); PUT(head, htonl(request.mod)); PUT(head + 4, htonl(request.key)); PUT(head + 8, htonl(request.content_length)); return head; } net_mod_request_head_t NetModSocket::decode_request_head(void *_headbs) { char *headbs = (char *)_headbs; net_mod_request_head_t head; head.mod = ntohl(GET(headbs)); head.key = ntohl(GET(headbs + 4)); head.content_length = 
ntohl(GET(headbs + 8)); return head; } void * NetModSocket::encode_response_head(net_mod_response_head_t & response) { char * head = (char *)malloc(NET_MODE_RESPONSE_HEAD_LENGTH); PUT(head, htonl(response.content_length)); return head; } net_mod_response_head_t NetModSocket::decode_response_head(void *_headbs) { char *headbs = (char *)_headbs; net_mod_response_head_t head; head.content_length = ntohl(GET(headbs)); return head; } src/socket/net_mod_socket.h
@@ -4,6 +4,12 @@ #include "shm_mod_socket.h" #include "socket_io.h" #define GET(p) (*(uint32_t *)(p)) #define PUT(p, val) (*(uint32_t *)(p) = (val)) #define NET_MODE_REQUEST_HEAD_LENGTH 12 #define NET_MODE_RESPONSE_HEAD_LENGTH 4 struct net_node_t { const char *host; @@ -49,8 +55,15 @@ ~NetModSocket(); static void free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size); static void * encode_request_head(net_mod_request_head_t & request); static net_mod_request_head_t decode_request_head(void *headbs); static void * encode_response_head(net_mod_response_head_t & response); static net_mod_response_head_t decode_response_head(void *_headbs); }; #endif test_net_socket/net_mod_req_rep.sh
@@ -13,7 +13,7 @@ ./net_mod_req_rep client 5000 } function clean() { function close() { ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep" | awk '{print $2}' | xargs -i kill -9 {} ipcrm -a } @@ -25,11 +25,11 @@ "client") client ;; "clean") clean "close") close ;; "") clean close server client ;;