From 91ea20d03ebb5a8d20150d3ecc28a13c51ce93f1 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 19:17:06 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 55 +++++++++++++++++++++++++------------------------------ 1 files changed, 25 insertions(+), 30 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 1b7721d..e370c72 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -30,8 +30,8 @@ static inline int _shm_socket_check_key(shm_socket_t *socket) { void *tmp_ptr = mm_get_by_key(socket->key); if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { - bus_errno = ESHM_BUS_KEY_INUSED; - logger->error("%s. key = %d ", bus_strerror(ESHM_BUS_KEY_INUSED), socket->key); + bus_errno = EBUS_KEY_INUSED; + logger->error("%s. key = %d ", bus_strerror(EBUS_KEY_INUSED), socket->key); return 0; } return 1; @@ -133,8 +133,8 @@ } else { if(!_shm_socket_check_key(socket)) { - bus_errno = ESHM_BUS_KEY_INUSED; - return ESHM_BUS_KEY_INUSED; + bus_errno = EBUS_KEY_INUSED; + return EBUS_KEY_INUSED; } } @@ -224,8 +224,8 @@ socket->key = hashtable_alloc_key(hashtable); } else { if(!_shm_socket_check_key(socket)) { - bus_errno = ESHM_BUS_KEY_INUSED; - return ESHM_BUS_KEY_INUSED; + bus_errno = EBUS_KEY_INUSED; + return EBUS_KEY_INUSED; } } @@ -338,8 +338,8 @@ } else { if(!_shm_socket_check_key(socket)) { - bus_errno = ESHM_BUS_KEY_INUSED; - return ESHM_BUS_KEY_INUSED; + bus_errno = EBUS_KEY_INUSED; + return EBUS_KEY_INUSED; } } @@ -350,10 +350,13 @@ if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) err_exit(s, "shm_sendto : pthread_mutex_unlock"); - if (key == socket->key) { - logger->error( "can not send to your self!"); - return -1; - } + // There is some case where a socket need to send to himeself, for example when bus server need to stop, he need to send himself + // a top message. + + // if (key == socket->key) { + // logger->error( "can not send to your self!"); + // return -1; + // } SHMQueue<shm_msg_t> *remoteQueue; if ((remoteQueue = _attach_remote_queue(key)) == NULL) { @@ -370,9 +373,9 @@ memcpy(dest.buf, buf, size); - if(flags & SHM_MSG_NOWAIT != 0) { + if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { rv = remoteQueue->push_nowait(dest); - } else if(timeout != NULL) { + } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { rv = remoteQueue->push_timeout(dest, timeout); } else { rv = remoteQueue->push(dest); @@ -380,10 +383,8 @@ if (rv == 0) { // printf("shm_sendto push after\n"); - delete remoteQueue; return 0; } else { - delete remoteQueue; mm_free(dest.buf); if(rv > EBUS_BASE) { // bus_errno = EBUS_TIMEOUT; @@ -392,8 +393,6 @@ logger->error(rv, "sendto key %d failed", key); } return rv; - - } } @@ -419,8 +418,8 @@ } else { if(!_shm_socket_check_key(socket)) { - bus_errno = ESHM_BUS_KEY_INUSED; - return ESHM_BUS_KEY_INUSED; + bus_errno = EBUS_KEY_INUSED; + return EBUS_KEY_INUSED; } } @@ -432,9 +431,9 @@ shm_msg_t src; - if(flags & SHM_MSG_NOWAIT != 0) { + if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { rv = socket->queue->pop_nowait(src); - } else if(timeout != NULL) { + } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { rv = socket->queue->pop_timeout(src, timeout); // printf("0 shm_recvfrom====%d\n", rv); } else { @@ -648,7 +647,7 @@ switch (src.type) { case SHM_SOCKET_OPEN: - socket->acceptQueue->push_timeout(src, &timeout); + socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); break; case SHM_SOCKET_CLOSE: _server_close_conn_to_client(socket, src.key); @@ -659,7 +658,7 @@ if (iter != socket->clientSocketMap->end()) { client_socket = iter->second; // print_msg("_server_run_msg_rev push before", src); - client_socket->messageQueue->push_timeout(src, &timeout); + client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); // print_msg("_server_run_msg_rev push after", src); } @@ -694,7 +693,7 @@ _client_close_conn_to_server(socket); break; case SHM_COMMON_MSG: - socket->messageQueue->push_timeout(src, &timeout); + socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG); break; default: logger->error( "shm_socket._client_run_msg_rev: undefined message type."); @@ -722,10 +721,7 @@ socket->queue = NULL; } - if (socket->remoteQueue != NULL) { - delete socket->remoteQueue; - socket->remoteQueue = NULL; - } + if (socket->messageQueue != NULL) { delete socket->messageQueue; @@ -744,7 +740,6 @@ client_socket = iter->second; client_socket->remoteQueue->push_timeout(close_msg, &timeout); - delete client_socket->remoteQueue; client_socket->remoteQueue = NULL; delete client_socket->messageQueue; -- Gitblit v1.8.0