| | |
| | | |
// Logger for this translation unit, obtained once from LoggerFactory::getLogger().
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
// Global pointer to the per-key queue status map. Elsewhere in this file it is
// assigned from shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY) and is used to
// look up, insert and update status records keyed by socket key.
| | | ShmQueueStMap * shmQueueStMap ; |
| | | // ShmQueueStMap * shmQueueStMap ; |
| | | |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | |
| | | |
| | | |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { |
| | | queue = new LockFreeQueue<shm_packet_t>(1024); |
| | | queue = new LockFreeQueue<shm_packet_t>(32); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | hashtable_unlock(hashtable); |
| | | return queue; |
| | |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | |
| | | shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | |
| | | return sockt; |
| | | } |
| | |
| | | static int _shm_socket_close_(shm_socket_t *sockt) { |
| | | |
| | | int rv, i; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | logger->debug("shm_socket_close\n"); |
| | | |
| | | |
| | | if(sockt->key != 0) { |
| | | auto it = shmQueueStMap->find(sockt->key); |
| | | if(it != shmQueueStMap->end()) { |
| | | it->second.status = SHM_QUEUE_ST_CLOSED; |
| | | it->second.closeTime = time(NULL); |
| | | } |
| | | } |
| | | // if(sockt->key != 0) { |
| | | // auto it = shmQueueStMap->find(sockt->key); |
| | | // if(it != shmQueueStMap->end()) { |
| | | // it->second.status = SHM_QUEUE_ST_CLOSED; |
| | | // it->second.closeTime = time(NULL); |
| | | // } |
| | | // } |
| | | |
| | | |
| | | printf("====sockt->queue addr = %p\n", sockt->queue); |
| | | // printf("====sockt->queue addr = %p\n", sockt->queue); |
| | | |
| | | if(sockt->queue != NULL) { |
| | | sockt->queue->close(); |
| | | for( i = 0; i < sockt->queue->size(); i++) { |
| | | mm_free((*(sockt->queue))[i].buf); |
| | | logger->info("======= %d free queue element buf\n", sockt->key); |
| | | } |
| | | sleep(1); |
| | | |
| | | // hashtable_remove(hashtable, mkey); |
| | | hashtable_remove(hashtable, sockt->key); |
| | | // sockt->queue = NULL; |
| | | } |
| | | |
| | | |
| | | // hashtable_remove(hashtable, mkey); |
| | | // if(sockt->queue != NULL) { |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | |
| | | pthread_mutex_destroy(&(sockt->mutex) ); |
| | | free(sockt); |
| | | return 0; |
| | |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // 检查key标记的状态 |
| | | auto it = shmQueueStMap->find(key); |
| | | if(it != shmQueueStMap->end()) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // key对应的状态是关闭的 |
| | | goto ERR_CLOSED; |
| | | } |
| | | } |
| | | // auto it = shmQueueStMap->find(key); |
| | | // if(it != shmQueueStMap->end()) { |
| | | // if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // // key对应的状态是关闭的 |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | remoteQueue = shm_socket_attach_queue(key); |
| | | |
| | | if (remoteQueue == NULL ) { |
| | | goto ERR_CLOSED; |
| | | } else if(remoteQueue->isClosed()) { |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |