From 03987ef3d1ed9c2d604561a69db169cd535014b6 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 25 七月 2020 16:17:04 +0800
Subject: [PATCH] commit

---
 test/test_set                       |    0 
 test/test_set.c                     |   37 +++++++++
 test/test_vector                    |    0 
 src/libshm_queue.a                  |    0 
 src/socket/shm_socket.c             |    1 
 lib/libusgcommon.a                  |    0 
 src/queue/include/shm_queue.h       |    4 
 src/socket/dgram_mod_socket.c       |  120 +++++++++++++++++++----------
 test/test_vector.c                  |   22 +++++
 test/strtok                         |    0 
 test_socket/dgram_mod_bus           |    0 
 lib/libusgcommon.so                 |    0 
 test_socket/dgram_mod_survey        |    0 
 test/Makefile                       |    2 
 test/strtok.c                       |   17 ++++
 src/queue/include/lock_free_queue.h |    4 
 16 files changed, 160 insertions(+), 47 deletions(-)

diff --git a/lib/libusgcommon.a b/lib/libusgcommon.a
index f03c816..d2e4ae1 100644
--- a/lib/libusgcommon.a
+++ b/lib/libusgcommon.a
Binary files differ
diff --git a/lib/libusgcommon.so b/lib/libusgcommon.so
index 1106ad9..8e10ddc 100644
--- a/lib/libusgcommon.so
+++ b/lib/libusgcommon.so
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 60be46c..685264f 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h
index 6ba2a00..25a3392 100644
--- a/src/queue/include/lock_free_queue.h
+++ b/src/queue/include/lock_free_queue.h
@@ -117,7 +117,7 @@
     /// @return true if the element was inserted in the queue. False if the queue was full
     bool push(const ELEM_T &a_data);
     bool push_nowait(const ELEM_T &a_data);
-    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+    bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
 
     /// @brief pop the element at the head of the queue
     /// @param a reference where the element in the head of the queue will be saved to
@@ -244,7 +244,7 @@
     typename ELEM_T, 
     typename Allocator,
     template <typename T, typename AT> class Q_TYPE>
-bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
 {
 
 
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 26bf019..91e9530 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -32,7 +32,7 @@
    
     inline bool push(const ELEM_T &a_data);
     inline bool push_nowait(const ELEM_T &a_data);
-    inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+    inline bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
     inline bool pop(ELEM_T &a_data);
     inline bool pop_nowait(ELEM_T &a_data);
     inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
@@ -146,7 +146,7 @@
 }
 
 template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
 {
 
     return queue->push_timeout(a_data, timeout);
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index c5e5a28..4856204 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -13,8 +13,8 @@
 #define TOPIC_LIDENTIFIER "{"
 #define TOPIC_RIDENTIFIER "}"
 
-static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
-void * run_accept_pubsub_request(void * _socket) ;
+
+static Logger logger = LoggerFactory::getLogger();
 
 typedef struct dgram_mod_socket_t {
 	socket_mod_t mod;
@@ -24,6 +24,8 @@
 	std::map<std::string, std::set<int> *> *topic_sub_map;
 } dgram_mod_socket_t;
 
+static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
+void * run_pubsub_proxy(dgram_mod_socket_t * socket) ;
 
 void *dgram_mod_open_socket() {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
@@ -96,7 +98,7 @@
 int start_bus(void * _socket) {
 	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
 	socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
-	run_accept_pubsub_request(_socket);
+	run_pubsub_proxy(socket);
 	// pthread_t tid;
 	// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
 	return 0;
@@ -131,60 +133,96 @@
 
 //==========================================================================================================================
 
-
-void * run_accept_pubsub_request(void * _socket) {
-	// pthread_detach(pthread_self());
-	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
-	int size;
-	int port;
-	char * action, *topic, *buf;
-
-	size_t head_len;
-	// void * send_buf;
-	int send_port;
-	struct timespec timeout = {1,0};
+/*
+ * 澶勭悊璁㈤槄
+*/
+void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
 	std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
 	std::set<int> *subscripter_set;
 
 	std::map<std::string, std::set<int> *>::iterator map_iter;
 	std::set<int>::iterator set_iter;
+
+	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+		subscripter_set = map_iter->second;
+	} else {
+		subscripter_set = new std::set<int>;
+		topic_sub_map->insert({topic, subscripter_set});
+	}
+	subscripter_set->insert(port);
+}
+
+/*
+ * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
+*/
+void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
+	std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
+	std::set<int> *subscripter_set;
+
+	std::map<std::string, std::set<int> *>::iterator map_iter;
+	std::set<int>::iterator set_iter;
+
+	std::vector<int> subscripter_to_del;
+	std::vector<int>::iterator vector_iter;
+
+	int send_port;
+	struct timespec timeout = {1,0};
+
+	if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+		subscripter_set = map_iter->second;
+		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+			send_port = *set_iter;
+// printf("run_accept_sub_request send before %d \n", send_port);
+			if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
+				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
+				subscripter_to_del.push_back(send_port);
+			}
+// printf("run_accept_sub_request send after: %d \n", send_port);
+			
+		}
+
+		// 鍒犻櫎宸插叧闂殑绔�
+		for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+			if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
+				subscripter_set->erase(set_iter);
+				printf("remove closed subscripter %d \n", send_port);
+			}
+		}
+		subscripter_to_del.clear();
+
+	}
+}
+
+void * run_pubsub_proxy(dgram_mod_socket_t * socket) {
+	// pthread_detach(pthread_self());
+	int size;
+	int port;
+	char * action, *topic, *topics, *buf;
+	size_t head_len;
+
+	const char *topic_delim = ",";
 //printf("server receive before\n");
 	while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
 //printf("server recv after: %s \n", buf);
-		if(parse_pubsub_topic(buf, size, &action, &topic, &head_len)) {
+		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 			if(strcmp(action, "sub") == 0) {
-				if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
-					subscripter_set = map_iter->second;
-				} else {
-					subscripter_set = new std::set<int>;
-					topic_sub_map->insert({topic, subscripter_set});
-				}
-				subscripter_set->insert(port);
+				// 璁㈤槄鏀寔澶氫富棰樿闃�
+				topic = trim(strtok(topics, topic_delim), NULL);
+			  while(topic) {
+	       _proxy_sub(socket, topic, port);
+	        topic = trim(strtok(NULL, topic_delim), NULL);
+			  }
 
 			} else if(strcmp(action, "pub") == 0) {
-				if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
-					subscripter_set = map_iter->second;
-					for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
-						send_port = *set_iter;
-						// send_buf = malloc(size-head_len);
-						// memcpy(send_buf, buf+head_len, );
-printf("run_accept_sub_request send before %d \n", send_port);
-						if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
-printf("erase %d \n", send_port);
-							subscripter_set->erase(set_iter);
-							set_iter++;
-						}
-printf("run_accept_sub_request send after: %d \n", send_port);
-						
-					}
-				}
+				_proxy_pub(socket, topics, head_len, buf, size, port);
 			}
