wangzhengquan
2020-07-27 554529bb69cd610e83db2c9a80b4f36f5225d80f
restart bus
17个文件已修改
197 ■■■■ 已修改文件
demo/dgram_mod_req_rep 补丁 | 查看 | 原始文档 | blame | 历史
demo/dgram_mod_survey 补丁 | 查看 | 原始文档 | blame | 历史
src/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/hashtable.c 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/mem_pool.h 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_allocator.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_mm.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue.h 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_mm.c 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dgram_mod_socket.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/shm_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.c 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_req_rep 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_survey 补丁 | 查看 | 原始文档 | blame | 历史
demo/dgram_mod_req_rep
Binary files differ
demo/dgram_mod_survey
Binary files differ
src/libshm_queue.a
Binary files differ
src/queue/hashtable.c
@@ -177,18 +177,7 @@
}
int hashtable_alloc_key(hashtable_t *hashtable) {
  int key = START_KEY;
  SemUtil::dec(hashtable->wlock);
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
  }
  _hashtable_put(hashtable, key, (void *)1);
  SemUtil::inc(hashtable->wlock);
  return key;
}
void *hashtable_get(hashtable_t *hashtable, int key) {
   SemUtil::dec(hashtable->mutex);
@@ -251,6 +240,19 @@
  }
}
int hashtable_alloc_key(hashtable_t *hashtable) {
  int key = START_KEY;
  SemUtil::dec(hashtable->wlock);
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
  }
  _hashtable_put(hashtable, key, (void *)1);
  SemUtil::inc(hashtable->wlock);
  return key;
}
std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
  std::set<int> *keyset = new std::set<int>;
  tailq_entry_t *item;
@@ -267,3 +269,5 @@
  }
  return keyset;
}
src/queue/include/mem_pool.h
@@ -34,6 +34,22 @@
    return ptr;
}
template <typename T>
static inline  T* mem_pool_attach(int key) {
    void *ptr;
    // T* tptr;
    hashtable_t *hashtable = mm_get_hashtable();
  ptr = hashtable_get(hashtable, key);
printf("mem_pool_malloc_by_key  malloc before %d, %p\n", key, ptr);
  if(ptr == NULL || ptr == (void *)1 ) {
    ptr = mm_malloc(sizeof(T));
    hashtable_put(hashtable, key, ptr);
    new(ptr) T;
printf("mem_pool_malloc_by_key  use new %d, %p\n", key, ptr);
  }
  return (T*)ptr;
}
static inline void mem_pool_free (void *ptr) {
    mm_free(ptr);
    // notify malloc
@@ -45,10 +61,12 @@
    return mm_realloc(ptr, size);
}
static inline hashtable_t * mem_pool_get_hashtable() {
    return mm_get_hashtable();
static inline int mem_pool_alloc_key() {
    hashtable_t *hashtable = mm_get_hashtable();
    return hashtable_alloc_key(hashtable);
}
// extern int mm_checkheap(int verbose);
src/queue/include/shm_allocator.h
@@ -66,11 +66,13 @@
class SHM_Allocator {
  public:
    static void *allocate (size_t size) {
      return mem_pool_malloc(size);
      return mm_malloc(size);
      // return mem_pool_malloc(size);
    }
    static void deallocate (void *ptr) {
      return mem_pool_free(ptr);
      return mm_free(ptr);
      // return mem_pool_free(ptr);
    }
};
@@ -93,6 +95,6 @@
typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
#endif
src/queue/include/shm_mm.h
@@ -18,6 +18,9 @@
 */
void shm_destroy();
void* shm_malloc_by_key(int key, int size);
#ifdef __cplusplus
}
#endif
src/queue/include/shm_queue.h
@@ -38,7 +38,8 @@
  inline ELEM_T &operator[](unsigned i);
  static void remove_queues_exclude(int *keys, size_t length);
  static void remove_queues_exclude(int keys[], size_t length);
  static void remove_queues_include(int keys[], size_t length);
