From 203df24a403a8c0cd8e93d0f33eaf10de2788969 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 10:22:46 +0800
Subject: [PATCH] add desub

---
 test_socket/dgram_mod_bus.c           |   14 ++++
 src/socket/dgram_mod_socket.c         |   23 +++++++
 src/socket/include/dmod_socket.h      |   12 ++++
 src/socket/dmod_socket.c              |  100 +++++++++++++++++++++++++-------
 src/socket/include/dgram_mod_socket.h |   16 ++++-
 5 files changed, 137 insertions(+), 28 deletions(-)

diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index e733ff6..ff3f31f 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -144,6 +144,29 @@
 
 
 /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+int  dgram_mod_desub(void * _socket, void *topic, int size, int port){
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	return socket->m_socket->desub(topic,  size,  port);
+}
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int  dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	struct timespec timeout = {sec, nsec};
+	return socket->m_socket->desub_timeout(topic,  size,  port, &timeout);
+}
+int  dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	return socket->m_socket->desub_nowait(topic,  size,  port);
+}
+
+
+
+/**
  * 鍙戝竷涓婚
  * @topic 涓婚
  * @content 涓婚鍐呭
diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c
index c1907ee..586e49f 100644
--- a/src/socket/dmod_socket.c
+++ b/src/socket/dmod_socket.c
@@ -135,15 +135,7 @@
 	// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
 	return 0;
 }
-/**
- * @port 鎬荤嚎绔彛
- */
-int  DModSocket::_sub_( void *topic, int size, int port,  
-	struct timespec *timeout, int flags) {
-	char buf[8192];
-	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
-	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
-}
+
 /**
  * 璁㈤槄鎸囧畾涓婚
  * @topic 涓婚
@@ -162,19 +154,25 @@
 }
 
 
+
 /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
  * @port 鎬荤嚎绔彛
  */
-int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,  
-	struct timespec *timeout, int flags) {
-	int head_len;
-	char buf[8192+content_size];
-	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
-	head_len = strlen(buf);
-	memcpy(buf+head_len, content, content_size);
-	return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
-
+int  DModSocket::desub( void *topic, int size, int port){
+	return _desub_( topic, size, port, NULL, 0);
 }
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int  DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
+	return _desub_(topic, size, port, timeout, 0);
+}
+int  DModSocket::desub_nowait(void *topic, int size, int port) {
+	return _desub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
+}
+
+
 
 /**
  * 鍙戝竷涓婚
@@ -203,9 +201,41 @@
 
 
 
-// ========================================================
+// =============================================================================
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_sub_( void *topic, int size, int port,  
+	struct timespec *timeout, int flags) {
+	char buf[8192];
+	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
 
 
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_desub_( void *topic, int size, int port,  
+	struct timespec *timeout, int flags) {
+	char buf[8192];
+	snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
+
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,  
+	struct timespec *timeout, int flags) {
+	int head_len;
+	char buf[8192+content_size];
+	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	head_len = strlen(buf);
+	memcpy(buf+head_len, content, content_size);
+	return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
+
+}
 /*
  * 澶勭悊璁㈤槄
 */
@@ -223,6 +253,22 @@
 		topic_sub_map->insert({topic, subscripter_set});
 	}
 	subscripter_set->insert(port);
+}
+
+/*
+ * 澶勭悊鍙栨秷璁㈤槄
+*/
+void DModSocket::_proxy_desub( char *topic, int port) {
+	SHMKeySet *subscripter_set;
+
+	SHMTopicSubMap::iterator map_iter;
+	// SHMKeySet::iterator set_iter;
+
+	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+		subscripter_set = map_iter->second;
+		subscripter_set->erase(port);
+	}
+ 
 }
 
 /*
@@ -281,15 +327,23 @@
 		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 			if(strcmp(action, "sub") == 0) {
 				// 璁㈤槄鏀寔澶氫富棰樿闃�
-				topic = trim(strtok(topics, topic_delim), NULL);
+				topic = strtok(topics, topic_delim);
 			  while(topic) {
-	       _proxy_sub( topic, port);
-	        topic = trim(strtok(NULL, topic_delim), NULL);
+	       _proxy_sub(trim(topic, 0), port);
+	        topic =  strtok(NULL, topic_delim);
+			  }
+
+			} else if(strcmp(action, "desub") == 0) {
+				// 璁㈤槄鏀寔澶氫富棰樿闃�
+				topic = strtok(topics, topic_delim);
+			  while(topic) {
+	       _proxy_desub(trim(topic, 0), port);
+	        topic =  strtok(NULL, topic_delim);
 			  }
 
 			} else if(strcmp(action, "pub") == 0) {
 				_proxy_pub(topics, head_len, buf, size, port);
-			}
+			}  
 			
 			free(action);
 			free(topics);
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index 1670def..a43c197 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -56,10 +56,10 @@
  * @port 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ;
-int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, int sec, int nsec) ;
+int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
 
 
 /**
@@ -81,6 +81,16 @@
 int  dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port);
 
 
+/**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
+ * @port 鎬荤嚎绔彛
+ */
+int  dgram_mod_desub(void * _socket, void *topic, int size, int port);
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int  dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
+int  dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port);
 
 /**
  * 鍙戝竷涓婚
diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h
index 5e97e2a..0998b1d 100644
--- a/src/socket/include/dmod_socket.h
+++ b/src/socket/include/dmod_socket.h
@@ -48,6 +48,8 @@
 	int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags);
 	int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
 
+	void _proxy_desub( char *topic, int port);
+	int  _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
 	static void  foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb) ;
 public:
 	DModSocket();
@@ -116,6 +118,16 @@
 	int  sub_nowait(void *topic, int size, int port);
 
 
+	 /**
+	 * 鍙栨秷璁㈤槄鎸囧畾涓婚
+	 * @topic 涓婚
+	 * @size 涓婚闀垮害
+	 * @port 鎬荤嚎绔彛
+	 */
+	int desub( void *topic, int size, int port);
+	// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+	int desub_timeout(void *topic, int size, int port, struct timespec *timeout);
+	int desub_nowait(void *topic, int size, int port) ;
 
 	/**
 	 * 鍙戝竷涓婚
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index 472ad46..5a549bc 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -49,7 +49,7 @@
   long i = 0;
   while (true) {
     //printf("Usage: pub <topic> [content] or sub <topic>\n");
-    printf("Can I help you? sub, pub or quit\n");
+    printf("Can I help you? sub, pub, desub or quit\n");
     scanf("%s",action);
     
     if(strcmp(action, "sub") == 0) {
@@ -59,6 +59,16 @@
          printf("Sub success!\n");
       } else {
         printf("Sub failture!\n");
+        exit(0);
+      }
+     
+    } else if(strcmp(action, "desub") == 0) {
+      printf("Please input topic!\n");
+      scanf("%s", topic);
+      if (dgram_mod_desub(socket, topic, strlen(topic),  port) == 0) {
+         printf("Desub success!\n");
+      } else {
+        printf("Desub failture!\n");
         exit(0);
       }
      
@@ -76,7 +86,7 @@
     } else if(strcmp(action, "quit") == 0) {
       break;
     } else {
-      printf("error input\n");
+      printf("error input argument\n");
       continue;
     }
    

--
Gitblit v1.8.0