From 9b9220321e647b381a999f67cad12345334b2cbe Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 13:06:23 +0800
Subject: [PATCH] update
---
src/socket/include/shm_socket.h | 1
src/socket/shm_socket.c | 6 ++
test/Makefile | 2
src/queue/include/shm_queue.h | 23 +++++--
src/socket/dgram_mod_socket.c | 9 +++
src/socket/include/dmod_socket.h | 11 +++
src/queue/include/shm_queue_wrapper.h | 7 +-
src/queue/shm_queue_wrapper.c | 8 +-
src/socket/dmod_socket.c | 85 ++++++++++++++++++++++++---
src/socket/include/dgram_mod_socket.h | 2
10 files changed, 125 insertions(+), 29 deletions(-)
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index ca5036f..5c82b05 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -38,9 +38,10 @@
inline ELEM_T &operator[](unsigned i);
- static void remove_queues_exclude(int keys[], size_t length);
- static void remove_queues_include(int keys[], size_t length);
- static void remove_queue(int key);
+ // @deprecate
+ static size_t remove_queues_exclude(int keys[], size_t length);
+ static size_t remove_queues(int keys[], size_t length);
+ static size_t remove_queue(int key);
private:
protected:
@@ -53,13 +54,15 @@
SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
};
+// @deprecate
template <typename ELEM_T>
-void SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
+size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
std::set<int> *keyset = hashtable_keyset(hashtable);
std::set<int>::iterator keyItr;
LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
bool found;
+ size_t count = 0;
for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
found = false;
for (size_t i = 0; i < length; i++) {
@@ -73,29 +76,33 @@
mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
delete mqueue;
hashtable_remove(hashtable, *keyItr);
+ count++;
}
}
delete keyset;
+ return count;
}
-
template <typename ELEM_T>
-void SHMQueue<ELEM_T>::remove_queues_include(int keys[], size_t length) {
+size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
hashtable_t *hashtable = mm_get_hashtable();
LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
+ size_t count = 0;
for(int i = 0; i< length; i++) {
// 閿�姣佸叡浜唴瀛樼殑queue
mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
delete mqueue;
hashtable_remove(hashtable, keys[i]);
+ count++;
}
+ return count;
}
template <typename ELEM_T>
-void SHMQueue<ELEM_T>::remove_queue(int key) {
+size_t SHMQueue<ELEM_T>::remove_queue(int key) {
int keys[] = {key};
- remove_queues_include(keys, 1);
+ return remove_queues(keys, 1);
}
template <typename ELEM_T>
diff --git a/src/queue/include/shm_queue_wrapper.h b/src/queue/include/shm_queue_wrapper.h
index 9c84682..10d3b16 100644
--- a/src/queue/include/shm_queue_wrapper.h
+++ b/src/queue/include/shm_queue_wrapper.h
@@ -11,10 +11,11 @@
-//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
+/**
+ * @depracate 宸插簾寮冧笉鐢�
+ * 绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
+ */
void shm_remove_queues_exclude(void *keys, int length);
-//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
-void shm_remove_queue(int key);
/**
* 鍒涘缓闃熷垪
* @ shmqueue
diff --git a/src/queue/shm_queue_wrapper.c b/src/queue/shm_queue_wrapper.c
index 357e9e8..b617af4 100644
--- a/src/queue/shm_queue_wrapper.c
+++ b/src/queue/shm_queue_wrapper.c
@@ -16,13 +16,11 @@
-//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
+// 绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪,
void shm_remove_queues_exclude(void *keys, int length) {
- SHMQueue<ele_t>::remove_queues_exclude((int*)keys, (size_t)length);
+ //SHMQueue<ele_t>::remove_queues_exclude((int*)keys, (size_t)length);
}
-void shm_remove_queue(int key) {
- SHMQueue<ele_t>::remove_queue(key);
-}
+
/**
* 鍒涘缓闃熷垪
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index ff3f31f..7a4511c 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -6,6 +6,15 @@
} dgram_mod_socket_t;
+
+int dgram_mod_remove_keys(int keys[], int length){
+ return DModSocket::remove_keys(keys, length);
+}
+
+int dgram_mod_remove_key(int key){
+ int keys[] = {key};
+ return DModSocket::remove_keys(keys, 1);
+}
/**
* 鍒涘缓socket
* @return socket鍦板潃
diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c
index 586e49f..3086dd0 100644
--- a/src/socket/dmod_socket.c
+++ b/src/socket/dmod_socket.c
@@ -19,15 +19,50 @@
}
}
+bool DModSocket::include_in_keys(int key, int keys[], size_t length) {
+ if(length == 0) {
+ return false;
+ }
+ for(int i = 0; i < length; i++) {
+ if(keys[i] == key)
+ return true;
+ }
+ return false;
+}
+
+size_t DModSocket::remove_subscripters(int keys[], size_t length) {
+ size_t count;
+ foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){
+ if (include_in_keys(*set_iter, keys, length)) {
+ subscripter_set->erase(set_iter);
+ count++;
+ }
+ });
+ return count;
+}
+
+
+size_t DModSocket::remove_keys(int keys[], size_t length) {
+ remove_subscripters(keys, length);
+ return shm_socket_remove_keys(keys, length);
+}
DModSocket::DModSocket() {
shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ bus_set = new std::set<int>;
}
DModSocket::~DModSocket() {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
-
+ struct timespec timeout = {1, 0};
+ if(bus_set != NULL) {
+ for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
+ desub_timeout(NULL, 0, *bus_iter, &timeout);
+ }
+ delete bus_set;
+ }
+
if(topic_sub_map != NULL) {
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
subscripter_set = map_iter->second;
@@ -208,8 +243,13 @@
int DModSocket::_sub_( void *topic, int size, int port,
struct timespec *timeout, int flags) {
char buf[8192];
+ int rv;
snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
- return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+ rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
+ if(rv == 0) {
+ bus_set->insert(port);
+ }
+ return rv;
}
@@ -268,7 +308,20 @@
subscripter_set = map_iter->second;
subscripter_set->erase(port);
}
-
+}
+
+/*
+ * 澶勭悊鍙栨秷鎵�鏈夎闃�
+*/
+void DModSocket::_proxy_desub_all(int port) {
+ SHMKeySet *subscripter_set;
+
+ SHMTopicSubMap::iterator map_iter;
+ // SHMKeySet::iterator set_iter;
+ for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ subscripter_set = map_iter->second;
+ subscripter_set->erase(port);
+ }
}
/*
@@ -334,12 +387,20 @@
}
} else if(strcmp(action, "desub") == 0) {
- // 璁㈤槄鏀寔澶氫富棰樿闃�
- topic = strtok(topics, topic_delim);
- while(topic) {
- _proxy_desub(trim(topic, 0), port);
- topic = strtok(NULL, topic_delim);
- }
+ if(strcmp(trim(topics, 0), "") == 0) {
+ // 鍙栨秷鎵�鏈夎闃�
+ printf("鍙栨秷鎵�鏈夎闃�");
+ _proxy_desub_all(port);
+ } else {
+
+ topic = strtok(topics, topic_delim);
+ while(topic) {
+ _proxy_desub(trim(topic, 0), port);
+ topic = strtok(NULL, topic_delim);
+ }
+ }
+
+
} else if(strcmp(action, "pub") == 0) {
_proxy_pub(topics, head_len, buf, size, port);
@@ -410,12 +471,14 @@
return 0;
}
- char *topic = (char *)calloc(1, topic_len+1);
+ char *topic = (char *)malloc(topic_len+1);
strncpy(topic, topic_start_ptr, topic_len);
+ *topic = '\0';
*_topic = topic;
- char *action = (char *)calloc(1, action_len+1);
+ char *action = (char *)malloc(action_len+1);
strncpy(action, action_start_ptr, action_len);
+ *action = '\0';
*_action = action;
*head_len = ptr-str;
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
index a43c197..5a845dc 100644
--- a/src/socket/include/dgram_mod_socket.h
+++ b/src/socket/include/dgram_mod_socket.h
@@ -6,7 +6,9 @@
extern "C" {
#endif
+int dgram_mod_remove_keys(int keys[], int length);
+int dgram_mod_remove_key(int key);
/**
* 鍒涘缓socket
* @return socket鍦板潃
diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h
index 0998b1d..8eb0fbf 100644
--- a/src/socket/include/dmod_socket.h
+++ b/src/socket/include/dmod_socket.h
@@ -38,6 +38,7 @@
// pthread_t recv_thread;
// <涓婚锛� 璁㈤槄鑰�>
SHMTopicSubMap *topic_sub_map;
+ std::set<int> *bus_set;
private:
inline int _recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags);
@@ -49,8 +50,14 @@
int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
void _proxy_desub( char *topic, int port);
+ void _proxy_desub_all(int port);
int _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
- static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) ;
+
+ static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb);
+ static bool include_in_keys(int key, int keys[], size_t length);
+ static size_t remove_subscripters(int keys[], size_t length) ;
+public:
+ static size_t remove_keys(int keys[], size_t length);
public:
DModSocket();
~DModSocket();
@@ -145,6 +152,8 @@
* 鑾峰彇soket绔彛鍙�
*/
int get_port() ;
+
+
};
#endif
\ No newline at end of file
diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h
index 6bb0448..fd67d9c 100644
--- a/src/socket/include/shm_socket.h
+++ b/src/socket/include/shm_socket.h
@@ -68,6 +68,7 @@
+size_t shm_socket_remove_keys(int keys[], size_t length);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index be69413..ea2f674 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -30,6 +30,12 @@
SHMQueue<shm_msg_t> *_attach_remote_queue(int port);
+
+
+size_t shm_socket_remove_keys(int keys[], size_t length) {
+ return SHMQueue<shm_msg_t>::remove_queues(keys, length);
+}
+
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
socket->socket_type = socket_type;
diff --git a/test/Makefile b/test/Makefile
index ffc7b16..fc951f6 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = protocle_parse strtok test_set test_vector lambda test
+PROGS = protocle_parse strtok test_set test_vector lambda test test_type
build: $(PROGS)
--
Gitblit v1.8.0