From 203df24a403a8c0cd8e93d0f33eaf10de2788969 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 10:22:46 +0800
Subject: [PATCH] add desub
---
test_socket/dgram_mod_bus.c | 14 ++++
src/socket/dgram_mod_socket.c | 23 +++++++
src/socket/include/dmod_socket.h | 12 ++++
src/socket/dmod_socket.c | 100 +++++++++++++++++++++++++-------
src/socket/include/dgram_mod_socket.h | 16 ++++-
5 files changed, 137 insertions(+), 28 deletions(-)
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index e733ff6..ff3f31f 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -144,6 +144,29 @@
/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+int dgram_mod_desub(void * _socket, void *topic, int size, int port){
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ return socket->m_socket->desub(topic, size, port);
+}
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ struct timespec timeout = {sec, nsec};
+ return socket->m_socket->desub_timeout(topic, size, port, &timeout);
+}
+int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ return socket->m_socket->desub_nowait(topic, size, port);
+}
+
+
+
+/**
* 鍙戝竷涓婚
* @topic 涓婚
* @content 涓婚鍐呭
diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c
index c1907ee..586e49f 100644
--- a/src/socket/dmod_socket.c
+++ b/src/socket/dmod_socket.c
@@ -135,15 +135,7 @@
// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
return 0;
}
-/**
- * @port 鎬荤嚎绔彛
- */
-int DModSocket::_sub_( void *topic, int size, int port,
- struct timespec *timeout, int flags) {
- char buf[8192];
- snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
- return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
-}
+
/**
* 璁㈤槄鎸囧畾涓婚
* @topic 涓婚
@@ -162,19 +154,25 @@
}
+
/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
-int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
- struct timespec *timeout, int flags) {
- int head_len;
- char buf[8192+content_size];
- snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
- head_len = strlen(buf);
- memcpy(buf+head_len, content, content_size);
- return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
-
+int DModSocket::desub( void *topic, int size, int port){
+ return _desub_( topic, size, port, NULL, 0);
}
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
+ return _desub_(topic, size, port, timeout, 0);
+}
+int DModSocket::desub_nowait(void *topic, int size, int port) {
+ return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT);
+}
+
+
/**
* 鍙戝竷涓婚
@@ -203,9 +201,41 @@
-// ========================================================
+// =============================================================================
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int DModSocket::_sub_( void *topic, int size, int port,
+ struct timespec *timeout, int flags) {
+ char buf[8192];
+ snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int DModSocket::_desub_( void *topic, int size, int port,
+ struct timespec *timeout, int flags) {
+ char buf[8192];
+ snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
+
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
+ struct timespec *timeout, int flags) {
+ int head_len;
+ char buf[8192+content_size];
+ snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ head_len = strlen(buf);
+ memcpy(buf+head_len, content, content_size);
+ return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
+
+}
/*
* 澶勭悊璁㈤槄
*/
@@ -223,6 +253,22 @@
topic_sub_map->insert({topic, subscripter_set});
}
subscripter_set->insert(port);
+}
+
+/*
+ * 澶勭悊鍙栨秷璁㈤槄
+*/
+void DModSocket::_proxy_desub( char *topic, int port) {
+ SHMKeySet *subscripter_set;
+
+ SHMTopicSubMap::iterator map_iter;
+ // SHMKeySet::iterator set_iter;
+
+ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+ subscripter_set = map_iter->second;
+ subscripter_set->erase(port);
+ }
+
}
/*
@@ -281,15 +327,23 @@
if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
- topic = trim(strtok(topics, topic_delim), NULL);
+ topic = strtok(topics, topic_delim);
while(topic) {
- _proxy_sub( topic, port);
- topic = trim(strtok(NULL, topic_delim), NULL);
+ _proxy_sub(trim(topic, 0), port);
+ topic = strtok(NULL, topic_delim);
+ }
+
+ } else if(strcmp(action, "desub") == 0) {
+ // 璁㈤槄鏀寔澶氫富棰樿闃�
+ topic = strtok(topics, topic_delim);
+ while(topic) {
+ _proxy_desub(trim(topic, 0), port);
+ topic = strtok(NULL, topic_delim);
}
} else if(strcmp(action, "pub") == 0) {
_proxy_pub(topics, head_len, buf, size, port);
- }
+ }
free(action);
free(topics);
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index 1670def..a43c197 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -56,10 +56,10 @@
* @port 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ;
-int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, int sec, int nsec) ;
+int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
/**
@@ -81,6 +81,16 @@
int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port);
+/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+int dgram_mod_desub(void * _socket, void *topic, int size, int port);
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
+int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port);
/**
* 鍙戝竷涓婚
diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h
index 5e97e2a..0998b1d 100644
--- a/src/socket/include/dmod_socket.h
+++ b/src/socket/include/dmod_socket.h
@@ -48,6 +48,8 @@
int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags);
int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
+ void _proxy_desub( char *topic, int port);
+ int _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) ;
public:
DModSocket();
@@ -116,6 +118,16 @@
int sub_nowait(void *topic, int size, int port);
+ /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+ int desub( void *topic, int size, int port);
+ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+ int desub_timeout(void *topic, int size, int port, struct timespec *timeout);
+ int desub_nowait(void *topic, int size, int port) ;
/**
* 鍙戝竷涓婚
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index 472ad46..5a549bc 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -49,7 +49,7 @@
long i = 0;
while (true) {
//printf("Usage: pub <topic> [content] or sub <topic>\n");
- printf("Can I help you? sub, pub or quit\n");
+ printf("Can I help you? sub, pub, desub or quit\n");
scanf("%s",action);
if(strcmp(action, "sub") == 0) {
@@ -59,6 +59,16 @@
printf("Sub success!\n");
} else {
printf("Sub failture!\n");
+ exit(0);
+ }
+
+ } else if(strcmp(action, "desub") == 0) {
+ printf("Please input topic!\n");
+ scanf("%s", topic);
+ if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) {
+ printf("Desub success!\n");
+ } else {
+ printf("Desub failture!\n");
exit(0);
}
@@ -76,7 +86,7 @@
} else if(strcmp(action, "quit") == 0) {
break;
} else {
- printf("error input\n");
+ printf("error input argument\n");
continue;
}
--
Gitblit v1.8.0