wangzhengquan
2021-02-05 14c345b38d57fd814f217eb8465963a08ca79f7e
update
11个文件已修改
217 ■■■■ 已修改文件
src/bus_error.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm_wrapper.cpp 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm_wrapper.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp
@@ -19,7 +19,8 @@
  "Network fault",
  "Send to self error",
  "Receive from wrong end",
  "Service stoped"
  "Service stoped",
  "Exceed resource limit"
};
src/bus_error.h
@@ -12,6 +12,7 @@
#define EBUS_SENDTO_SELF 505
#define EBUS_RECVFROM_WRONG_END 506
#define EBUS_STOPED 507
#define EBUS_EXCEED_LIMIT 508
extern int bus_errno;
src/queue/shm_queue.h
@@ -61,33 +61,33 @@
};
// @deprecate
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    if (!found) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
// template <typename ELEM_T>
// size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) {
//   hashtable_t *hashtable = mm_get_hashtable();
//   std::set<int> *keyset = hashtable_keyset(hashtable);
//   std::set<int>::iterator keyItr;
//   LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
//   bool found;
//   size_t count = 0;
//   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
//     found = false;
//     for (size_t i = 0; i < length; i++) {
//       if (*keyItr == keys[i]) {
//         found = true;
//         break;
//       }
//     }
//     if (!found && *keyItr > 100) {
//       // 销毁共享内存的queue
//       mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
//       delete mqueue;
//       hashtable_remove(hashtable, *keyItr);
//       count++;
//     }
//   }
//   delete keyset;
//   return count;
// }
src/shm/hashtable.cpp
@@ -5,6 +5,9 @@
#include "logger_factory.h"
#include <set>
#include <functional>
#include <limits.h>
typedef struct tailq_entry_t
{
@@ -29,22 +32,12 @@
  memset(hashtable, 0, sizeof(hashtable_t));
  hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
  // hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
  // hashtable->cond = svsem_get(IPC_PRIVATE, 1);
  // hashtable->readcnt = 0;
  // FILE * semfile = fopen("./sem.txt", "w+");
  // if(semfile == NULL) {
  //   err_exit(errno, "fopen");
  // }
  // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
  // fclose(semfile);
  hashtable->queueCount = 0;
  hashtable->currentKey = START_KEY;
}
void hashtable_destroy(hashtable_t *hashtable) {
  svsem_remove( hashtable->mutex);
  // svsem_remove( hashtable->wlock);
  // svsem_remove( hashtable->cond);
}
@@ -130,6 +123,7 @@
        /* mm_free the item as we don't need it anymore. */
        mm_free(item);
        hashtable->queueCount--;
        svsem_post(hashtable->mutex);
        return oldvalue;
      }
@@ -150,6 +144,7 @@
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  _hashtable_put(hashtable, key, value); 
  hashtable->queueCount++;
}
bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) {
@@ -183,11 +178,18 @@
  return false;
}
int hashtable_get_queue_count(hashtable_t *hashtable) {
  return hashtable->queueCount;
}
int hashtable_alloc_key(hashtable_t *hashtable) {
  int rv;
  int key = START_KEY;
  int key = hashtable->currentKey;
  if( key == INT_MAX || key < START_KEY) {
    key = START_KEY;
  }
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
@@ -199,10 +201,12 @@
  // 占用key
  _hashtable_put(hashtable, key, (void *)1);
  hashtable->currentKey = key;
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
  }
  return key;
}
@@ -279,6 +283,7 @@
    hashtable->array[i] = NULL;
  }
  hashtable->queueCount = 0;
  if((rv = svsem_post(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
  }
src/shm/hashtable.h
@@ -7,13 +7,20 @@
#define MAPSIZE 1024
// 创建Queue数量的上限
#define QUEUE_COUNT_LIMIT 300
typedef struct hashtable_t
{
 struct tailq_header_t* array[MAPSIZE];
 int mutex;
 int queueCount;
 int currentKey; // 当前分配的key
 // int wlock;
 // int cond;
 // size_t readcnt;
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
@@ -29,6 +36,8 @@
int hashtable_lock(hashtable_t *hashtable);
int hashtable_unlock(hashtable_t *hashtable);
int hashtable_get_queue_count(hashtable_t *hashtable) ;
/** 
 * 遍历hash_table
 * @demo 
src/shm/shm_mm_wrapper.cpp
@@ -1,6 +1,8 @@
#include "shm_mm_wrapper.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "lock_free_queue.h"
#include "shm_socket.h"
void shm_mm_wrapper_init(int size) {
    mem_pool_init(size);
@@ -13,3 +15,54 @@
int shm_mm_wrapper_alloc_key() {
    return mm_alloc_key();
}
//删除包含在keys内的queue
int shm_mm_wrapper_remove_keys(int keys[], int length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
  int count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
    if(mqueue == NULL) {
        continue;
    }
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
  }
  return count;
}
// 删除不在keys内的queue
int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  int count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
src/shm/shm_mm_wrapper.h
@@ -33,6 +33,19 @@
int shm_mm_wrapper_alloc_key();
/**
 * @brief 删除包含在keys内的queue
 * @return 删除的个数
 */
int shm_mm_wrapper_remove_keys(int keys[], int length);
/**
 * @brief 删除不在keys内的queue
 * @return 删除的个数
 */
int shm_mm_wrapper_remove_keys_exclude(int keys[], int length);
#ifdef __cplusplus
}
#endif
src/socket/shm_mod_socket.cpp
@@ -8,6 +8,11 @@
    return shm_socket_remove_keys(keys, length);
}
size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
    BusServerSocket::remove_subscripters(keys, length);
    return shm_socket_remove_keys_exclude(keys, length);
}
ShmModSocket::ShmModSocket() {
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    bus_set = new std::set<int>;
src/socket/shm_mod_socket.h
@@ -37,6 +37,7 @@
public:
    static size_t remove_keys(int keys[], size_t length);
    static size_t remove_keys_exclude(int keys[], size_t length);
  // bus header 编码为网络传输的字节
  static void * encode_bus_head(bus_head_t & bushead);
src/socket/shm_socket.cpp
@@ -65,7 +65,7 @@
  return queue;
}
//删除包含在keys内的queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
@@ -79,6 +79,38 @@
  }
  return count;
}
// 删除不在keys内的queue
size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  int s, type;
@@ -493,11 +525,15 @@
  if( sockt->queue != NULL) 
    goto LABEL_PUSH;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
 
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
    err_exit(rv, "shm_sendto : pthread_mutex_lock");
    if (sockt->queue == NULL) {
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
@@ -545,6 +581,10 @@
  if( sockt->queue != NULL) 
    goto LABEL_POP;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
src/socket/shm_socket.h
@@ -46,6 +46,7 @@
typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn;
size_t shm_socket_remove_keys(int keys[], size_t length);
size_t shm_socket_remove_keys_exclude(int keys[], size_t length);
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);