| | |
| | | "Network fault", |
| | | "Send to self error", |
| | | "Receive from wrong end", |
| | | "Service stoped" |
| | | "Service stoped", |
| | | "Exceed resource limit" |
| | | |
| | | }; |
| | | |
| | |
| | | #define EBUS_SENDTO_SELF 505 |
| | | #define EBUS_RECVFROM_WRONG_END 506 |
| | | #define EBUS_STOPED 507 |
| | | #define EBUS_EXCEED_LIMIT 508 |
| | | |
| | | extern int bus_errno; |
| | | |
| | |
| | | }; |
| | | |
| | | // @deprecate |
| | | template <typename ELEM_T> |
| | | size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | bool found; |
| | | size_t count = 0; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for (size_t i = 0; i < length; i++) { |
| | | if (*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | if (!found) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, *keyItr); |
| | | count++; |
| | | } |
| | | } |
| | | delete keyset; |
| | | return count; |
| | | } |
| | | // template <typename ELEM_T> |
| | | // size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { |
| | | // hashtable_t *hashtable = mm_get_hashtable(); |
| | | // std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | // std::set<int>::iterator keyItr; |
| | | // LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | // bool found; |
| | | // size_t count = 0; |
| | | // for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | // found = false; |
| | | // for (size_t i = 0; i < length; i++) { |
| | | // if (*keyItr == keys[i]) { |
| | | // found = true; |
| | | // break; |
| | | // } |
| | | // } |
| | | // if (!found && *keyItr > 100) { |
| | | // // 销毁共享内存的queue |
| | | // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | // delete mqueue; |
| | | // hashtable_remove(hashtable, *keyItr); |
| | | // count++; |
| | | // } |
| | | // } |
| | | // delete keyset; |
| | | // return count; |
| | | // } |
| | | |
| | | |
| | | |
| | |
| | | #include "logger_factory.h" |
| | | #include <set> |
| | | #include <functional> |
| | | #include <limits.h> |
| | | |
| | | |
| | | |
| | | typedef struct tailq_entry_t |
| | | { |
| | |
| | | |
| | | memset(hashtable, 0, sizeof(hashtable_t)); |
| | | hashtable->mutex = svsem_get(IPC_PRIVATE, 1); |
| | | // hashtable->wlock = svsem_get(IPC_PRIVATE, 1); |
| | | // hashtable->cond = svsem_get(IPC_PRIVATE, 1); |
| | | // hashtable->readcnt = 0; |
| | | |
| | | // FILE * semfile = fopen("./sem.txt", "w+"); |
| | | // if(semfile == NULL) { |
| | | // err_exit(errno, "fopen"); |
| | | // } |
| | | // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex); |
| | | // fclose(semfile); |
| | | hashtable->queueCount = 0; |
| | | hashtable->currentKey = START_KEY; |
| | | } |
| | | |
| | | void hashtable_destroy(hashtable_t *hashtable) { |
| | | svsem_remove( hashtable->mutex); |
| | | // svsem_remove( hashtable->wlock); |
| | | // svsem_remove( hashtable->cond); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | /* mm_free the item as we don't need it anymore. */ |
| | | mm_free(item); |
| | | hashtable->queueCount--; |
| | | svsem_post(hashtable->mutex); |
| | | return oldvalue; |
| | | } |
| | |
| | | |
| | | void hashtable_put(hashtable_t *hashtable, int key, void *value) { |
| | | _hashtable_put(hashtable, key, value); |
| | | hashtable->queueCount++; |
| | | } |
| | | |
| | | bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | |
| | | int hashtable_get_queue_count(hashtable_t *hashtable) { |
| | | return hashtable->queueCount; |
| | | } |
| | | |
| | | int hashtable_alloc_key(hashtable_t *hashtable) { |
| | | int rv; |
| | | int key = START_KEY; |
| | | int key = hashtable->currentKey; |
| | | |
| | | if( key == INT_MAX || key < START_KEY) { |
| | | key = START_KEY; |
| | | } |
| | | |
| | | rv = svsem_wait(hashtable->mutex); |
| | | if(rv != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n"); |
| | |
| | | // 占用key |
| | | _hashtable_put(hashtable, key, (void *)1); |
| | | |
| | | hashtable->currentKey = key; |
| | | rv = svsem_post(hashtable->mutex); |
| | | if(rv != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n"); |
| | | } |
| | | |
| | | return key; |
| | | } |
| | | |
| | |
| | | hashtable->array[i] = NULL; |
| | | } |
| | | |
| | | hashtable->queueCount = 0; |
| | | if((rv = svsem_post(hashtable->mutex)) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n"); |
| | | } |
| | |
| | | |
| | | #define MAPSIZE 1024 |
| | | |
| | | // 创建Queue数量的上限 |
| | | #define QUEUE_COUNT_LIMIT 300 |
| | | |
| | | typedef struct hashtable_t |
| | | { |
| | | struct tailq_header_t* array[MAPSIZE]; |
| | | int mutex; |
| | | int queueCount; |
| | | int currentKey; // 当前分配的key |
| | | // int wlock; |
| | | // int cond; |
| | | // size_t readcnt; |
| | | |
| | | |
| | | |
| | | } hashtable_t; |
| | | typedef void (*hashtable_foreach_cb)(int key, void *value); |
| | |
| | | |
| | | int hashtable_lock(hashtable_t *hashtable); |
| | | int hashtable_unlock(hashtable_t *hashtable); |
| | | |
| | | int hashtable_get_queue_count(hashtable_t *hashtable) ; |
| | | /** |
| | | * 遍历hash_table |
| | | * @demo |
| | |
| | | #include "shm_mm_wrapper.h" |
| | | #include "mem_pool.h" |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "shm_socket.h" |
| | | |
| | | void shm_mm_wrapper_init(int size) { |
| | | mem_pool_init(size); |
| | |
| | | int shm_mm_wrapper_alloc_key() { |
| | | return mm_alloc_key(); |
| | | } |
| | | |
| | | |
| | | |
| | | //删除包含在keys内的queue |
| | | int shm_mm_wrapper_remove_keys(int keys[], int length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | int count = 0; |
| | | for(int i = 0; i< length; i++) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); |
| | | if(mqueue == NULL) { |
| | | continue; |
| | | } |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, keys[i]); |
| | | count++; |
| | | } |
| | | return count; |
| | | } |
| | | |
| | | |
| | | // 删除不在keys内的queue |
| | | int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | bool found; |
| | | int count = 0; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for (size_t i = 0; i < length; i++) { |
| | | if (*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | // 100内的是bus内部自己用的 |
| | | if (!found && *keyItr > 100) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); |
| | | |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, *keyItr); |
| | | count++; |
| | | } |
| | | } |
| | | delete keyset; |
| | | return count; |
| | | } |
| | |
| | | int shm_mm_wrapper_alloc_key(); |
| | | |
| | | |
| | | /** |
| | | * @brief 删除包含在keys内的queue |
| | | * @return 删除的个数 |
| | | */ |
| | | int shm_mm_wrapper_remove_keys(int keys[], int length); |
| | | |
| | | /** |
| | | * @brief 删除不在keys内的queue |
| | | * @return 删除的个数 |
| | | */ |
| | | int shm_mm_wrapper_remove_keys_exclude(int keys[], int length); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | |
| | | return shm_socket_remove_keys(keys, length); |
| | | } |
| | | |
| | | size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) { |
| | | BusServerSocket::remove_subscripters(keys, length); |
| | | return shm_socket_remove_keys_exclude(keys, length); |
| | | } |
| | | |
| | | ShmModSocket::ShmModSocket() { |
| | | shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | bus_set = new std::set<int>; |
| | |
| | | |
| | | public: |
| | | static size_t remove_keys(int keys[], size_t length); |
| | | static size_t remove_keys_exclude(int keys[], size_t length); |
| | | |
| | | // bus header 编码为网络传输的字节 |
| | | static void * encode_bus_head(bus_head_t & bushead); |
| | |
| | | return queue; |
| | | } |
| | | |
| | | |
| | | //删除包含在keys内的queue |
| | | size_t shm_socket_remove_keys(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | |
| | | } |
| | | return count; |
| | | } |
| | | |
| | | |
| | | // 删除不在keys内的queue |
| | | size_t shm_socket_remove_keys_exclude(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | bool found; |
| | | size_t count = 0; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for (size_t i = 0; i < length; i++) { |
| | | if (*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | // 100内的是bus内部自己用的 |
| | | if (!found && *keyItr > 100) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, *keyItr); |
| | | count++; |
| | | } |
| | | } |
| | | delete keyset; |
| | | return count; |
| | | } |
| | | |
| | | |
| | | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { |
| | | int s, type; |
| | |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_PUSH; |
| | | |
| | | if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | return EBUS_EXCEED_LIMIT; |
| | | } |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_POP; |
| | | |
| | | if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | return EBUS_EXCEED_LIMIT; |
| | | } |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | |
| | | typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn; |
| | | |
| | | size_t shm_socket_remove_keys(int keys[], size_t length); |
| | | size_t shm_socket_remove_keys_exclude(int keys[], size_t length); |
| | | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); |
| | | |