From b63ce299ddacea2ad487dc635926ed52ff422c20 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 03 八月 2020 11:40:17 +0800 Subject: [PATCH] add timeout nowait --- src/socket/include/shm_socket.h | 21 ++- src/socket/shm_socket.c | 38 +++++-- test_socket/test_timeout.c | 63 ++++++++++++ test_socket/dgram_mod_bus.c | 17 ++ src/socket/dgram_mod_socket.c | 106 ++++++++++++++++++--- src/util/sem_util.c | 8 src/queue/include/shm_queue_wrapper.h | 4 src/socket/include/dgram_mod_socket.h | 19 +++ test_socket/Makefile | 2 9 files changed, 229 insertions(+), 49 deletions(-) diff --git a/src/queue/include/shm_queue_wrapper.h b/src/queue/include/shm_queue_wrapper.h index 984bd5a..64e2404 100644 --- a/src/queue/include/shm_queue_wrapper.h +++ b/src/queue/include/shm_queue_wrapper.h @@ -55,7 +55,7 @@ int shmqueue_push(void * _shmqueue, void *src, int size); /** - * 鍏ラ槦, 闃熷垪婊℃椂绔嬪嵆杩斿洖. + * 鍏ラ槦, 绔嬪埢杩斿洖 * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 */ int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; @@ -75,7 +75,7 @@ int shmqueue_pop(void * _shmqueue, void **dest, int *size); /** - * 鍑洪槦, 闃熷垪绌烘椂绔嬪嵆杩斿洖 + * 鍑洪槦, 绔嬪埢杩斿洖 * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 */ int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c index 4c79df2..5efc9ca 100644 --- a/src/socket/dgram_mod_socket.c +++ b/src/socket/dgram_mod_socket.c @@ -83,43 +83,78 @@ return shm_socket_force_bind(socket->shm_socket, port); } + + int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return shm_sendto(socket->shm_socket, buf, size, port); + return shm_sendto(socket->shm_socket, buf, size, port, NULL, 0); } -int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { +int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + struct timespec timeout = {sec, nsec}; + return shm_sendto(socket->shm_socket, buf, size, port, &timeout, 0); + +} + +int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return shm_sendto(socket->shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); + +} + +static inline int _dgram_mod_recvfrom_(void *_socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; if(socket->mod == BUS) { err_exit(0, "Can not use method recvfrom in a Bus"); } // printf("dgram_mod_recvfrom before\n"); - int rv = shm_recvfrom(socket->shm_socket, buf, size, port); + int rv = shm_recvfrom(socket->shm_socket, buf, size, port, timeout, flags); // printf("dgram_mod_recvfrom after\n"); return rv; +} + +int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { + return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, 0); +} + +int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec) { + struct timespec timeout = {sec, nsec}; + return _dgram_mod_recvfrom_(_socket, buf, size, port, &timeout, 0); +} + +int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port) { + return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); } -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) { +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, + void **recv_buf, int *recv_size) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size); + return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); } - - -int dgram_mod_get_port(void * _socket) { +int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, + void **recv_buf, int *recv_size, int sec, int nsec) { + struct timespec timeout = {sec, nsec}; dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->shm_socket->port; + return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, &timeout, 0); + +} + +int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, + void **recv_buf, int *recv_size) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); + } -void dgram_mod_free(void *buf) { - free(buf); -} +// =================bus======================== int dgram_mod_start_bus(void * _socket) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; @@ -136,17 +171,32 @@ /** * @port 鎬荤嚎绔彛 */ -int dgram_mod_sub(void * _socket, void *topic, int size, int port) { +static int _dgram_mod_sub_(void * _socket, void *topic, int size, int port, + struct timespec *timeout, int flags) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; char buf[8192]; snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); - return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port); + return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port, timeout, flags); +} + +int dgram_mod_sub(void * _socket, void *topic, int size, int port ) { + return _dgram_mod_sub_(_socket, topic, size, port, NULL, 0); +} + +int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec) { + struct timespec timeout = {sec, nsec}; + return _dgram_mod_sub_(_socket, topic, size, port, &timeout, 0); +} + +int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port) { + return _dgram_mod_sub_(_socket, topic, size, port, NULL, (int)SHM_MSG_NOWAIT); } /** * @port 鎬荤嚎绔彛 */ -int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { +static int _dgram_mod_pub_(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, + struct timespec *timeout, int flags) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; int head_len; @@ -154,10 +204,34 @@ snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); head_len = strlen(buf); memcpy(buf+head_len, content, content_size); - return shm_sendto(socket->shm_socket, buf, head_len+content_size, port); + return shm_sendto(socket->shm_socket, buf, head_len+content_size, port, timeout, flags); } +int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { + return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, 0); +} + +int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec) { + struct timespec timeout = {sec, nsec}; + return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, &timeout, 0); +} + +int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { + return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); +} + + + +int dgram_mod_get_port(void * _socket) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return socket->shm_socket->port; +} + + +void dgram_mod_free(void *buf) { + free(buf); +} //========================================================================================================================== diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h index 21d0d20..1670def 100644 --- a/src/socket/include/dgram_mod_socket.h +++ b/src/socket/include/dgram_mod_socket.h @@ -36,7 +36,10 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); - +// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec); +// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� +int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port); /** * 鎺ユ敹淇℃伅 @@ -44,6 +47,9 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); +// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec); +int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port); /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 @@ -51,6 +57,9 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ; +int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; /** @@ -67,6 +76,11 @@ * @port 鎬荤嚎绔彛 */ int dgram_mod_sub(void * _socket, void *topic, int size, int port); +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); +int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port); + + /** * 鍙戝竷涓婚 @@ -75,6 +89,9 @@ * @port 鎬荤嚎绔彛 */ int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec); +int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); /** diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h index 0ca4cfd..4852b27 100644 --- a/src/socket/include/shm_socket.h +++ b/src/socket/include/shm_socket.h @@ -5,9 +5,7 @@ #include "usg_typedef.h" #include "shm_queue.h" -#ifdef __cplusplus -extern "C" { -#endif + enum shm_msg_type_t { @@ -16,6 +14,12 @@ SHM_SOCKET_CLOSE = 3, SHM_COMMON_MSG = 4 +}; + +enum shm_socket_flag_t +{ + SHM_MSG_TIMEOUT = 1, + SHM_MSG_NOWAIT = 2 }; enum shm_socket_type_t @@ -77,17 +81,16 @@ int shm_send(shm_socket_t * socket, const void *buf, const int size) ; -int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size); int shm_recv(shm_socket_t * socket, void **buf, int *size) ; -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL); +int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0); -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec * timeout = NULL, int flags=0); + +int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, + struct timespec * timeout = NULL, int flags=0); -#ifdef __cplusplus -} -#endif #endif \ No newline at end of file diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 4a705ad..f95379a 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -5,17 +5,19 @@ static Logger logger = LoggerFactory::getLogger(); + + void print_msg(char *head, shm_msg_t &msg) { // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } -void *_server_run_msg_rev(void *_socket); +static void *_server_run_msg_rev(void *_socket); -void *_client_run_msg_rev(void *_socket); +static void *_client_run_msg_rev(void *_socket); -int _shm_close_dgram_socket(shm_socket_t *socket); +static int _shm_close_dgram_socket(shm_socket_t *socket); -int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); +static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); static inline int _shm_socket_check_key(shm_socket_t *socket) { void *tmp_ptr = mm_get_by_key(socket->port); @@ -239,9 +241,10 @@ } } + // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *socket, const void *buf, const int size, - const int port, const struct timespec *timeout) { + const int port, const struct timespec *timeout, const int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " "not a SHM_SOCKET_DGRAM socket ", @@ -273,12 +276,14 @@ SHMQueue<shm_msg_t> *remoteQueue; if ((remoteQueue = _attach_remote_queue(port)) == NULL) { - err_msg(0, "shm_sendto failed, then other end has been closed!"); + err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!"); return -1; } // printf("shm_sendto push before\n"); bool rv; - if(timeout != NULL) { + if(flags & SHM_MSG_NOWAIT != 0) { + rv = remoteQueue->push_nowait(dest); + } else if(timeout != NULL) { rv = remoteQueue->push_timeout(dest, timeout); } else { rv = remoteQueue->push(dest); @@ -296,7 +301,7 @@ } // 鐭繛鎺ユ柟寮忔帴鍙� -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port) { +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " "is not a SHM_SOCKET_DGRAM socket ", @@ -316,7 +321,16 @@ shm_msg_t src; // printf("shm_recvfrom pop before\n"); - if (socket->queue->pop(src)) { + bool rv; + if(flags & SHM_MSG_NOWAIT != 0) { + rv = socket->queue->pop_nowait(src); + } else if(timeout != NULL) { + rv = socket->queue->pop_timeout(src, timeout); + } else { + rv = socket->queue->pop(src); + } + + if (rv) { void *_buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; @@ -332,7 +346,7 @@ int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, - int *recv_size) { + int *recv_size, struct timespec *timeout, int flags) { if (socket->socket_type != SHM_SOCKET_DGRAM) { err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " "which is not a SHM_SOCKET_DGRAM socket ", @@ -342,8 +356,8 @@ int rv; shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); - if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) { - rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port); + if (shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags) == 0) { + rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags); shm_close_socket(tmp_socket); return rv; } diff --git a/src/util/sem_util.c b/src/util/sem_util.c index 1dcf00d..c73d1e1 100644 --- a/src/util/sem_util.c +++ b/src/util/sem_util.c @@ -72,7 +72,7 @@ /* Reserve semaphore - decrement it by 1 */ int SemUtil::dec(int semId) { -logger.debug("%d: SemUtil::dec\n", semId); +// logger.debug("%d: SemUtil::dec\n", semId); struct sembuf sops; sops.sem_num = 0; @@ -81,7 +81,7 @@ while (semop(semId, &sops, 1) == -1) if (errno != EINTR) { - err_msg(errno, "SemUtil::dec"); + // err_msg(errno, "SemUtil::dec"); return -1; } @@ -97,7 +97,7 @@ while (semop(semId, &sops, 1) == -1) if (errno != EINTR) { - err_msg(errno, "SemUtil::dec_nowait"); + // err_msg(errno, "SemUtil::dec_nowait"); return -1; } @@ -113,7 +113,7 @@ while (semtimedop(semId, &sops, 1, timeout) == -1) if (errno != EINTR) { - err_msg(errno, "SemUtil::dec_timeout"); + //err_msg(errno, "SemUtil::dec_timeout"); return -1; } diff --git a/test_socket/Makefile b/test_socket/Makefile index 0c155b1..a63b4f7 100644 --- a/test_socket/Makefile +++ b/test_socket/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = dgram_mod_bus dgram_mod_survey dgram_mod_req_rep +PROGS = dgram_mod_bus dgram_mod_survey dgram_mod_req_rep test_timeout build: $(PROGS) diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c index 70f09c4..472ad46 100644 --- a/test_socket/dgram_mod_bus.c +++ b/test_socket/dgram_mod_bus.c @@ -55,15 +55,24 @@ if(strcmp(action, "sub") == 0) { printf("Please input topic!\n"); scanf("%s", topic); - dgram_mod_sub(socket, topic, strlen(topic), port); - printf("Sub success!\n"); + if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) { + printf("Sub success!\n"); + } else { + printf("Sub failture!\n"); + exit(0); + } + } else if(strcmp(action, "pub") == 0) { // printf("%s %s %s\n", action, topic, content); printf("Please input topic and content\n"); scanf("%s %s", topic, content); - dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); - printf("Pub success!\n"); + if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){ + printf("Pub success!\n"); + } else { + printf("Pub failture!\n"); + } + } else if(strcmp(action, "quit") == 0) { break; } else { diff --git a/test_socket/test_timeout.c b/test_socket/test_timeout.c new file mode 100644 index 0000000..e48f0b4 --- /dev/null +++ b/test_socket/test_timeout.c @@ -0,0 +1,63 @@ +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" + +void server(int port) { + void *socket = dgram_mod_open_socket(); + dgram_mod_bind(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + if ( (rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 5, 0) ) == 0) { + printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); + free(recvbuf); + } else { + printf("RECEIVED failture, timeout\n"); + } + sleep(100); + dgram_mod_close_socket(socket); +} + +void client(int port) { + void *socket = dgram_mod_open_socket(); + int size; + char sendbuf[512]; + long i = 0; + while (true) { + sprintf(sendbuf, "%d", i); + + if(dgram_mod_sendto_timeout(socket, sendbuf, strlen(sendbuf) + 1, port, 2, 0) == 0) { + printf("SEND HEART:%s\n", sendbuf); + printf("send success\n"); + } else { + printf("send failture, timeout\n"); + } + i++; + } + dgram_mod_close_socket(socket); +} + + + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + + + return 0; +} \ No newline at end of file -- Gitblit v1.8.0