#include "shm_mm_wrapper.h"
|
#include "mem_pool.h"
|
#include "hashtable.h"
|
#include "lock_free_queue.h"
|
#include "shm_socket.h"
|
|
#define BUFFER_TIME 10
|
|
|
void shm_mm_wrapper_init(int size) {
|
shm_mm_init(size);
|
|
}
|
|
void shm_mm_wrapper_destroy() {
|
shm_mm_destroy();
|
}
|
|
int shm_mm_wrapper_alloc_key() {
|
return mm_alloc_key();
|
}
|
|
|
/**
|
* 回收假删除的key
|
*/
|
int shm_mm_wrapper_start_resycle() {
|
ShmQueueStMap * shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
|
hashtable_t *hashtable = mm_get_hashtable();
|
LockFreeQueue<shm_packet_t> *mqueue;
|
while(true) {
|
for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) {
|
if(it->second.status = SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) {
|
mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
|
if(mqueue != NULL) {
|
delete mqueue;
|
hashtable_remove(hashtable, it->first);
|
printf("reove queue %d\n", it->first);
|
// 不能 erase ,否则会出现多进程之间的同步问题, 而这正是这里要解决的问题
|
// it = shmQueueStMap->erase(it);
|
// continue;
|
}
|
}
|
}
|
|
sleep(1);
|
}
|
}
|
|
//删除包含在keys内的queue
|
int shm_mm_wrapper_remove_keys(int keys[], int length) {
|
hashtable_t *hashtable = mm_get_hashtable();
|
LockFreeQueue<shm_packet_t> *mqueue;
|
int count = 0;
|
for(int i = 0; i< length; i++) {
|
// 销毁共享内存的queue
|
mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
|
if(mqueue == NULL) {
|
continue;
|
}
|
if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
|
delete mqueue;
|
hashtable_remove(hashtable, keys[i]);
|
LoggerFactory::getLogger()->debug("remove queue %d", keys[i]);
|
count++;
|
}
|
|
}
|
return count;
|
}
|
|
|
// 删除不在keys内的queue
|
int shm_mm_wrapper_remove_keys_exclude(int keys[], int length) {
|
hashtable_t *hashtable = mm_get_hashtable();
|
std::set<int> *keyset = hashtable_keyset(hashtable);
|
std::set<int>::iterator keyItr;
|
LockFreeQueue<shm_packet_t> *mqueue;
|
bool found;
|
int count = 0;
|
for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
|
found = false;
|
for (size_t i = 0; i < length; i++) {
|
if (*keyItr == keys[i]) {
|
found = true;
|
break;
|
}
|
}
|
// 100内的是bus内部自己用的
|
if (!found && *keyItr > 100) {
|
// 销毁共享内存的queue
|
mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
|
if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) {
|
delete mqueue;
|
hashtable_remove(hashtable, *keyItr);
|
LoggerFactory::getLogger()->debug("remove queue %d", *keyItr);
|
count++;
|
}
|
|
}
|
}
|
delete keyset;
|
return count;
|
}
|