From 03987ef3d1ed9c2d604561a69db169cd535014b6 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 25 七月 2020 16:17:04 +0800
Subject: [PATCH] commit
---
test/test_set | 0
test/test_set.c | 37 +++++++++
test/test_vector | 0
src/libshm_queue.a | 0
src/socket/shm_socket.c | 1
lib/libusgcommon.a | 0
src/queue/include/shm_queue.h | 4
src/socket/dgram_mod_socket.c | 120 +++++++++++++++++++----------
test/test_vector.c | 22 +++++
test/strtok | 0
test_socket/dgram_mod_bus | 0
lib/libusgcommon.so | 0
test_socket/dgram_mod_survey | 0
test/Makefile | 2
test/strtok.c | 17 ++++
src/queue/include/lock_free_queue.h | 4
16 files changed, 160 insertions(+), 47 deletions(-)
diff --git a/lib/libusgcommon.a b/lib/libusgcommon.a
index f03c816..d2e4ae1 100644
--- a/lib/libusgcommon.a
+++ b/lib/libusgcommon.a
Binary files differ
diff --git a/lib/libusgcommon.so b/lib/libusgcommon.so
index 1106ad9..8e10ddc 100644
--- a/lib/libusgcommon.so
+++ b/lib/libusgcommon.so
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 60be46c..685264f 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h
index 6ba2a00..25a3392 100644
--- a/src/queue/include/lock_free_queue.h
+++ b/src/queue/include/lock_free_queue.h
@@ -117,7 +117,7 @@
/// @return true if the element was inserted in the queue. False if the queue was full
bool push(const ELEM_T &a_data);
bool push_nowait(const ELEM_T &a_data);
- bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+ bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
/// @brief pop the element at the head of the queue
/// @param a reference where the element in the head of the queue will be saved to
@@ -244,7 +244,7 @@
typename ELEM_T,
typename Allocator,
template <typename T, typename AT> class Q_TYPE>
-bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
{
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 26bf019..91e9530 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -32,7 +32,7 @@
inline bool push(const ELEM_T &a_data);
inline bool push_nowait(const ELEM_T &a_data);
- inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+ inline bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
inline bool pop(ELEM_T &a_data);
inline bool pop_nowait(ELEM_T &a_data);
inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
@@ -146,7 +146,7 @@
}
template < typename ELEM_T >
-inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
{
return queue->push_timeout(a_data, timeout);
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index c5e5a28..4856204 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -13,8 +13,8 @@
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
-static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
-void * run_accept_pubsub_request(void * _socket) ;
+
+static Logger logger = LoggerFactory::getLogger();
typedef struct dgram_mod_socket_t {
socket_mod_t mod;
@@ -24,6 +24,8 @@
std::map<std::string, std::set<int> *> *topic_sub_map;
} dgram_mod_socket_t;
+static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
+void * run_pubsub_proxy(dgram_mod_socket_t * socket) ;
void *dgram_mod_open_socket() {
dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
@@ -96,7 +98,7 @@
int start_bus(void * _socket) {
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
- run_accept_pubsub_request(_socket);
+ run_pubsub_proxy(socket);
// pthread_t tid;
// pthread_create(&tid, NULL, run_accept_sub_request, _socket);
return 0;
@@ -131,60 +133,96 @@
//==========================================================================================================================
-
-void * run_accept_pubsub_request(void * _socket) {
- // pthread_detach(pthread_self());
- dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- int size;
- int port;
- char * action, *topic, *buf;
-
- size_t head_len;
- // void * send_buf;
- int send_port;
- struct timespec timeout = {1,0};
+/*
+ * 澶勭悊璁㈤槄
+*/
+void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
std::set<int> *subscripter_set;
std::map<std::string, std::set<int> *>::iterator map_iter;
std::set<int>::iterator set_iter;
+
+ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+ subscripter_set = map_iter->second;
+ } else {
+ subscripter_set = new std::set<int>;
+ topic_sub_map->insert({topic, subscripter_set});
+ }
+ subscripter_set->insert(port);
+}
+
+/*
+ * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
+*/
+void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
+ std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
+ std::set<int> *subscripter_set;
+
+ std::map<std::string, std::set<int> *>::iterator map_iter;
+ std::set<int>::iterator set_iter;
+
+ std::vector<int> subscripter_to_del;
+ std::vector<int>::iterator vector_iter;
+
+ int send_port;
+ struct timespec timeout = {1,0};
+
+ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
+ subscripter_set = map_iter->second;
+ for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ send_port = *set_iter;
+// printf("run_accept_sub_request send before %d \n", send_port);
+ if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
+ //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
+ subscripter_to_del.push_back(send_port);
+ }
+// printf("run_accept_sub_request send after: %d \n", send_port);
+
+ }
+
+ // 鍒犻櫎宸插叧闂殑绔�
+ for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+ if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
+ subscripter_set->erase(set_iter);
+ printf("remove closed subscripter %d \n", send_port);
+ }
+ }
+ subscripter_to_del.clear();
+
+ }
+}
+
+void * run_pubsub_proxy(dgram_mod_socket_t * socket) {
+ // pthread_detach(pthread_self());
+ int size;
+ int port;
+ char * action, *topic, *topics, *buf;
+ size_t head_len;
+
+ const char *topic_delim = ",";
//printf("server receive before\n");
while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
//printf("server recv after: %s \n", buf);
- if(parse_pubsub_topic(buf, size, &action, &topic, &head_len)) {
+ if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
if(strcmp(action, "sub") == 0) {
- if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
- subscripter_set = map_iter->second;
- } else {
- subscripter_set = new std::set<int>;
- topic_sub_map->insert({topic, subscripter_set});
- }
- subscripter_set->insert(port);
+ // 璁㈤槄鏀寔澶氫富棰樿闃�
+ topic = trim(strtok(topics, topic_delim), NULL);
+ while(topic) {
+ _proxy_sub(socket, topic, port);
+ topic = trim(strtok(NULL, topic_delim), NULL);
+ }
} else if(strcmp(action, "pub") == 0) {
- if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
- subscripter_set = map_iter->second;
- for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
- send_port = *set_iter;
- // send_buf = malloc(size-head_len);
- // memcpy(send_buf, buf+head_len, );
-printf("run_accept_sub_request send before %d \n", send_port);
- if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
-printf("erase %d \n", send_port);
- subscripter_set->erase(set_iter);
- set_iter++;
- }
-printf("run_accept_sub_request send after: %d \n", send_port);
-
- }
- }
+ _proxy_pub(socket, topics, head_len, buf, size, port);
}
- free(buf);
+
free(action);
- free(topic);
+ free(topics);
} else {
err_msg(0, "incorrect format msg");
}
+ free(buf);
}
return NULL;
}
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 9d54986..52251e6 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -483,7 +483,6 @@
delete client_socket->messageQueue;
client_socket->messageQueue = NULL;
- socket->clientSocketMap->erase(iter);
free((void *)client_socket);
}
delete socket->clientSocketMap;
diff --git a/test/Makefile b/test/Makefile
index 4b888f5..4bcb663 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = protocle_parse test
+PROGS = protocle_parse strtok test_set test_vector
build: $(PROGS)
diff --git a/test/strtok b/test/strtok
new file mode 100755
index 0000000..df7f33e
--- /dev/null
+++ b/test/strtok
Binary files differ
diff --git a/test/strtok.c b/test/strtok.c
new file mode 100644
index 0000000..364548e
--- /dev/null
+++ b/test/strtok.c
@@ -0,0 +1,17 @@
+
+
+#include <string.h>
+#include <stdio.h>
+#include "usg_common.h"
+
+int main(void)
+{
+ // char input[] = "A bird came down the walk";
+ char input[] = "bird";
+ printf("Parsing the input string '%s'\n", input);
+ char *token = strtok(input, " ");
+ while(token) {
+ puts(token);
+ token = strtok(NULL, " ");
+ }
+ }
\ No newline at end of file
diff --git a/test/test_set b/test/test_set
new file mode 100755
index 0000000..c8b6db0
--- /dev/null
+++ b/test/test_set
Binary files differ
diff --git a/test/test_set.c b/test/test_set.c
new file mode 100644
index 0000000..f602902
--- /dev/null
+++ b/test/test_set.c
@@ -0,0 +1,37 @@
+
+#include "usg_common.h"
+#include <stdio.h>
+#include <string.h>
+int main() {
+ std::set<int> *subscripter_set = new std::set<int>;
+ std::set<int>::iterator set_iter;
+ int send_port;
+ int i =0;
+ for (i = 0; i< 10; i++) {
+ subscripter_set->insert(i);
+ }
+
+ for (i = 0, set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ send_port = *set_iter;
+
+ printf("send_port = %d \n", send_port);
+ if (send_port == 0 || send_port == 1 || send_port == 4 || send_port == 6 || send_port == 9) {
+ printf("erase %d \n", send_port);
+ subscripter_set->erase(set_iter);
+ // set_iter--;
+ // set_iter--;
+ // if(i != 0)
+ // set_iter--;
+ }
+ i++;
+ }
+
+printf("================================\n");
+
+ for (set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ send_port = *set_iter;
+
+ printf("send_port = %d \n", send_port);
+
+ }
+}
\ No newline at end of file
diff --git a/test/test_vector b/test/test_vector
new file mode 100755
index 0000000..c308410
--- /dev/null
+++ b/test/test_vector
Binary files differ
diff --git a/test/test_vector.c b/test/test_vector.c
new file mode 100644
index 0000000..b2b82cd
--- /dev/null
+++ b/test/test_vector.c
@@ -0,0 +1,22 @@
+#include <algorithm>
+#include <iostream>
+#include <vector>
+
+#include "usg_common.h"
+int main()
+{
+ std::vector<int> container{1, 2, 3};
+
+ auto print = [](const int& n) { std::cout << " " << n; };
+
+ std::cout << "Before clear:";
+ std::for_each(container.begin(), container.end(), print);
+ std::cout << "\nSize=" << container.size() << '\n';
+
+ std::cout << "Clear\n";
+ container.clear();
+
+ std::cout << "After clear:";
+ std::for_each(container.begin(), container.end(), print);
+ std::cout << "\nSize=" << container.size() << '\n';
+}
\ No newline at end of file
diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus
index 8e5a8c8..34cfc06 100755
--- a/test_socket/dgram_mod_bus
+++ b/test_socket/dgram_mod_bus
Binary files differ
diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey
index 6e179bb..eacb2c7 100755
--- a/test_socket/dgram_mod_survey
+++ b/test_socket/dgram_mod_survey
Binary files differ
--
Gitblit v1.8.0