From 9e6ceaad059b2aec84df92c8750f6d87eab708c2 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 16 七月 2020 20:46:31 +0800 Subject: [PATCH] udpate --- queue/socket.c | 89 +++++++++++++++++++++++++------------------- 1 files changed, 50 insertions(+), 39 deletions(-) diff --git a/queue/socket.c b/queue/socket.c index 1f3ff24..a52d532 100644 --- a/queue/socket.c +++ b/queue/socket.c @@ -37,29 +37,37 @@ } -int shm_close_socket(shm_socket_t *socket) { +int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 struct timespec timeout = {1, 0}; shm_msg_t close_msg; + close_msg.port = socket->port; close_msg.size = 0; close_msg.type=SHM_SOCKET_CLOSE; - if(socket->remoteQueue != NULL) { + if(notifyRemote && socket->remoteQueue != NULL) { socket->remoteQueue->push_timeout(close_msg, &timeout); } - - - if(socket->queue != NULL) + if(socket->queue != NULL) { delete socket->queue; - if(socket->remoteQueue != NULL) + socket->queue = NULL; + } + + if(socket->remoteQueue != NULL) { delete socket->remoteQueue; + socket->remoteQueue = NULL; + } - if(socket->messageQueue != NULL) + if(socket->messageQueue != NULL) { delete socket->messageQueue; + socket->messageQueue = NULL; + } - if(socket->acceptQueue != NULL) + if(socket->acceptQueue != NULL) { delete socket->acceptQueue; + socket->acceptQueue = NULL; + } if(socket->clientSocketMap != NULL) { shm_socket_t *client_socket; @@ -68,7 +76,9 @@ client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; + client_socket->remoteQueue=NULL; delete client_socket->messageQueue; + client_socket->messageQueue=NULL; socket->clientSocketMap->erase(iter); free((void *)client_socket); } @@ -78,13 +88,16 @@ if(socket->dispatch_thread != 0) pthread_cancel(socket->dispatch_thread); - - free(socket); return 0; } + +int shm_close_socket(shm_socket_t *socket) { + return _shm_close_socket(socket, true); +} + int shm_bind(shm_socket_t * socket, int port) { @@ -119,12 +132,19 @@ shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { - client_socket= iter->second; - delete client_socket->remoteQueue; - delete client_socket->messageQueue; + // client_socket= iter->second; + // if(client_socket->remoteQueue != NULL) { + // delete client_socket->remoteQueue; + // client_socket->remoteQueue = NULL; + // } + // if(client_socket->messageQueue != NULL) { + // delete client_socket->messageQueue; + // client_socket->messageQueue = NULL; + // } + socket->clientSocketMap->erase(iter); } - free((void *)client_socket); + //free((void *)client_socket); } @@ -139,7 +159,6 @@ shm_socket_t *client_socket; std::map<int, shm_socket_t* >::iterator iter; while(socket->queue->pop(src)) { -print_msg("=====_server_run_msg_rev:", src); switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); @@ -148,11 +167,9 @@ _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : -err_msg(0, "===_server_run_msg_rev 1"); iter = socket->clientSocketMap->find(src.port); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; -err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue); client_socket->messageQueue->push_timeout(src, &timeout); } @@ -178,7 +195,7 @@ if (socket->acceptQueue->pop(src) ) { -print_msg("===accept:", src); +// print_msg("===accept:", src); client_port = src.port; client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); client_socket->port = socket->port; @@ -228,20 +245,8 @@ } void _client_close_conn_to_server(shm_socket_t* socket) { - if(socket->queue != NULL) - delete socket->queue; - if(socket->remoteQueue != NULL) - delete socket->remoteQueue; - - if(socket->messageQueue != NULL) - delete socket->messageQueue; - - if(socket->acceptQueue != NULL) - delete socket->acceptQueue; - - if(socket->dispatch_thread != 0) - pthread_cancel(socket->dispatch_thread); - + + _shm_close_socket(socket, false); } @@ -283,24 +288,30 @@ dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); - - socket->remoteQueue->push(dest); - return 0; + if(socket->remoteQueue->push(dest)) { + return 0; + } else { + err_msg(errno, "connection has been closed!"); + return -1; + } + + } int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; -err_msg(0, "====shm_recv socket ==%p", socket); - bool rv = socket->messageQueue->pop(src); - if (rv) { + if (socket->messageQueue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); + return 0; + } else { + return -1; } - return 0; + } -- Gitblit v1.8.0