#include "shm_mm_wrapper.h" #include "shm_mm.h" #include "hashtable.h" #include "lock_free_queue.h" #include "shm_socket.h" #define BUFFER_TIME 1 void shm_mm_wrapper_init(int size) { shm_mm_init(size); } void shm_mm_wrapper_destroy() { shm_mm_destroy(); } int shm_mm_wrapper_alloc_key() { return mm_alloc_key(); } /** * 回收假删除的key */ int shm_mm_wrapper_start_resycle() { ShmQueueStMap * shmQueueStMap = shm_mm_attach(SHM_QUEUE_ST_KEY); hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue *mqueue; while(true) { for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) { if(it->second.status == SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > BUFFER_TIME ) { // mqueue = (LockFreeQueue *)hashtable_get(hashtable, keys[i]); // if(mqueue != NULL) { // delete mqueue; // } hashtable_remove(hashtable, it->first); printf("reomved queue %d\n\n", it->first); it->second.status = SHM_QUEUE_ST_RECYCLED; // 不能 erase ,否则会出现多进程之间的同步问题, 而这正是这里要解决的问题 // it = shmQueueStMap->erase(it); // continue; } } sleep(1); } return 0; } //删除包含在keys内的queue int shm_mm_wrapper_remove_keys(int keys[], int length) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue *mqueue; int count = 0; for(int i = 0; i< length; i++) { // 销毁共享内存的queue hashtable_remove(hashtable, keys[i]); LoggerFactory::getLogger()->debug("remove queue %d", keys[i]); count++; } return count; } // 删除不在keys内的queue int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) { hashtable_t *hashtable = mm_get_hashtable(); std::set *keyset = hashtable_keyset(hashtable); std::set::iterator keyItr; LockFreeQueue *mqueue; bool found; int count = 0; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { found = false; for (size_t i = 0; i < length; i++) { if (*keyItr == keys[i]) { found = true; break; } } // 100内的是bus内部自己用的 if (!found && *keyItr > 100) { // 销毁共享内存的queue hashtable_remove(hashtable, *keyItr); LoggerFactory::getLogger()->debug("remove queue %d", *keyItr); count++; } } delete keyset; return count; }