From 203df24a403a8c0cd8e93d0f33eaf10de2788969 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 10:22:46 +0800
Subject: [PATCH] add desub

---
 src/socket/dmod_socket.c |  100 ++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 77 insertions(+), 23 deletions(-)

diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c
index c1907ee..586e49f 100644
--- a/src/socket/dmod_socket.c
+++ b/src/socket/dmod_socket.c
@@ -135,15 +135,7 @@
 	// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
 	return 0;
 }
-/**
- * @port 鎬荤嚎绔彛
- */
-int  DModSocket::_sub_( void *topic, int size, int port,  
-	struct timespec *timeout, int flags) {
-	char buf[8192];
-	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
-	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
-}
+
 /**
  * 璁㈤槄鎸囧畾涓婚
  * @topic 涓婚
@@ -162,19 +154,25 @@
 }
 
 
+
 /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
  * @port 鎬荤嚎绔彛
  */
-int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,  
-	struct timespec *timeout, int flags) {
-	int head_len;
-	char buf[8192+content_size];
-	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
-	head_len = strlen(buf);
-	memcpy(buf+head_len, content, content_size);
-	return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
-
+int  DModSocket::desub( void *topic, int size, int port){
+	return _desub_( topic, size, port, NULL, 0);
 }
+// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
+int  DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
+	return _desub_(topic, size, port, timeout, 0);
+}
+int  DModSocket::desub_nowait(void *topic, int size, int port) {
+	return _desub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
+}
+
+
 
 /**
  * 鍙戝竷涓婚
@@ -203,9 +201,41 @@
 
 
 
-// ========================================================
+// =============================================================================
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_sub_( void *topic, int size, int port,  
+	struct timespec *timeout, int flags) {
+	char buf[8192];
+	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
 
 
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_desub_( void *topic, int size, int port,  
+	struct timespec *timeout, int flags) {
+	char buf[8192];
+	snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+}
+
+/**
+ * @port 鎬荤嚎绔彛
+ */
+int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,  
+	struct timespec *timeout, int flags) {
+	int head_len;
+	char buf[8192+content_size];
+	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+	head_len = strlen(buf);
+	memcpy(buf+head_len, content, content_size);
+	return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
+
+}
 /*
  * 澶勭悊璁㈤槄
 */
@@ -223,6 +253,22 @@
 		topic_sub_map->insert({topic, subscripter_set});
 	}
 	subscripter_set->insert(port);
+}
+
+/*
+ * 澶勭悊鍙栨秷璁㈤槄
+*/
+void DModSocket::_proxy_desub( char *topic, int port) {
+	SHMKeySet *subscripter_set;
+
+	SHMTopicSubMap::iterator map_iter;
+	// SHMKeySet::iterator set_iter;
+
+	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+		subscripter_set = map_iter->second;
+		subscripter_set->erase(port);
+	}
+ 
 }
 
 /*
@@ -281,15 +327,23 @@
 		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 			if(strcmp(action, "sub") == 0) {
 				// 璁㈤槄鏀寔澶氫富棰樿闃�
-				topic = trim(strtok(topics, topic_delim), NULL);
+				topic = strtok(topics, topic_delim);
 			  while(topic) {
-	       _proxy_sub( topic, port);
-	        topic = trim(strtok(NULL, topic_delim), NULL);
+	       _proxy_sub(trim(topic, 0), port);
+	        topic =  strtok(NULL, topic_delim);
+			  }
+
+			} else if(strcmp(action, "desub") == 0) {
+				// 璁㈤槄鏀寔澶氫富棰樿闃�
+				topic = strtok(topics, topic_delim);
+			  while(topic) {
+	       _proxy_desub(trim(topic, 0), port);
+	        topic =  strtok(NULL, topic_delim);
 			  }
 
 			} else if(strcmp(action, "pub") == 0) {
 				_proxy_pub(topics, head_len, buf, size, port);
-			}
+			}  
 			
 			free(action);
 			free(topics);

--
Gitblit v1.8.0