From 0f99e00822866416e29d136db06f9f2863c1088c Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 29 三月 2021 14:39:03 +0800 Subject: [PATCH] update --- src/socket/shm_socket.cpp | 83 ++++++++++++++++++++++++----------------- 1 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 2b3e626..8025564 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -10,7 +10,7 @@ static Logger *logger = LoggerFactory::getLogger(); -ShmQueueStMap * shmQueueStMap ; +// ShmQueueStMap * shmQueueStMap ; static void print_msg(char *head, shm_packet_t &msg) { // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); @@ -46,7 +46,7 @@ if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { - queue = new LockFreeQueue<shm_packet_t>(16); + queue = new LockFreeQueue<shm_packet_t>(32); hashtable_put(hashtable, key, (void *)queue); hashtable_unlock(hashtable); return queue; @@ -104,7 +104,7 @@ err_exit(s, "pthread_mutexattr_destroy"); - shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); + // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); return sockt; } @@ -112,23 +112,34 @@ static int _shm_socket_close_(shm_socket_t *sockt) { - int rv; + int rv, i; + hashtable_t *hashtable = mm_get_hashtable(); logger->debug("shm_socket_close\n"); - // hashtable_remove(hashtable, mkey); - // if(sockt->queue != NULL) { - // sockt->queue = NULL; - // } - - if(sockt->key != 0) { - auto it = shmQueueStMap->find(sockt->key); - if(it != shmQueueStMap->end()) { - it->second.status = SHM_QUEUE_ST_CLOSED; - it->second.closeTime = time(NULL); - } - } - + // if(sockt->key != 0) { + // auto it = shmQueueStMap->find(sockt->key); + // if(it != shmQueueStMap->end()) { + // it->second.status = SHM_QUEUE_ST_CLOSED; + // it->second.closeTime = time(NULL); + // } + // } + + + // printf("====sockt->queue addr = %p\n", sockt->queue); + + if(sockt->queue != NULL) { + sockt->queue->close(); + for( i = 0; i < sockt->queue->size(); i++) { + mm_free((*(sockt->queue))[i].buf); + logger->info("======= %d free queue element buf\n", sockt->key); + } + sleep(1); + + hashtable_remove(hashtable, sockt->key); + // sockt->queue = NULL; + } + pthread_mutex_destroy(&(sockt->mutex) ); free(sockt); return 0; @@ -168,8 +179,6 @@ int shm_socket_get_key(shm_socket_t *sockt){ return sockt->key; } - - // 鐭繛鎺ユ柟寮忓彂閫� int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, @@ -292,7 +301,7 @@ { int rv; - logger->debug("%d destroy threadlocal socket\n", pthread_self()); + logger->debug("%lu destroy threadlocal socket\n", pthread_self()); if(tmp_socket == NULL) return; @@ -325,7 +334,7 @@ const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { - int rv, tryn = 6; + int rv, tryn = 16; shm_packet_t sendpak; shm_packet_t recvpak; std::map<std::string, shm_packet_t>::iterator recvbufIter; @@ -565,9 +574,9 @@ } // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened - stRecord.status = SHM_QUEUE_ST_OPENED; - stRecord.createTime = time(NULL); - shmQueueStMap->insert({sockt->key, stRecord}); + // stRecord.status = SHM_QUEUE_ST_OPENED; + // stRecord.createTime = time(NULL); + // shmQueueStMap->insert({sockt->key, stRecord}); } @@ -584,22 +593,28 @@ } // 妫�鏌ey鏍囪鐨勭姸鎬� - auto it = shmQueueStMap->find(key); - if(it != shmQueueStMap->end()) { - if(it->second.status == SHM_QUEUE_ST_CLOSED) { - // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� - goto ERR_CLOSED; - } - } + // auto it = shmQueueStMap->find(key); + // if(it != shmQueueStMap->end()) { + // if(it->second.status == SHM_QUEUE_ST_CLOSED) { + // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� + // goto ERR_CLOSED; + // } + // } remoteQueue = shm_socket_attach_queue(key); if (remoteQueue == NULL ) { goto ERR_CLOSED; + } else if(remoteQueue->isClosed()) { + goto ERR_CLOSED; } sendpak->key = sockt->key; rv = remoteQueue->push(*sendpak, timeout, flag); + + if(rv != 0) { + mm_free(sendpak->buf); + } if(rv == ETIMEDOUT) { return EBUS_TIMEOUT; } @@ -642,9 +657,9 @@ } // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened - stRecord.status = SHM_QUEUE_ST_OPENED; - stRecord.createTime = time(NULL); - shmQueueStMap->insert({sockt->key, stRecord}); + // stRecord.status = SHM_QUEUE_ST_OPENED; + // stRecord.createTime = time(NULL); + // shmQueueStMap->insert({sockt->key, stRecord}); if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); -- Gitblit v1.8.0