From 72b7aebb0022f8e391c999348763acd5f7a16133 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 26 十一月 2020 18:56:33 +0800 Subject: [PATCH] update --- src/socket/shm_socket.c | 120 ++++++++++++++++++++++++++++++------------------------------ 1 files changed, 60 insertions(+), 60 deletions(-) diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index f07d8a6..3041568 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -8,7 +8,7 @@ void print_msg(char *head, shm_msg_t &msg) { - // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); + // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); } static void *_server_run_msg_rev(void *_socket); @@ -20,15 +20,15 @@ static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); static inline int _shm_socket_check_key(shm_socket_t *socket) { - void *tmp_ptr = mm_get_by_key(socket->port); + void *tmp_ptr = mm_get_by_key(socket->key); if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { - err_exit(0, "key %d has already been in used!", socket->port); + err_exit(0, "key %d has already been in used!", socket->key); return 0; } return 1; } -SHMQueue<shm_msg_t> *_attach_remote_queue(int port); +SHMQueue<shm_msg_t> *_attach_remote_queue(int key); @@ -39,7 +39,7 @@ shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->socket_type = socket_type; - socket->port = -1; + socket->key = -1; socket->force_bind = false; socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; @@ -65,14 +65,14 @@ return ret; } -int shm_socket_bind(shm_socket_t *socket, int port) { - socket->port = port; +int shm_socket_bind(shm_socket_t *socket, int key) { + socket->key = key; return 0; } -int shm_socket_force_bind(shm_socket_t *socket, int port) { +int shm_socket_force_bind(shm_socket_t *socket, int key) { socket->force_bind = true; - socket->port = port; + socket->key = key; return 0; } @@ -83,17 +83,17 @@ "SHM_SOCKET_STREAM socket"); } - int port; + int key; hashtable_t *hashtable = mm_get_hashtable(); - if (socket->port == -1) { - port = hashtable_alloc_key(hashtable); - socket->port = port; + if (socket->key == -1) { + key = hashtable_alloc_key(hashtable); + socket->key = key; } else { _shm_socket_check_key(socket); } - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); socket->clientSocketMap = new std::map<int, shm_socket_t *>; socket->status = SHM_CONN_LISTEN; @@ -113,25 +113,25 @@ "SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); - int client_port; + int client_key; shm_socket_t *client_socket; shm_msg_t src; if (socket->acceptQueue->pop(src)) { // print_msg("===accept:", src); - client_port = src.port; + client_key = src.key; // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket = shm_open_socket(socket->socket_type); - client_socket->port = socket->port; + client_socket->key = socket->key; // client_socket->queue= socket->queue; //鍒濆鍖栨秷鎭痲ueue client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); //杩炴帴鍒板鏂筿ueue - client_socket->remoteQueue = _attach_remote_queue(client_port); + client_socket->remoteQueue = _attach_remote_queue(client_key); - socket->clientSocketMap->insert({client_port, client_socket}); + socket->clientSocketMap->insert({client_key, client_socket}); /* * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� @@ -140,7 +140,7 @@ //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 struct timespec timeout = {1, 0}; shm_msg_t msg; - msg.port = socket->port; + msg.key = socket->key; msg.size = 0; msg.type = SHM_SOCKET_OPEN_REPLY; @@ -159,33 +159,33 @@ } -int shm_connect(shm_socket_t *socket, int port) { +int shm_connect(shm_socket_t *socket, int key) { if (socket->socket_type != SHM_SOCKET_STREAM) { err_exit(0, "can not invoke shm_connect method with a socket which is not " "a SHM_SOCKET_STREAM socket"); } hashtable_t *hashtable = mm_get_hashtable(); - if (hashtable_get(hashtable, port) == NULL) { - err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); + if (hashtable_get(hashtable, key) == NULL) { + err_exit(0, "shm_connect锛歝onnect at key %d failed!", key); } - if (socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); + if (socket->key == -1) { + socket->key = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); - if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) { - err_exit(0, "connect to %d failted", port); + if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) { + err_exit(0, "connect to %d failted", key); } socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); //鍙戦�乷pen璇锋眰 struct timespec timeout = {1, 0}; shm_msg_t msg; - msg.port = socket->port; + msg.key = socket->key; msg.size = 0; msg.type = SHM_SOCKET_OPEN; socket->remoteQueue->push_timeout(msg, &timeout); @@ -220,7 +220,7 @@ // } shm_msg_t dest; dest.type = SHM_COMMON_MSG; - dest.port = socket->port; + dest.key = socket->key; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); @@ -256,7 +256,7 @@ // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *socket, const void *buf, const int size, - const int port, const struct timespec *timeout, const int flags) { + const int key, const struct timespec *timeout, const int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " "not a SHM_SOCKET_DGRAM socket ", @@ -266,31 +266,31 @@ SemUtil::dec(socket->mutex); if (socket->queue == NULL) { - if (socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); + if (socket->key == -1) { + socket->key = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } SemUtil::inc(socket->mutex); - if (port == socket->port) { + if (key == socket->key) { err_msg(0, "can not send to your self!"); return -1; } SHMQueue<shm_msg_t> *remoteQueue; - if ((remoteQueue = _attach_remote_queue(port)) == NULL) { + if ((remoteQueue = _attach_remote_queue(key)) == NULL) { err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!"); return SHM_SOCKET_ECONNFAILED; } shm_msg_t dest; dest.type = SHM_COMMON_MSG; - dest.port = socket->port; + dest.key = socket->key; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); @@ -312,13 +312,13 @@ } else { delete remoteQueue; mm_free(dest.buf); - err_msg(errno, "sendto port %d failed!", port); + err_msg(errno, "sendto key %d failed!", key); return -1; } } // 鐭繛鎺ユ柟寮忔帴鍙� -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " "is not a SHM_SOCKET_DGRAM socket ", @@ -327,14 +327,14 @@ hashtable_t *hashtable = mm_get_hashtable(); SemUtil::dec(socket->mutex); if (socket->queue == NULL) { - if (socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); + if (socket->key == -1) { + socket->key = hashtable_alloc_key(hashtable); } else { _shm_socket_check_key(socket); } - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16); } SemUtil::inc(socket->mutex); @@ -354,7 +354,7 @@ memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; - *port = src.port; + *key = src.key; mm_free(src.buf); // printf("shm_recvfrom pop after\n"); return 0; @@ -364,19 +364,19 @@ } int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, - const int send_size, const int send_port, void **recv_buf, + const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " "which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); } - int recv_port; + int recv_key; int rv; shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); - if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags)) == 0) { - rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags); + if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { + rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); shm_close_socket(tmp_socket); return rv; } else { @@ -387,19 +387,19 @@ } int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, - const int send_size, const int send_port, void **recv_buf, + const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " "which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); } - int recv_port; + int recv_key; int rv; - if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) { - rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags); + if ((rv = shm_sendto(socket, send_buf, send_size, send_key, timeout, flags)) == 0) { + rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_key, timeout, flags); return rv; } else { return rv; @@ -412,21 +412,21 @@ /** * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� */ -SHMQueue<shm_msg_t> *_attach_remote_queue(int port) { +SHMQueue<shm_msg_t> *_attach_remote_queue(int key) { hashtable_t *hashtable = mm_get_hashtable(); - if (hashtable_get(hashtable, port) == NULL) { - err_msg(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); + if (hashtable_get(hashtable, key) == NULL) { + err_msg(0, "_remote_queue_attach锛歝onnet at key %d failed!", key); return NULL; } - SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); + SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(key, 0); return queue; } -void _server_close_conn_to_client(shm_socket_t *socket, int port) { +void _server_close_conn_to_client(shm_socket_t *socket, int key) { shm_socket_t *client_socket; std::map<int, shm_socket_t *>::iterator iter = - socket->clientSocketMap->find(port); + socket->clientSocketMap->find(key); if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; free((void *)client_socket); @@ -452,11 +452,11 @@ socket->acceptQueue->push_timeout(src, &timeout); break; case SHM_SOCKET_CLOSE: - _server_close_conn_to_client(socket, src.port); + _server_close_conn_to_client(socket, src.key); break; case SHM_COMMON_MSG: - iter = socket->clientSocketMap->find(src.port); + iter = socket->clientSocketMap->find(src.key); if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; // print_msg("_server_run_msg_rev push before", src); @@ -511,7 +511,7 @@ struct timespec timeout = {1, 0}; shm_msg_t close_msg; - close_msg.port = socket->port; + close_msg.key = socket->key; close_msg.size = 0; close_msg.type = SHM_SOCKET_CLOSE; if (notifyRemote && socket->remoteQueue != NULL) { -- Gitblit v1.8.0