-			free(buf);
+			
 			free(action);
-			free(topic);
+			free(topics);
 		} else {
 			err_msg(0, "incorrect format msg");
 		}
+		free(buf);
 	}
 	return NULL;
 }
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 9d54986..52251e6 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -483,7 +483,6 @@
 
       delete client_socket->messageQueue;
       client_socket->messageQueue = NULL;
-      socket->clientSocketMap->erase(iter);
       free((void *)client_socket);
     }
     delete socket->clientSocketMap;
diff --git a/test/Makefile b/test/Makefile
index 4b888f5..4bcb663 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -14,7 +14,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
  
-PROGS = protocle_parse test
+PROGS = protocle_parse strtok test_set test_vector
 
 build: $(PROGS)
 
diff --git a/test/strtok b/test/strtok
new file mode 100755
index 0000000..df7f33e
--- /dev/null
+++ b/test/strtok
Binary files differ
diff --git a/test/strtok.c b/test/strtok.c
new file mode 100644
index 0000000..364548e
--- /dev/null
+++ b/test/strtok.c
@@ -0,0 +1,17 @@
+
+
+#include <string.h>
+#include <stdio.h>
+#include "usg_common.h"
+ 
+int main(void)
+{
+    // char input[] = "A bird came down the walk";
+	 char input[] = "bird";
+    printf("Parsing the input string '%s'\n", input);
+    char *token = strtok(input, " ");
+    while(token) {
+        puts(token);
+        token = strtok(NULL, " ");
+    }
+ }
\ No newline at end of file
diff --git a/test/test_set b/test/test_set
new file mode 100755
index 0000000..c8b6db0
--- /dev/null
+++ b/test/test_set
Binary files differ
diff --git a/test/test_set.c b/test/test_set.c
new file mode 100644
index 0000000..f602902
--- /dev/null
+++ b/test/test_set.c
@@ -0,0 +1,37 @@
+
+#include "usg_common.h"
+#include <stdio.h>
+#include <string.h>
+int main() {
+	std::set<int> *subscripter_set = new std::set<int>;
+	std::set<int>::iterator set_iter;
+	int send_port;
+	int i =0;
+	for (i = 0; i< 10; i++) {
+		subscripter_set->insert(i);
+	}
+
+  for (i = 0, set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+    send_port = *set_iter;
+
+    printf("send_port = %d \n", send_port);
+    if (send_port == 0 || send_port == 1  || send_port == 4 || send_port == 6 ||  send_port == 9) {
+      printf("erase %d \n", send_port);
+      subscripter_set->erase(set_iter);
+      // set_iter--;
+      // set_iter--;
+      // if(i != 0)
+      //  	set_iter--;
+    }
+    i++;
+  }
+
+printf("================================\n");
+
+  for (set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+    send_port = *set_iter;
+
+    printf("send_port = %d \n", send_port);
+    
+  }
+}
\ No newline at end of file
diff --git a/test/test_vector b/test/test_vector
new file mode 100755
index 0000000..c308410
--- /dev/null
+++ b/test/test_vector
Binary files differ
diff --git a/test/test_vector.c b/test/test_vector.c
new file mode 100644
index 0000000..b2b82cd
--- /dev/null
+++ b/test/test_vector.c
@@ -0,0 +1,22 @@
+#include <algorithm>
+#include <iostream>
+#include <vector>
+
+#include "usg_common.h"
+int main()
+{
+    std::vector<int> container{1, 2, 3};
+ 
+    auto print = [](const int& n) { std::cout << " " << n; };
+ 
+    std::cout << "Before clear:";
+    std::for_each(container.begin(), container.end(), print);
+    std::cout << "\nSize=" << container.size() << '\n';
+ 
+    std::cout << "Clear\n";
+    container.clear();
+ 
+    std::cout << "After clear:";
+    std::for_each(container.begin(), container.end(), print);
+    std::cout << "\nSize=" << container.size() << '\n';
+}
\ No newline at end of file
diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus
index 8e5a8c8..34cfc06 100755
--- a/test_socket/dgram_mod_bus
+++ b/test_socket/dgram_mod_bus
Binary files differ
diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey
index 6e179bb..eacb2c7 100755
--- a/test_socket/dgram_mod_survey
+++ b/test_socket/dgram_mod_survey
Binary files differ

--
Gitblit v1.8.0