From ff4991e1f942a3f1281330e21bf437b4b8558094 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 06 八月 2020 15:22:55 +0800 Subject: [PATCH] add remove_keys --- test_socket/dgram_mod_bus.c | 29 ++++-- src/socket/dgram_mod_socket.c | 18 ++-- src/socket/include/dmod_socket.h | 26 +++--- test/test_type.c | 15 +++ test/test.c | 6 + src/socket/dmod_socket.c | 89 +++++++++++++++------- 6 files changed, 122 insertions(+), 61 deletions(-) diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c index 7a4511c..0f988b5 100644 --- a/src/socket/dgram_mod_socket.c +++ b/src/socket/dgram_mod_socket.c @@ -137,17 +137,17 @@ */ int dgram_mod_sub(void * _socket, void *topic, int size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sub(topic, size, port); + return socket->m_socket->sub((char *)topic, size, port); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; struct timespec timeout = {sec, nsec}; - return socket->m_socket->sub_timeout(topic, size, port, &timeout); + return socket->m_socket->sub_timeout((char *)topic, size, port, &timeout); } int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sub_nowait(topic, size, port); + return socket->m_socket->sub_nowait((char *)topic, size, port); } @@ -160,17 +160,17 @@ */ int dgram_mod_desub(void * _socket, void *topic, int size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->desub(topic, size, port); + return socket->m_socket->desub((char *)topic, size, port); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; struct timespec timeout = {sec, nsec}; - return socket->m_socket->desub_timeout(topic, size, port, &timeout); + return socket->m_socket->desub_timeout((char *)topic, size, port, &timeout); } int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->desub_nowait(topic, size, port); + return socket->m_socket->desub_nowait((char *)topic, size, port); } @@ -183,17 +183,17 @@ */ int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->pub(topic, topic_size, content, content_size, port); + return socket->m_socket->pub((char *)topic, topic_size, content, content_size, port); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; struct timespec timeout = {sec, nsec}; - return socket->m_socket->pub_timeout(topic, topic_size, content, content_size, port, &timeout); + return socket->m_socket->pub_timeout((char *)topic, topic_size, content, content_size, port, &timeout); } int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->pub_nowait(topic, topic_size, content, content_size, port); + return socket->m_socket->pub_nowait((char *)topic, topic_size, content, content_size, port); } diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c index 3086dd0..df57a9f 100644 --- a/src/socket/dmod_socket.c +++ b/src/socket/dmod_socket.c @@ -1,7 +1,7 @@ #include "dmod_socket.h" -void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) { +void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; @@ -12,7 +12,7 @@ subscripter_set = map_iter->second; if(subscripter_set != NULL) { for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - cb(subscripter_set, set_iter); + cb(subscripter_set, *set_iter); } } } @@ -31,14 +31,38 @@ } size_t DModSocket::remove_subscripters(int keys[], size_t length) { - size_t count; - foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){ - if (include_in_keys(*set_iter, keys, length)) { - subscripter_set->erase(set_iter); - count++; + size_t count = 0; + int key; + for(int i = 0; i < length; i++) { + key = keys[i]; + SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMKeySet *subscripter_set; + SHMKeySet::iterator set_iter; + SHMTopicSubMap::iterator map_iter; + + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { + subscripter_set->erase(set_iter); +// printf("remove_subscripter %s, %d\n", map_iter->first, key); + count++; + } + } } - }); + } return count; +// foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, int key){ +// printf("foreach===========\n"); +// if (include_in_keys(key, keys, length)) { + +// //subscripter_set->erase(key); +// printf("remove_subscripter %d\n", key); +// count++; +// } +// }); +// printf("remove_subscripters count = %d\n", count); + } @@ -177,14 +201,14 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int DModSocket::sub( void *topic, int size, int port){ +int DModSocket::sub(char *topic, int size, int port){ return _sub_( topic, size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::sub_timeout(void *topic, int size, int port, struct timespec *timeout){ +int DModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ return _sub_(topic, size, port, timeout, 0); } -int DModSocket::sub_nowait(void *topic, int size, int port) { +int DModSocket::sub_nowait(char *topic, int size, int port) { return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -196,14 +220,14 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int DModSocket::desub( void *topic, int size, int port){ +int DModSocket::desub(char *topic, int size, int port){ return _desub_( topic, size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){ +int DModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ return _desub_(topic, size, port, timeout, 0); } -int DModSocket::desub_nowait(void *topic, int size, int port) { +int DModSocket::desub_nowait(char *topic, int size, int port) { return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -215,14 +239,14 @@ * @content 涓婚鍐呭 * @port 鎬荤嚎绔彛 */ -int DModSocket::pub(void *topic, int topic_size, void *content, int content_size, int port){ +int DModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ return _pub_(topic, topic_size, content, content_size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ +int DModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ return _pub_( topic, topic_size, content, content_size, port, timeout, 0); } -int DModSocket::pub_nowait(void *topic, int topic_size, void *content, int content_size, int port){ +int DModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -240,11 +264,11 @@ /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_sub_( void *topic, int size, int port, +int DModSocket::_sub_(char *topic, int size, int port, struct timespec *timeout, int flags) { char buf[8192]; int rv; - snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); if(rv == 0) { bus_set->insert(port); @@ -256,21 +280,24 @@ /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_desub_( void *topic, int size, int port, +int DModSocket::_desub_(char *topic, int size, int port, struct timespec *timeout, int flags) { char buf[8192]; - snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + if(topic == NULL) { + topic = ""; + } + snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); } /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port, +int DModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags) { int head_len; char buf[8192+content_size]; - snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); head_len = strlen(buf); memcpy(buf+head_len, content, content_size); return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); @@ -284,7 +311,7 @@ SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; - +printf("_proxy_sub topic = %s\n", topic); if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { @@ -306,7 +333,9 @@ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; + subscripter_set->erase(port); +printf("============ desub %d\n", port); } } @@ -321,6 +350,7 @@ for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { subscripter_set = map_iter->second; subscripter_set->erase(port); +printf("============ desub %d\n", port); } } @@ -376,20 +406,23 @@ const char *topic_delim = ","; // printf("run_pubsub_proxy server receive before\n"); while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { -// printf("run_pubsub_proxy server recv after: %s \n", buf); +//printf("run_pubsub_proxy server recv after: %s \n", buf); if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { +// printf("run_pubsub_proxy %s %s \n", action, topics); if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); +//printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { _proxy_sub(trim(topic, 0), port); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { +printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� - printf("鍙栨秷鎵�鏈夎闃�"); + printf("====鍙栨秷鎵�鏈夎闃匼n"); _proxy_desub_all(port); } else { @@ -473,12 +506,12 @@ char *topic = (char *)malloc(topic_len+1); strncpy(topic, topic_start_ptr, topic_len); - *topic = '\0'; + *(topic+topic_len) = '\0'; *_topic = topic; char *action = (char *)malloc(action_len+1); strncpy(action, action_start_ptr, action_len); - *action = '\0'; + *(action+action_len) = '\0'; *_action = action; *head_len = ptr-str; diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h index 8eb0fbf..e34e4cc 100644 --- a/src/socket/include/dmod_socket.h +++ b/src/socket/include/dmod_socket.h @@ -46,14 +46,14 @@ void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port); void *run_pubsub_proxy(); int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); - int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags); - int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags); + int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags); + int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags); void _proxy_desub( char *topic, int port); void _proxy_desub_all(int port); - int _desub_( void *topic, int size, int port, struct timespec *timeout, int flags); + int _desub_( char *topic, int size, int port, struct timespec *timeout, int flags); - static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb); + static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); static bool include_in_keys(int key, int keys[], size_t length); static size_t remove_subscripters(int keys[], size_t length) ; public: @@ -119,10 +119,10 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ - int sub(void *topic, int size, int port); + int sub(char *topic, int size, int port); // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int sub_timeout(void *topic, int size, int port, struct timespec *timeout); - int sub_nowait(void *topic, int size, int port); + int sub_timeout(char *topic, int size, int port, struct timespec *timeout); + int sub_nowait(char *topic, int size, int port); /** @@ -131,10 +131,10 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ - int desub( void *topic, int size, int port); + int desub( char *topic, int size, int port); // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int desub_timeout(void *topic, int size, int port, struct timespec *timeout); - int desub_nowait(void *topic, int size, int port) ; + int desub_timeout(char *topic, int size, int port, struct timespec *timeout); + int desub_nowait(char *topic, int size, int port) ; /** * 鍙戝竷涓婚 @@ -142,10 +142,10 @@ * @content 涓婚鍐呭 * @port 鎬荤嚎绔彛 */ - int pub(void *topic, int topic_size, void *content, int content_size, int port); + int pub(char *topic, int topic_size, void *content, int content_size, int port); // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout); - int pub_nowait(void *topic, int topic_size, void *content, int content_size, int port); + int pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout); + int pub_nowait(char *topic, int topic_size, void *content, int content_size, int port); /** diff --git a/test/test.c b/test/test.c index b978ef2..1f7533b 100644 --- a/test/test.c +++ b/test/test.c @@ -2,4 +2,10 @@ #include "usg_typedef.h" int main() { + + char buf[1024]; + sprintf(buf, "%s\n", (char*)"" ); + printf(buf); + int d = strcmp(trim("", 0), ""); + printf("%d\n", d); } \ No newline at end of file diff --git a/test/test_type.c b/test/test_type.c new file mode 100644 index 0000000..6415266 --- /dev/null +++ b/test/test_type.c @@ -0,0 +1,15 @@ +#include "usg_common.h" +#include "usg_typedef.h" +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "mm.h" +#include <typeinfo> +#include "lock_free_queue.h" + +int main() { + shm_init(512); + LockFreeQueue<int> * queue = new LockFreeQueue<int>(16); + void * tmp = (void *)queue; + std::cout << typeid(queue).name() << std::endl; + std::cout << typeid(tmp).name() << std::endl; +} \ No newline at end of file diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c index 5a549bc..f800f10 100644 --- a/test_socket/dgram_mod_bus.c +++ b/test_socket/dgram_mod_bus.c @@ -10,7 +10,6 @@ } void server(int port, bool restart) { - signal(SIGINT, sigint_handler); server_socket = dgram_mod_open_socket(); @@ -39,6 +38,8 @@ void client(int port) { void *socket = dgram_mod_open_socket(); + + pthread_t tid; pthread_create(&tid, NULL, run_recv, socket); int size; @@ -56,7 +57,7 @@ printf("Please input topic!\n"); scanf("%s", topic); if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) { - printf("Sub success!\n"); + printf("%d Sub success!\n", dgram_mod_get_port(socket)); } else { printf("Sub failture!\n"); exit(0); @@ -66,24 +67,25 @@ printf("Please input topic!\n"); scanf("%s", topic); if (dgram_mod_desub(socket, topic, strlen(topic), port) == 0) { - printf("Desub success!\n"); + printf("%d Desub success!\n", dgram_mod_get_port(socket)); } else { printf("Desub failture!\n"); exit(0); } - } - else if(strcmp(action, "pub") == 0) { + } else if(strcmp(action, "pub") == 0) { // printf("%s %s %s\n", action, topic, content); printf("Please input topic and content\n"); scanf("%s %s", topic, content); if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){ - printf("Pub success!\n"); + printf("%d Pub success!\n", dgram_mod_get_port(socket)); } else { printf("Pub failture!\n"); } } else if(strcmp(action, "quit") == 0) { + printf("(%d) quit\n", dgram_mod_get_port(socket)); + dgram_mod_close_socket(socket); break; } else { printf("error input argument\n"); @@ -91,8 +93,7 @@ } } - printf("(%d) quit\n", dgram_mod_get_port(socket)); - dgram_mod_close_socket(socket); + } @@ -101,7 +102,7 @@ shm_init(512); int port; if (argc < 3) { - fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client"); + fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client"); return 1; } @@ -115,10 +116,16 @@ server(port, false); } + } else if (strcmp("client", argv[1]) == 0) { + client(port); + } else if(strcmp("rmkey", argv[1]) == 0) { + for(int i = 2; i < argc; i++) { + port = atoi(argv[i]); + dgram_mod_remove_key(port); + // printf("%d\n", port); + } } - if (strcmp("client", argv[1]) == 0) - client(port); return 0; -- Gitblit v1.8.0