From 9b9220321e647b381a999f67cad12345334b2cbe Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 06 八月 2020 13:06:23 +0800 Subject: [PATCH] update --- src/socket/dmod_socket.c | 85 +++++++++++++++++++++++++++++++++++++----- 1 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c index 586e49f..3086dd0 100644 --- a/src/socket/dmod_socket.c +++ b/src/socket/dmod_socket.c @@ -19,15 +19,50 @@ } } +bool DModSocket::include_in_keys(int key, int keys[], size_t length) { + if(length == 0) { + return false; + } + for(int i = 0; i < length; i++) { + if(keys[i] == key) + return true; + } + return false; +} + +size_t DModSocket::remove_subscripters(int keys[], size_t length) { + size_t count; + foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){ + if (include_in_keys(*set_iter, keys, length)) { + subscripter_set->erase(set_iter); + count++; + } + }); + return count; +} + + +size_t DModSocket::remove_keys(int keys[], size_t length) { + remove_subscripters(keys, length); + return shm_socket_remove_keys(keys, length); +} DModSocket::DModSocket() { shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + bus_set = new std::set<int>; } DModSocket::~DModSocket() { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; - + struct timespec timeout = {1, 0}; + if(bus_set != NULL) { + for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { + desub_timeout(NULL, 0, *bus_iter, &timeout); + } + delete bus_set; + } + if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { subscripter_set = map_iter->second; @@ -208,8 +243,13 @@ int DModSocket::_sub_( void *topic, int size, int port, struct timespec *timeout, int flags) { char buf[8192]; + int rv; snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); - return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + if(rv == 0) { + bus_set->insert(port); + } + return rv; } @@ -268,7 +308,20 @@ subscripter_set = map_iter->second; subscripter_set->erase(port); } - +} + +/* + * 澶勭悊鍙栨秷鎵�鏈夎闃� +*/ +void DModSocket::_proxy_desub_all(int port) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + // SHMKeySet::iterator set_iter; + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + subscripter_set->erase(port); + } } /* @@ -334,12 +387,20 @@ } } else if(strcmp(action, "desub") == 0) { - // 璁㈤槄鏀寔澶氫富棰樿闃� - topic = strtok(topics, topic_delim); - while(topic) { - _proxy_desub(trim(topic, 0), port); - topic = strtok(NULL, topic_delim); - } + if(strcmp(trim(topics, 0), "") == 0) { + // 鍙栨秷鎵�鏈夎闃� + printf("鍙栨秷鎵�鏈夎闃�"); + _proxy_desub_all(port); + } else { + + topic = strtok(topics, topic_delim); + while(topic) { + _proxy_desub(trim(topic, 0), port); + topic = strtok(NULL, topic_delim); + } + } + + } else if(strcmp(action, "pub") == 0) { _proxy_pub(topics, head_len, buf, size, port); @@ -410,12 +471,14 @@ return 0; } - char *topic = (char *)calloc(1, topic_len+1); + char *topic = (char *)malloc(topic_len+1); strncpy(topic, topic_start_ptr, topic_len); + *topic = '\0'; *_topic = topic; - char *action = (char *)calloc(1, action_len+1); + char *action = (char *)malloc(action_len+1); strncpy(action, action_start_ptr, action_len); + *action = '\0'; *_action = action; *head_len = ptr-str; -- Gitblit v1.8.0