wangzhengquan
2021-02-22 45e00aca28504b27f3ad6b4abf364c3d57f34510
src/socket/shm_socket.cpp
@@ -6,6 +6,7 @@
#include "bus_error.h"
#include "sole.h"
#include "shm_mm.h"
#include "key_def.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -109,7 +110,7 @@
}
int shm_socket_close(shm_socket_t *sockt) {
static int _shm_socket_close_(shm_socket_t *sockt) {
  
  int rv;
  logger->debug("shm_socket_close\n");
@@ -118,17 +119,27 @@
  //   sockt->queue = NULL;
  // }
  pthread_mutex_destroy(&(sockt->mutex) );
  // hashtable_remove(hashtable, mkey);
  free(sockt);
  auto it =  shmQueueStMap.find(key);
  if(it != shmQueueStMap.end()) {
    it->second.status = SHM_QUEUE_ST_CLOSED
    it->second.closeTime = time(NULL);
  if(sockt->key != 0) {
    auto it =  shmQueueStMap->find(sockt->key);
    if(it != shmQueueStMap->end()) {
      it->second.status = SHM_QUEUE_ST_CLOSED;
      it->second.closeTime = time(NULL);
    }
  }
  pthread_mutex_destroy(&(sockt->mutex) );
  free(sockt);
  return 0;
}
int shm_socket_close(shm_socket_t *sockt) {
  return _shm_socket_close_(sockt);
}
@@ -283,7 +294,7 @@
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self()); 
  shm_socket_close((shm_socket_t *)tmp_socket);
  _shm_socket_close_((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -311,7 +322,7 @@
                    const int send_size, const int key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int rv, tryn = 3;
  int rv, tryn = 6;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
@@ -520,7 +531,7 @@
    return EBUS_RECVFROM_WRONG_END;
  } 
   
  shm_socket_close(tmp_socket);
  _shm_socket_close_(tmp_socket);
  return rv;
 
}
@@ -532,6 +543,7 @@
  int rv;
  shm_queue_status_t stRecord;
  LockFreeQueue<shm_packet_t> *remoteQueue;
  hashtable_t *hashtable = mm_get_hashtable();
  if( sockt->queue != NULL) 
@@ -558,7 +570,7 @@
      // 标记key对应的状态 ,为opened
      stRecord.status = SHM_QUEUE_ST_OPENED;
      stRecord.createTime = time(NULL);
      shmQueueStMap.insert({sockt->key, stRecord});
      shmQueueStMap->insert({sockt->key, stRecord});
      
    }
@@ -575,15 +587,15 @@
  }
  // 检查key标记的状态
  auto it =  shmQueueStMap.find(key);
  if(it != shmQueueStMap.end()) {
  auto it =  shmQueueStMap->find(key);
  if(it != shmQueueStMap->end()) {
    if(it->second.status == SHM_QUEUE_ST_CLOSED) {
      // key对应的状态是关闭的
      goto ERR_CLOSED;
    }
  }
  LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key);
  remoteQueue = shm_socket_attach_queue(key);
  if (remoteQueue == NULL ) {
    goto ERR_CLOSED;
@@ -629,7 +641,7 @@
    // 标记key对应的状态 ,为opened
    stRecord.status = SHM_QUEUE_ST_OPENED;
    stRecord.createTime = time(NULL);
    shmQueueStMap.insert({sockt->key, stRecord});
    shmQueueStMap->insert({sockt->key, stRecord});
    
    if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -639,8 +651,8 @@
LABEL_POP:
  // 检查key标记的状态
  // auto shmQueueMapIter =  shmQueueStMap.find(sockt->key);
  // if(shmQueueMapIter != shmQueueStMap.end()) {
  // auto shmQueueMapIter =  shmQueueStMap->find(sockt->key);
  // if(shmQueueMapIter != shmQueueStMap->end()) {
  //   stRecord = shmQueueMapIter->second;
  //   if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
  //     // key对应的状态是关闭的