wangzhengquan
2020-08-06 9b9220321e647b381a999f67cad12345334b2cbe
update
10个文件已修改
154 ■■■■ 已修改文件
src/queue/include/shm_queue.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue_wrapper.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue_wrapper.c 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dmod_socket.c 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dgram_mod_socket.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dmod_socket.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/shm_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue.h
@@ -38,9 +38,10 @@
  inline ELEM_T &operator[](unsigned i);
  static void remove_queues_exclude(int keys[], size_t length);
  static void remove_queues_include(int keys[], size_t length);
  static void remove_queue(int key);
 // @deprecate
  static size_t remove_queues_exclude(int keys[], size_t length);
  static size_t remove_queues(int keys[], size_t length);
  static size_t remove_queue(int key);
private:
protected:
@@ -53,13 +54,15 @@
  SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
};
// @deprecate
template <typename ELEM_T>
void SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
@@ -73,29 +76,33 @@
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
template <typename ELEM_T>
void SHMQueue<ELEM_T>::remove_queues_include(int keys[], size_t length) {
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
  }
  return count;
}
template <typename ELEM_T>
void SHMQueue<ELEM_T>::remove_queue(int key) {
size_t SHMQueue<ELEM_T>::remove_queue(int key) {
  int keys[] = {key};
  remove_queues_include(keys, 1);
  return remove_queues(keys, 1);
}
template <typename ELEM_T>
src/queue/include/shm_queue_wrapper.h
@@ -11,10 +11,11 @@
 
//移除不包含在keys中的队列
/**
 * @depracate 已废弃不用
 * 移除不包含在keys中的队列
 */
void shm_remove_queues_exclude(void *keys, int length);
//移除不包含在keys中的队列
void shm_remove_queue(int key);
/**
 * 创建队列
 * @ shmqueue 
src/queue/shm_queue_wrapper.c
@@ -16,13 +16,11 @@
//移除不包含在keys中的队列
// 移除不包含在keys中的队列,
void shm_remove_queues_exclude(void *keys, int length) {
    SHMQueue<ele_t>::remove_queues_exclude((int*)keys, (size_t)length);
    //SHMQueue<ele_t>::remove_queues_exclude((int*)keys, (size_t)length);
}
void shm_remove_queue(int key) {
    SHMQueue<ele_t>::remove_queue(key);
}
/**
 * 创建队列
src/socket/dgram_mod_socket.c
@@ -6,6 +6,15 @@
  
} dgram_mod_socket_t;
int dgram_mod_remove_keys(int keys[], int length){
    return DModSocket::remove_keys(keys, length);
}
int dgram_mod_remove_key(int key){
    int keys[] = {key};
    return DModSocket::remove_keys(keys, 1);
}
/**
 * 创建socket
 * @return socket地址
src/socket/dmod_socket.c
@@ -19,15 +19,50 @@
    }
}
bool DModSocket::include_in_keys(int key, int keys[], size_t length) {
    if(length == 0) {
        return false;
    }
    for(int i = 0; i < length; i++) {
        if(keys[i] == key)
            return true;
    }
    return false;
}
size_t DModSocket::remove_subscripters(int keys[], size_t length) {
    size_t count;
    foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){
        if (include_in_keys(*set_iter, keys, length)) {
            subscripter_set->erase(set_iter);
            count++;
        }
    });
    return count;
}
size_t DModSocket::remove_keys(int keys[], size_t length) {
    remove_subscripters(keys, length);
    return shm_socket_remove_keys(keys, length);
}
DModSocket::DModSocket() {
        shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
        bus_set = new std::set<int>;
}
DModSocket::~DModSocket() {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    struct timespec timeout = {1, 0};
    if(bus_set != NULL) {
        for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
            desub_timeout(NULL, 0, *bus_iter, &timeout);
        }
        delete bus_set;
    }
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
@@ -208,8 +243,13 @@
int  DModSocket::_sub_( void *topic, int size, int port,  
    struct timespec *timeout, int flags) {
    char buf[8192];
    int rv;
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
    rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
    if(rv == 0) {
        bus_set->insert(port);
    }
    return rv;
}
@@ -268,7 +308,20 @@
        subscripter_set = map_iter->second;
        subscripter_set->erase(port);
    }
}
/*
 * 处理取消所有订阅
*/
void DModSocket::_proxy_desub_all(int port) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(port);
    }
}
/*
@@ -334,12 +387,20 @@
              }
            } else if(strcmp(action, "desub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
              while(topic) {
           _proxy_desub(trim(topic, 0), port);
            topic =  strtok(NULL, topic_delim);
              }
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
        printf("取消所有订阅");
                    _proxy_desub_all(port);
                } else {
                    topic = strtok(topics, topic_delim);
                  while(topic) {
               _proxy_desub(trim(topic, 0), port);
                topic =  strtok(NULL, topic_delim);
                  }
                }
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, port);
@@ -410,12 +471,14 @@
  return 0;
 }
 char *topic = (char *)calloc(1, topic_len+1);
 char *topic = (char *)malloc(topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *topic = '\0';
 *_topic = topic;
 char *action = (char *)calloc(1, action_len+1);
 char *action = (char *)malloc(action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *action = '\0';
 *_action = action;
 *head_len = ptr-str;
src/socket/include/dgram_mod_socket.h
@@ -6,7 +6,9 @@
extern "C" {
#endif
int dgram_mod_remove_keys(int keys[], int length);
int dgram_mod_remove_key(int key);
/**
 * 创建socket
 * @return socket地址
src/socket/include/dmod_socket.h
@@ -38,6 +38,7 @@
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
    std::set<int> *bus_set;
private:
    inline int _recvfrom_(void **buf, int *size, int *port,  struct timespec *timeout, int flags);
@@ -49,8 +50,14 @@
    int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int port);
    void _proxy_desub_all(int port);
    int  _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
    static void  foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb) ;
    static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb);
    static bool include_in_keys(int key, int keys[], size_t length);
    static size_t remove_subscripters(int keys[], size_t length) ;
public:
    static size_t remove_keys(int keys[], size_t length);
public:
    DModSocket();
    ~DModSocket();
@@ -145,6 +152,8 @@
     * 获取soket端口号
     */
    int get_port() ;
};
#endif
src/socket/include/shm_socket.h
@@ -68,6 +68,7 @@
size_t shm_socket_remove_keys(int keys[], size_t length);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
src/socket/shm_socket.c
@@ -30,6 +30,12 @@
SHMQueue<shm_msg_t> *_attach_remote_queue(int port);
size_t shm_socket_remove_keys(int keys[], size_t length) {
  return SHMQueue<shm_msg_t>::remove_queues(keys,  length);
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  socket->socket_type = socket_type;
test/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
 
PROGS = protocle_parse strtok test_set test_vector lambda test
PROGS = protocle_parse strtok test_set test_vector lambda test test_type
build: $(PROGS)