From 203df24a403a8c0cd8e93d0f33eaf10de2788969 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 06 八月 2020 10:22:46 +0800 Subject: [PATCH] add desub --- test_socket/dgram_mod_bus.c | 14 ++++ src/socket/dgram_mod_socket.c | 23 +++++++ src/socket/include/dmod_socket.h | 12 ++++ src/socket/dmod_socket.c | 100 +++++++++++++++++++++++++------- src/socket/include/dgram_mod_socket.h | 16 ++++- 5 files changed, 137 insertions(+), 28 deletions(-) diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c index e733ff6..ff3f31f 100644 --- a/src/socket/dgram_mod_socket.c +++ b/src/socket/dgram_mod_socket.c @@ -144,6 +144,29 @@ /** + * 鍙栨秷璁㈤槄鎸囧畾涓婚 + * @topic 涓婚 + * @size 涓婚闀垮害 + * @port 鎬荤嚎绔彛 + */ +int dgram_mod_desub(void * _socket, void *topic, int size, int port){ + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return socket->m_socket->desub(topic, size, port); +} +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){ + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + struct timespec timeout = {sec, nsec}; + return socket->m_socket->desub_timeout(topic, size, port, &timeout); +} +int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){ + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return socket->m_socket->desub_nowait(topic, size, port); +} + + + +/** * 鍙戝竷涓婚 * @topic 涓婚 * @content 涓婚鍐呭 diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c index c1907ee..586e49f 100644 --- a/src/socket/dmod_socket.c +++ b/src/socket/dmod_socket.c @@ -135,15 +135,7 @@ // pthread_create(&tid, NULL, run_accept_sub_request, _socket); return 0; } -/** - * @port 鎬荤嚎绔彛 - */ -int DModSocket::_sub_( void *topic, int size, int port, - struct timespec *timeout, int flags) { - char buf[8192]; - snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); - return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); -} + /** * 璁㈤槄鎸囧畾涓婚 * @topic 涓婚 @@ -162,19 +154,25 @@ } + /** + * 鍙栨秷璁㈤槄鎸囧畾涓婚 + * @topic 涓婚 + * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port, - struct timespec *timeout, int flags) { - int head_len; - char buf[8192+content_size]; - snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); - head_len = strlen(buf); - memcpy(buf+head_len, content, content_size); - return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); - +int DModSocket::desub( void *topic, int size, int port){ + return _desub_( topic, size, port, NULL, 0); } +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){ + return _desub_(topic, size, port, timeout, 0); +} +int DModSocket::desub_nowait(void *topic, int size, int port) { + return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); +} + + /** * 鍙戝竷涓婚 @@ -203,9 +201,41 @@ -// ======================================================== +// ============================================================================= +/** + * @port 鎬荤嚎绔彛 + */ +int DModSocket::_sub_( void *topic, int size, int port, + struct timespec *timeout, int flags) { + char buf[8192]; + snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); +} +/** + * @port 鎬荤嚎绔彛 + */ +int DModSocket::_desub_( void *topic, int size, int port, + struct timespec *timeout, int flags) { + char buf[8192]; + snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); +} + +/** + * @port 鎬荤嚎绔彛 + */ +int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port, + struct timespec *timeout, int flags) { + int head_len; + char buf[8192+content_size]; + snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + head_len = strlen(buf); + memcpy(buf+head_len, content, content_size); + return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); + +} /* * 澶勭悊璁㈤槄 */ @@ -223,6 +253,22 @@ topic_sub_map->insert({topic, subscripter_set}); } subscripter_set->insert(port); +} + +/* + * 澶勭悊鍙栨秷璁㈤槄 +*/ +void DModSocket::_proxy_desub( char *topic, int port) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + // SHMKeySet::iterator set_iter; + + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + subscripter_set->erase(port); + } + } /* @@ -281,15 +327,23 @@ if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� - topic = trim(strtok(topics, topic_delim), NULL); + topic = strtok(topics, topic_delim); while(topic) { - _proxy_sub( topic, port); - topic = trim(strtok(NULL, topic_delim), NULL); + _proxy_sub(trim(topic, 0), port); + topic = strtok(NULL, topic_delim); + } + + } else if(strcmp(action, "desub") == 0) { + // 璁㈤槄鏀寔澶氫富棰樿闃� + topic = strtok(topics, topic_delim); + while(topic) { + _proxy_desub(trim(topic, 0), port); + topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "pub") == 0) { _proxy_pub(topics, head_len, buf, size, port); - } + } free(action); free(topics); diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h index 1670def..a43c197 100644 --- a/src/socket/include/dgram_mod_socket.h +++ b/src/socket/include/dgram_mod_socket.h @@ -56,10 +56,10 @@ * @port 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ; // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ; -int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; +int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, int sec, int nsec) ; +int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ; /** @@ -81,6 +81,16 @@ int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port); +/** + * 鍙栨秷璁㈤槄鎸囧畾涓婚 + * @topic 涓婚 + * @size 涓婚闀垮害 + * @port 鎬荤嚎绔彛 + */ +int dgram_mod_desub(void * _socket, void *topic, int size, int port); +// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 +int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); +int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port); /** * 鍙戝竷涓婚 diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h index 5e97e2a..0998b1d 100644 --- a/src/socket/include/dmod_socket.h +++ b/src/socket/include/dmod_socket.h @@ -48,6 +48,8 @@ int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags); int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags); + void _proxy_desub( char *topic, int port); + int _desub_( void *topic, int size, int port, struct timespec *timeout, int flags); static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) ; public: DModSocket(); @@ -116,6 +118,16 @@ int sub_nowait(void *topic, int size, int port); + /** + * 鍙栨秷璁㈤槄鎸囧畾涓婚 + * @topic 涓婚 + * @size 涓婚闀垮害 + * @port 鎬荤嚎绔彛 + */ + int desub( void *topic, int size, int port); + // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 + int desub_timeout(void *topic, int size, int port, struct timespec *timeout); + int desub_nowait(void *topic, int size, int port) ; /** * 鍙戝竷涓婚 diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c index 472ad46..5a549bc 100644 --- a/test_socket/dgram_mod_bus.c +++ b/test_socket/dgram_mod_bus.c @@ -49,7 +49,7 @@ long i = 0; while (true) { //printf("Usage: pub <topic> [content] or sub <topic>\n"); - printf("Can I help you? sub, pub or quit\n"); + printf("Can I help you? sub, pub, desub or quit\n"); scanf("%s",action); if(strcmp(action, "sub") == 0) { @@ -59,6 +59,16 @@ printf("Sub success!\n"); } else { printf("Sub failture!\n"); + exit(0); + } + + } else if(strcmp(action, "desub") == 0) { + printf("Please input topic!\n"); + scanf("%s", topic); + if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) { + printf("Desub success!\n"); + } else { + printf("Desub failture!\n"); exit(0); } @@ -76,7 +86,7 @@ } else if(strcmp(action, "quit") == 0) { break; } else { - printf("error input\n"); + printf("error input argument\n"); continue; } -- Gitblit v1.8.0