From ff4991e1f942a3f1281330e21bf437b4b8558094 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 15:22:55 +0800
Subject: [PATCH] add remove_keys
---
test_socket/dgram_mod_bus.c | 29 ++++--
src/socket/dgram_mod_socket.c | 18 ++--
src/socket/include/dmod_socket.h | 26 +++---
test/test_type.c | 15 +++
test/test.c | 6 +
src/socket/dmod_socket.c | 89 +++++++++++++++-------
6 files changed, 122 insertions(+), 61 deletions(-)
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
index 7a4511c..0f988b5 100644
--- a/src/socket/dgram_mod_socket.c
+++ b/src/socket/dgram_mod_socket.c
@@ -137,17 +137,17 @@
*/
int dgram_mod_sub(void * _socket, void *topic, int size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->sub(topic, size, port);
+ return socket->m_socket->sub((char *)topic, size, port);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
struct timespec timeout = {sec, nsec};
- return socket->m_socket->sub_timeout(topic, size, port, &timeout);
+ return socket->m_socket->sub_timeout((char *)topic, size, port, &timeout);
}
int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->sub_nowait(topic, size, port);
+ return socket->m_socket->sub_nowait((char *)topic, size, port);
}
@@ -160,17 +160,17 @@
*/
int dgram_mod_desub(void * _socket, void *topic, int size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->desub(topic, size, port);
+ return socket->m_socket->desub((char *)topic, size, port);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
struct timespec timeout = {sec, nsec};
- return socket->m_socket->desub_timeout(topic, size, port, &timeout);
+ return socket->m_socket->desub_timeout((char *)topic, size, port, &timeout);
}
int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->desub_nowait(topic, size, port);
+ return socket->m_socket->desub_nowait((char *)topic, size, port);
}
@@ -183,17 +183,17 @@
*/
int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->pub(topic, topic_size, content, content_size, port);
+ return socket->m_socket->pub((char *)topic, topic_size, content, content_size, port);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
struct timespec timeout = {sec, nsec};
- return socket->m_socket->pub_timeout(topic, topic_size, content, content_size, port, &timeout);
+ return socket->m_socket->pub_timeout((char *)topic, topic_size, content, content_size, port, &timeout);
}
int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){
dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
- return socket->m_socket->pub_nowait(topic, topic_size, content, content_size, port);
+ return socket->m_socket->pub_nowait((char *)topic, topic_size, content, content_size, port);
}
diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c
index 3086dd0..df57a9f 100644
--- a/src/socket/dmod_socket.c
+++ b/src/socket/dmod_socket.c
@@ -1,7 +1,7 @@
#include "dmod_socket.h"
-void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) {
+void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
SHMKeySet *subscripter_set;
SHMKeySet::iterator set_iter;
@@ -12,7 +12,7 @@
subscripter_set = map_iter->second;
if(subscripter_set != NULL) {
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
- cb(subscripter_set, set_iter);
+ cb(subscripter_set, *set_iter);
}
}
}
@@ -31,14 +31,38 @@
}
size_t DModSocket::remove_subscripters(int keys[], size_t length) {
- size_t count;
- foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){
- if (include_in_keys(*set_iter, keys, length)) {
- subscripter_set->erase(set_iter);
- count++;
+ size_t count = 0;
+ int key;
+ for(int i = 0; i < length; i++) {
+ key = keys[i];
+ SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
+ SHMKeySet *subscripter_set;
+ SHMKeySet::iterator set_iter;
+ SHMTopicSubMap::iterator map_iter;
+
+ if(topic_sub_map != NULL) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ subscripter_set = map_iter->second;
+ if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
+ subscripter_set->erase(set_iter);
+// printf("remove_subscripter %s, %d\n", map_iter->first, key);
+ count++;
+ }
+ }
}
- });
+ }
return count;
+// foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, int key){
+// printf("foreach===========\n");
+// if (include_in_keys(key, keys, length)) {
+
+// //subscripter_set->erase(key);
+// printf("remove_subscripter %d\n", key);
+// count++;
+// }
+// });
+// printf("remove_subscripters count = %d\n", count);
+
}
@@ -177,14 +201,14 @@
* @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
-int DModSocket::sub( void *topic, int size, int port){
+int DModSocket::sub(char *topic, int size, int port){
return _sub_( topic, size, port, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int DModSocket::sub_timeout(void *topic, int size, int port, struct timespec *timeout){
+int DModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){
return _sub_(topic, size, port, timeout, 0);
}
-int DModSocket::sub_nowait(void *topic, int size, int port) {
+int DModSocket::sub_nowait(char *topic, int size, int port) {
return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -196,14 +220,14 @@
* @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
-int DModSocket::desub( void *topic, int size, int port){
+int DModSocket::desub(char *topic, int size, int port){
return _desub_( topic, size, port, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
+int DModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){
return _desub_(topic, size, port, timeout, 0);
}
-int DModSocket::desub_nowait(void *topic, int size, int port) {
+int DModSocket::desub_nowait(char *topic, int size, int port) {
return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -215,14 +239,14 @@
* @content 涓婚鍐呭
* @port 鎬荤嚎绔彛
*/
-int DModSocket::pub(void *topic, int topic_size, void *content, int content_size, int port){
+int DModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){
return _pub_(topic, topic_size, content, content_size, port, NULL, 0);
}
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int DModSocket::pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
+int DModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
return _pub_( topic, topic_size, content, content_size, port, timeout, 0);
}
-int DModSocket::pub_nowait(void *topic, int topic_size, void *content, int content_size, int port){
+int DModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){
return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -240,11 +264,11 @@
/**
* @port 鎬荤嚎绔彛
*/
-int DModSocket::_sub_( void *topic, int size, int port,
+int DModSocket::_sub_(char *topic, int size, int port,
struct timespec *timeout, int flags) {
char buf[8192];
int rv;
- snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
if(rv == 0) {
bus_set->insert(port);
@@ -256,21 +280,24 @@
/**
* @port 鎬荤嚎绔彛
*/
-int DModSocket::_desub_( void *topic, int size, int port,
+int DModSocket::_desub_(char *topic, int size, int port,
struct timespec *timeout, int flags) {
char buf[8192];
- snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ if(topic == NULL) {
+ topic = "";
+ }
+ snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
/**
* @port 鎬荤嚎绔彛
*/
-int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
+int DModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port,
struct timespec *timeout, int flags) {
int head_len;
char buf[8192+content_size];
- snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
+ snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
head_len = strlen(buf);
memcpy(buf+head_len, content, content_size);
return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
@@ -284,7 +311,7 @@
SHMTopicSubMap::iterator map_iter;
SHMKeySet::iterator set_iter;
-
+printf("_proxy_sub topic = %s\n", topic);
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
} else {
@@ -306,7 +333,9 @@
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
+
subscripter_set->erase(port);
+printf("============ desub %d\n", port);
}
}
@@ -321,6 +350,7 @@
for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
subscripter_set = map_iter->second;
subscripter_set->erase(port);
+printf("============ desub %d\n", port);
}
}
@@ -376,20 +406,23 @@
const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) {
-// printf("run_pubsub_proxy server recv after: %s \n", buf);
+//printf("run_pubsub_proxy server recv after: %s \n", buf);
if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
+// printf("run_pubsub_proxy %s %s \n", action, topics);
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
topic = strtok(topics, topic_delim);
+//printf("run_pubsub_proxy topic = %s\n", topic);
while(topic) {
_proxy_sub(trim(topic, 0), port);
topic = strtok(NULL, topic_delim);
}
} else if(strcmp(action, "desub") == 0) {
+printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
- printf("鍙栨秷鎵�鏈夎闃�");
+ printf("====鍙栨秷鎵�鏈夎闃匼n");
_proxy_desub_all(port);
} else {
@@ -473,12 +506,12 @@
char *topic = (char *)malloc(topic_len+1);
strncpy(topic, topic_start_ptr, topic_len);
- *topic = '\0';
+ *(topic+topic_len) = '\0';
*_topic = topic;
char *action = (char *)malloc(action_len+1);
strncpy(action, action_start_ptr, action_len);
- *action = '\0';
+ *(action+action_len) = '\0';
*_action = action;
*head_len = ptr-str;
diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h
index 8eb0fbf..e34e4cc 100644
--- a/src/socket/include/dmod_socket.h
+++ b/src/socket/include/dmod_socket.h
@@ -46,14 +46,14 @@
void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port);
void *run_pubsub_proxy();
int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
- int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags);
- int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
+ int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags);
+ int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
void _proxy_desub( char *topic, int port);
void _proxy_desub_all(int port);
- int _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
+ int _desub_( char *topic, int size, int port, struct timespec *timeout, int flags);
- static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb);
+ static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb);
static bool include_in_keys(int key, int keys[], size_t length);
static size_t remove_subscripters(int keys[], size_t length) ;
public:
@@ -119,10 +119,10 @@
* @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
- int sub(void *topic, int size, int port);
+ int sub(char *topic, int size, int port);
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int sub_timeout(void *topic, int size, int port, struct timespec *timeout);
- int sub_nowait(void *topic, int size, int port);
+ int sub_timeout(char *topic, int size, int port, struct timespec *timeout);
+ int sub_nowait(char *topic, int size, int port);
/**
@@ -131,10 +131,10 @@
* @size 涓婚闀垮害
* @port 鎬荤嚎绔彛
*/
- int desub( void *topic, int size, int port);
+ int desub( char *topic, int size, int port);
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int desub_timeout(void *topic, int size, int port, struct timespec *timeout);
- int desub_nowait(void *topic, int size, int port) ;
+ int desub_timeout(char *topic, int size, int port, struct timespec *timeout);
+ int desub_nowait(char *topic, int size, int port) ;
/**
* 鍙戝竷涓婚
@@ -142,10 +142,10 @@
* @content 涓婚鍐呭
* @port 鎬荤嚎绔彛
*/
- int pub(void *topic, int topic_size, void *content, int content_size, int port);
+ int pub(char *topic, int topic_size, void *content, int content_size, int port);
// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout);
- int pub_nowait(void *topic, int topic_size, void *content, int content_size, int port);
+ int pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout);
+ int pub_nowait(char *topic, int topic_size, void *content, int content_size, int port);
/**
diff --git a/test/test.c b/test/test.c
index b978ef2..1f7533b 100644
--- a/test/test.c
+++ b/test/test.c
@@ -2,4 +2,10 @@
#include "usg_typedef.h"
int main() {
+
+ char buf[1024];
+ sprintf(buf, "%s\n", (char*)"" );
+ printf(buf);
+ int d = strcmp(trim("", 0), "");
+ printf("%d\n", d);
}
\ No newline at end of file
diff --git a/test/test_type.c b/test/test_type.c
new file mode 100644
index 0000000..6415266
--- /dev/null
+++ b/test/test_type.c
@@ -0,0 +1,15 @@
+#include "usg_common.h"
+#include "usg_typedef.h"
+#include "dgram_mod_socket.h"
+#include "shm_mm.h"
+#include "mm.h"
+#include <typeinfo>
+#include "lock_free_queue.h"
+
+int main() {
+ shm_init(512);
+ LockFreeQueue<int> * queue = new LockFreeQueue<int>(16);
+ void * tmp = (void *)queue;
+ std::cout << typeid(queue).name() << std::endl;
+ std::cout << typeid(tmp).name() << std::endl;
+}
\ No newline at end of file
diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c
index 5a549bc..f800f10 100644
--- a/test_socket/dgram_mod_bus.c
+++ b/test_socket/dgram_mod_bus.c
@@ -10,7 +10,6 @@
}
void server(int port, bool restart) {
- signal(SIGINT, sigint_handler);
server_socket = dgram_mod_open_socket();
@@ -39,6 +38,8 @@
void client(int port) {
void *socket = dgram_mod_open_socket();
+
+
pthread_t tid;
pthread_create(&tid, NULL, run_recv, socket);
int size;
@@ -56,7 +57,7 @@
printf("Please input topic!\n");
scanf("%s", topic);
if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) {
- printf("Sub success!\n");
+ printf("%d Sub success!\n", dgram_mod_get_port(socket));
} else {
printf("Sub failture!\n");
exit(0);
@@ -66,24 +67,25 @@
printf("Please input topic!\n");
scanf("%s", topic);
if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) {
- printf("Desub success!\n");
+ printf("%d Desub success!\n", dgram_mod_get_port(socket));
} else {
printf("Desub failture!\n");
exit(0);
}
- }
- else if(strcmp(action, "pub") == 0) {
+ } else if(strcmp(action, "pub") == 0) {
// printf("%s %s %s\n", action, topic, content);
printf("Please input topic and content\n");
scanf("%s %s", topic, content);
if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){
- printf("Pub success!\n");
+ printf("%d Pub success!\n", dgram_mod_get_port(socket));
} else {
printf("Pub failture!\n");
}
} else if(strcmp(action, "quit") == 0) {
+ printf("(%d) quit\n", dgram_mod_get_port(socket));
+ dgram_mod_close_socket(socket);
break;
} else {
printf("error input argument\n");
@@ -91,8 +93,7 @@
}
}
- printf("(%d) quit\n", dgram_mod_get_port(socket));
- dgram_mod_close_socket(socket);
+
}
@@ -101,7 +102,7 @@
shm_init(512);
int port;
if (argc < 3) {
- fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client");
+ fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client");
return 1;
}
@@ -115,10 +116,16 @@
server(port, false);
}
+ } else if (strcmp("client", argv[1]) == 0) {
+ client(port);
+ } else if(strcmp("rmkey", argv[1]) == 0) {
+ for(int i = 2; i < argc; i++) {
+ port = atoi(argv[i]);
+ dgram_mod_remove_key(port);
+ // printf("%d\n", port);
+ }
}
- if (strcmp("client", argv[1]) == 0)
- client(port);
return 0;
--
Gitblit v1.8.0