wangzhengquan
2020-08-10 dc5557c7bb411037002e706cb20df0f71d12e5ee
fix topic_sub_map
1个文件已添加
6个文件已修改
55 ■■■■■ 已修改文件
src/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/lock_free_queue.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dmod_socket.c 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/util/sem_util.c 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_survey.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/test_open_close.c 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/Makefile
@@ -26,7 +26,7 @@
PREFIX = $(ROOT)/build
ifeq ($(DEBUG),y)
  MYLIBS = $(LIBSQUEUE)
  MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
else
  MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
endif
src/queue/include/lock_free_queue.h
@@ -252,7 +252,7 @@
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue push_timeout");
            // err_msg(errno, "LockFreeQueue push_timeout");
            return false;
        }
    }
@@ -324,7 +324,7 @@
        if (errno == EAGAIN)
            return false;
        else {
            err_msg(errno, "LockFreeQueue pop_timeout");
            // err_msg(errno, "LockFreeQueue pop_timeout");
            return false;
        }
    }
src/socket/dmod_socket.c
@@ -52,16 +52,6 @@
        }
    }
    return count;
//     foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, int key){
// printf("foreach===========\n");
//         if (include_in_keys(key, keys, length)) {
//             //subscripter_set->erase(key);
// printf("remove_subscripter %d\n", key);
//             count++;
//         }
//     });
// printf("remove_subscripters count = %d\n", count);
    
}
@@ -72,34 +62,47 @@
}
DModSocket::DModSocket() {
    mod = (socket_mod_t)0;
        shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
        bus_set = new std::set<int>;
    topic_sub_map = NULL;
}
DModSocket::~DModSocket() {
// printf("DModSocket  destory 1\n");
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    struct timespec timeout = {1, 0};
    if(bus_set != NULL) {
        for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
            desub_timeout(NULL, 0, *bus_iter, &timeout);
// printf("DModSocket  desub_timeout before");
            // desub_timeout(NULL, 0, *bus_iter, &timeout);
// printf("DModSocket  desub_timeout after %d\n", *bus_iter);
        }
        delete bus_set;
    }
    
// printf("DModSocket  destory 2\n");
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
// printf("DModSocket  destory 2-1\n");
            if(subscripter_set != NULL) {
// printf("DModSocket  destory 2-2\n");
            subscripter_set->clear();
// printf("DModSocket  destory 2-3\n");
            mm_free((void *)subscripter_set);
            //delete subscripter_set;
            // printf("=============delete subscripter_set\n");
// printf("DModSocket  destory 2-4\n");
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
// printf("DModSocket  destory 3\n");
    // printf("=============close socket\n");
    shm_close_socket(shm_socket);
// printf("DModSocket  destory 4\n");
}
int DModSocket::bind(int port) {
src/util/sem_util.c
@@ -114,7 +114,7 @@
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec_timeout");
     // err_msg(errno, "SemUtil::dec_timeout");
      return -1;
    }
@@ -123,7 +123,6 @@
/* Release semaphore - increment it by 1 */
int SemUtil::inc(int semId) {
logger.debug("%d: SemUtil::inc\n", semId);
  struct sembuf sops;
  sops.sem_num = 0;
test_socket/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    dgram_mod_bus dgram_mod_survey dgram_mod_req_rep test_timeout
PROGS =    dgram_mod_bus dgram_mod_survey dgram_mod_req_rep test_timeout test_open_close
build: $(PROGS)
test_socket/dgram_mod_survey.c
@@ -43,7 +43,7 @@
    sprintf(sendbuf, "%d", i);
    printf("SEND HEART:%s\n", sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    // sleep(1);
    sleep(1);
    i++;
  }
  dgram_mod_close_socket(socket);
@@ -63,7 +63,7 @@
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
   // sleep(1);
    sleep(1);
    i++;
  }
  
test_socket/test_open_close.c
New file
@@ -0,0 +1,11 @@
#include "dgram_mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
#include "lock_free_queue.h"
int main() {
    shm_init(512);
    void *socket = dgram_mod_open_socket();
    dgram_mod_close_socket(socket);
    // dgram_mod_close_socket(socket);
}