From 00dba6082e245d917cb7d6eed3c627211ff41cd7 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 25 九月 2020 15:53:21 +0800 Subject: [PATCH] update --- src/socket/dmod_socket.c | 167 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 133 insertions(+), 34 deletions(-) diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c index 586e49f..a0aa96b 100644 --- a/src/socket/dmod_socket.c +++ b/src/socket/dmod_socket.c @@ -1,7 +1,7 @@ #include "dmod_socket.h" -void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)> cb) { +void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); SHMKeySet *subscripter_set; SHMKeySet::iterator set_iter; @@ -12,35 +12,97 @@ subscripter_set = map_iter->second; if(subscripter_set != NULL) { for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - cb(subscripter_set, set_iter); + cb(subscripter_set, *set_iter); } } } } } +bool DModSocket::include_in_keys(int key, int keys[], size_t length) { + if(length == 0) { + return false; + } + for(int i = 0; i < length; i++) { + if(keys[i] == key) + return true; + } + return false; +} + +size_t DModSocket::remove_subscripters(int keys[], size_t length) { + size_t count = 0; + int key; + for(int i = 0; i < length; i++) { + key = keys[i]; + SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMKeySet *subscripter_set; + SHMKeySet::iterator set_iter; + SHMTopicSubMap::iterator map_iter; + + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { + subscripter_set->erase(set_iter); +// printf("remove_subscripter %s, %d\n", map_iter->first, key); + count++; + } + } + } + } + return count; + +} + + +size_t DModSocket::remove_keys(int keys[], size_t length) { + remove_subscripters(keys, length); + return shm_socket_remove_keys(keys, length); +} DModSocket::DModSocket() { - shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + mod = (socket_mod_t)0; + shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + bus_set = new std::set<int>; + topic_sub_map = NULL; } DModSocket::~DModSocket() { +// printf("DModSocket destory 1\n"); SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; + struct timespec timeout = {1, 0}; + if(bus_set != NULL) { + for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { +// printf("DModSocket desub_timeout before"); + desub_timeout(NULL, 0, *bus_iter, &timeout); +// printf("DModSocket desub_timeout after %d\n", *bus_iter); + } + delete bus_set; + } +// printf("DModSocket destory 2\n"); if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { subscripter_set = map_iter->second; - subscripter_set->clear(); - mm_free((void *)subscripter_set); - //delete subscripter_set; - // printf("=============delete subscripter_set\n"); +// printf("DModSocket destory 2-1\n"); + if(subscripter_set != NULL) { +// printf("DModSocket destory 2-2\n"); + subscripter_set->clear(); +// printf("DModSocket destory 2-3\n"); + mm_free((void *)subscripter_set); +// printf("DModSocket destory 2-4\n"); + } + } topic_sub_map->clear(); mem_pool_free_by_key(BUS_MAP_KEY); } +// printf("DModSocket destory 3\n"); // printf("=============close socket\n"); shm_close_socket(shm_socket); +// printf("DModSocket destory 4\n"); } int DModSocket::bind(int port) { @@ -142,14 +204,14 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int DModSocket::sub( void *topic, int size, int port){ +int DModSocket::sub(char *topic, int size, int port){ return _sub_( topic, size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::sub_timeout(void *topic, int size, int port, struct timespec *timeout){ +int DModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ return _sub_(topic, size, port, timeout, 0); } -int DModSocket::sub_nowait(void *topic, int size, int port) { +int DModSocket::sub_nowait(char *topic, int size, int port) { return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -161,14 +223,14 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int DModSocket::desub( void *topic, int size, int port){ +int DModSocket::desub(char *topic, int size, int port){ return _desub_( topic, size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){ +int DModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ return _desub_(topic, size, port, timeout, 0); } -int DModSocket::desub_nowait(void *topic, int size, int port) { +int DModSocket::desub_nowait(char *topic, int size, int port) { return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -180,14 +242,14 @@ * @content 涓婚鍐呭 * @port 鎬荤嚎绔彛 */ -int DModSocket::pub(void *topic, int topic_size, void *content, int content_size, int port){ +int DModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ return _pub_(topic, topic_size, content, content_size, port, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ +int DModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ return _pub_( topic, topic_size, content, content_size, port, timeout, 0); } -int DModSocket::pub_nowait(void *topic, int topic_size, void *content, int content_size, int port){ +int DModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); } @@ -205,32 +267,40 @@ /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_sub_( void *topic, int size, int port, +int DModSocket::_sub_(char *topic, int size, int port, struct timespec *timeout, int flags) { char buf[8192]; - snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); - return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + int rv; + snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); + rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + if(rv == 0) { + bus_set->insert(port); + } + return rv; } /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_desub_( void *topic, int size, int port, +int DModSocket::_desub_(char *topic, int size, int port, struct timespec *timeout, int flags) { char buf[8192]; - snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + if(topic == NULL) { + topic = ""; + } + snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); } /** * @port 鎬荤嚎绔彛 */ -int DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port, +int DModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags) { int head_len; char buf[8192+content_size]; - snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); head_len = strlen(buf); memcpy(buf+head_len, content, content_size); return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); @@ -244,7 +314,7 @@ SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; - +printf("_proxy_sub topic = %s\n", topic); if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { @@ -266,9 +336,25 @@ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; + subscripter_set->erase(port); +printf("============ desub %d\n", port); } - +} + +/* + * 澶勭悊鍙栨秷鎵�鏈夎闃� +*/ +void DModSocket::_proxy_desub_all(int port) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + // SHMKeySet::iterator set_iter; + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + subscripter_set->erase(port); +printf("============ desub %d\n", port); + } } /* @@ -323,23 +409,34 @@ const char *topic_delim = ","; // printf("run_pubsub_proxy server receive before\n"); while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { -// printf("run_pubsub_proxy server recv after: %s \n", buf); +//printf("run_pubsub_proxy server recv after: %s \n", buf); if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { +// printf("run_pubsub_proxy %s %s \n", action, topics); if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); +//printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { _proxy_sub(trim(topic, 0), port); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { - // 璁㈤槄鏀寔澶氫富棰樿闃� - topic = strtok(topics, topic_delim); - while(topic) { - _proxy_desub(trim(topic, 0), port); - topic = strtok(NULL, topic_delim); - } +// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); + if(strcmp(trim(topics, 0), "") == 0) { + // 鍙栨秷鎵�鏈夎闃� + printf("====鍙栨秷鎵�鏈夎闃匼n"); + _proxy_desub_all(port); + } else { + + topic = strtok(topics, topic_delim); + while(topic) { + _proxy_desub(trim(topic, 0), port); + topic = strtok(NULL, topic_delim); + } + } + + } else if(strcmp(action, "pub") == 0) { _proxy_pub(topics, head_len, buf, size, port); @@ -410,12 +507,14 @@ return 0; } - char *topic = (char *)calloc(1, topic_len+1); + char *topic = (char *)malloc(topic_len+1); strncpy(topic, topic_start_ptr, topic_len); + *(topic+topic_len) = '\0'; *_topic = topic; - char *action = (char *)calloc(1, action_len+1); + char *action = (char *)malloc(action_len+1); strncpy(action, action_start_ptr, action_len); + *(action+action_len) = '\0'; *_action = action; *head_len = ptr-str; -- Gitblit v1.8.0