private:
protected:
@@ -52,7 +53,7 @@
};
template <typename ELEM_T>
void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) {
void SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
@@ -67,12 +68,21 @@
      }
    }
    if (!found) {
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable,
                                                                     *keyItr);
      delete mqueue;
      // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      // delete mqueue;
      hashtable_remove(hashtable, *keyItr);
    }
  }
  delete keyset;
}
template <typename ELEM_T>
void SHMQueue<ELEM_T>::remove_queues_include(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  for(int i = 0; i< length; i++) {
    hashtable_remove(hashtable, keys[i]);
  }
}
template <typename ELEM_T>
@@ -86,8 +96,7 @@
    hashtable_put(hashtable, key, (void *)queue);
  }
  queue->reference++;
  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d",
                                   queue->reference.load());
  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -96,7 +105,7 @@
  // LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d",
  // queue->reference.load());
  if (queue->reference.load() == 0) {
    delete queue;
   // delete queue;
    hashtable_t *hashtable = mm_get_hashtable();
    hashtable_remove(hashtable, KEY);
    // LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n");
src/queue/shm_mm.c
@@ -1,5 +1,6 @@
#include "shm_mm.h"
#include "mem_pool.h"
#include "hashtable.h"
void shm_init(int size) {
    mem_pool_init(size);
src/socket/dgram_mod_socket.c
@@ -16,11 +16,15 @@
static Logger logger = LoggerFactory::getLogger();
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<SHMString, SHMKeySet *> > > SHMTopicSubMap;
typedef struct dgram_mod_socket_t {
  shm_socket_t *shm_socket;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    std::map<std::string, std::set<int> *> *topic_sub_map;
    SHMTopicSubMap *topic_sub_map;
} dgram_mod_socket_t;
static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
@@ -36,9 +40,9 @@
int dgram_mod_close_socket(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
    std::set<int> *subscripter_set;
    std::map<std::string, std::set<int> *>::iterator map_iter;
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -57,6 +61,12 @@
int dgram_mod_bind(void * _socket, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return  shm_socket_bind(socket->shm_socket, port);
}
int dgram_mod_force_bind(void * _socket, int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_socket_force_bind(socket->shm_socket, port);
}
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
@@ -96,7 +106,14 @@
int  dgram_mod_start_bus(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    socket->topic_sub_map = new std::map<std::string, std::set<int> *>;
printf("mem_pool_malloc_by_key before\n");
    // void *map_ptr = mem_pool_malloc_by_key(1, sizeof(SHMTopicSubMap));
    socket->topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(1);
printf("mem_pool_malloc_by_key after\n");
    // socket->topic_sub_map = new(map_ptr) SHMTopicSubMap;
    //socket->topic_sub_map = new SHMTopicSubMap;
    run_pubsub_proxy(socket);
    // pthread_t tid;
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
@@ -136,16 +153,17 @@
 * 处理订阅
*/
void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
    std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
    std::set<int> *subscripter_set;
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
    std::map<std::string, std::set<int> *>::iterator map_iter;
    std::set<int>::iterator set_iter;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
        subscripter_set = new std::set<int>;
        void *set_ptr = mm_malloc(sizeof(SHMKeySet));
        subscripter_set = new(set_ptr) SHMKeySet;
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(port);
@@ -155,11 +173,11 @@
 * 处理发布,代理转发
*/
void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
    std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map;
    std::set<int> *subscripter_set;
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
    std::map<std::string, std::set<int> *>::iterator map_iter;
    std::set<int>::iterator set_iter;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
    std::vector<int> subscripter_to_del;
    std::vector<int>::iterator vector_iter;
@@ -171,12 +189,14 @@
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_port = *set_iter;
// printf("run_accept_sub_request send before %d \n", send_port);
 printf("_proxy_pub send before %d \n", send_port);
            if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_port);
            } else {
printf("_proxy_pub send after: %d \n", send_port);
            }
// printf("run_accept_sub_request send after: %d \n", send_port);
            
        }
@@ -200,9 +220,9 @@
    size_t head_len;
    const char *topic_delim = ",";
//printf("server receive before\n");
printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
//printf("server recv after: %s \n", buf);
printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
src/socket/include/dgram_mod_socket.h
@@ -25,6 +25,8 @@
*/
int dgram_mod_bind(void * _socket, int port);
int dgram_mod_force_bind(void * _socket, int port);
/**
 * 发送信息
 * @port 发送给谁
src/socket/include/shm_socket.h
@@ -44,6 +44,7 @@
    shm_socket_type_t socket_type;
    // 本地port
    int port;
    bool force_bind;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
@@ -65,6 +66,9 @@
int shm_socket_bind(shm_socket_t * socket, int port) ;
int shm_socket_force_bind(shm_socket_t * socket, int port) ;
int shm_listen(shm_socket_t * socket) ;
shm_socket_t* shm_accept(shm_socket_t* socket);
src/socket/shm_socket.c
@@ -23,6 +23,7 @@
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  socket->socket_type = socket_type;
  socket->port = -1;
  socket->force_bind = false;
  socket->dispatch_thread = 0;
  socket->status = SHM_CONN_CLOSED;
@@ -46,6 +47,12 @@
  return 0;
}
int shm_socket_force_bind(shm_socket_t *socket, int port) {
  socket->force_bind = true;
  socket->port = port;
  return 0;
}
int shm_listen(shm_socket_t *socket) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
@@ -60,7 +67,7 @@
    socket->port = port;
  } else {
    if (hashtable_get(hashtable, socket->port) != NULL) {
    if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind) {
      err_exit(0, "key %d has already been in used!", socket->port);
    }
  }
@@ -144,7 +151,7 @@
    socket->port = hashtable_alloc_key(hashtable);
  } else {
    if (hashtable_get(hashtable, socket->port) != NULL) {
    if (hashtable_get(hashtable, socket->port) != NULL && !socket->force_bind ) {
      err_exit(0, "key %d has already been in used!", socket->port);
    }
  }
@@ -243,6 +250,7 @@
    } else {
      if (hashtable_get(hashtable, socket->port) != NULL) {
        if(!socket->force_bind)
        err_exit(0, "key %d has already been in used!", socket->port);
      }
    }
@@ -299,6 +307,7 @@
    } else {
      if (hashtable_get(hashtable, socket->port) != NULL) {
        if(!socket->force_bind)
        err_exit(0, "key %d has already been in used!", socket->port);
      }
    }
test_socket/dgram_mod_bus
Binary files differ
test_socket/dgram_mod_bus.c
@@ -1,13 +1,30 @@
#include "dgram_mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
#include "mm.h"
void server(int port) {
  void *socket = dgram_mod_open_socket();
  dgram_mod_bind(socket, port);
void sigint_handler(int sig) {
  printf("sigint_handler\n");
  hashtable_t *hashtable = mm_get_hashtable();
  //hashtable_remove(hashtable, 8);
  // dgram_mod_close_socket(server_socket);
  //SHMQueue<ELEM_T>::remove_queues_include
  exit(0);
}
   
  dgram_mod_start_bus(socket);
  
void server(int port, bool restart) {
  //signal(SIGINT,  sigint_handler);
  void * server_socket = dgram_mod_open_socket();
  if(restart) {
    dgram_mod_force_bind(server_socket, port);
  } else {
     dgram_mod_bind(server_socket, port);
  }
  dgram_mod_start_bus(server_socket);
}
@@ -68,14 +85,20 @@
  shm_init(512);
  int port;
  if (argc < 3) {
    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
    fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client");
    return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
    server(port);
    if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
      server(port, true);
    }
    else{
      server(port, false);
    }
  }
  if (strcmp("client", argv[1]) == 0)
test_socket/dgram_mod_req_rep
Binary files differ
test_socket/dgram_mod_survey
Binary files differ