wangzhengquan
2021-02-25 f52f2c2828047c2f30d30fc1fe2b54d8db146d49
src/socket/shm_socket.cpp
@@ -17,10 +17,10 @@
}
static pthread_once_t _once_ = PTHREAD_ONCE_INIT;
static pthread_key_t _perthread_socket_key_;
static pthread_key_t _localthread_socket_key_;
static void _destrory_socket_perthread(void *tmp_socket);
static void _create_socket_key_perthread(void);
static void _destrory_threadlocal_socket_(void *tmp_socket);
static void _create_threadlocal_socket_key_(void);
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
@@ -52,7 +52,7 @@
    return queue;
  } else if(force) {
    hashtable_unlock(hashtable);
    return (LockFreeQueue<shm_packet_t> *) queue;
    return (LockFreeQueue<shm_packet_t> *) tmp_ptr;
  }
  
  hashtable_unlock(hashtable);
@@ -136,6 +136,11 @@
int shm_socket_close(shm_socket_t *sockt) {
  shm_socket_t * threadlocal_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
  if(threadlocal_socket != NULL) {
    _destrory_threadlocal_socket_(threadlocal_socket);
  }
  return _shm_socket_close_(sockt);
}
@@ -283,15 +288,17 @@
// =================================================================================================
 /* Free thread-specific data buffer */
static void _destrory_socket_perthread(void *tmp_socket)
static void _destrory_threadlocal_socket_(void *tmp_socket)
{
  int rv;
  logger->debug("%d destroy threadlocal socket\n", pthread_self());
  if(tmp_socket == NULL)
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self());
  _shm_socket_close_((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  rv =  pthread_setspecific(_localthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
    exit(1);
@@ -299,14 +306,14 @@
}
/* One-time key creation function */
static void _create_socket_key_perthread(void)
static void _create_threadlocal_socket_key_(void)
{
  int s;
  /* Allocate a unique thread-specific data key and save the address
     of the destructor for thread-specific data buffers */
  s = pthread_key_create(&_perthread_socket_key_, _destrory_socket_perthread);
  //s = pthread_key_create(&_perthread_socket_key_, NULL);
  s = pthread_key_create(&_localthread_socket_key_, _destrory_threadlocal_socket_);
  //s = pthread_key_create(&_localthread_socket_key_, NULL);
  if (s != 0) {
     logger->error(s, "pthread_key_create");
     exit(1);
@@ -403,22 +410,22 @@
  shm_packet_t recvpak;
  std::map<int, shm_packet_t>::iterator recvbufIter;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
  shm_socket_t *tmp_socket = NULL;
 
  rv = pthread_once(&_once_, _create_socket_key_perthread);
  rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
  if (rv != 0) {
    logger->error(rv, "shm_sendandrecv pthread_once");
    exit(1);
  }
  tmp_socket = (shm_socket_t *)pthread_getspecific(_perthread_socket_key_);
  tmp_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
  if (tmp_socket == NULL)
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
    logger->debug("%lu create threadlocal socket\n", (long)pthread_self() );
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_perthread_socket_key_, tmp_socket);
    rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
    if ( rv != 0) {
      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
      exit(1);
@@ -502,7 +509,7 @@
      tryn++;
      rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
      if(rv != 0) {
        logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
        logger->error("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
        return rv;
      }
     
@@ -537,9 +544,9 @@
  if( sockt->queue != NULL) 
    goto LABEL_PUSH;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
  // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
  //   return EBUS_EXCEED_LIMIT;
  // }
 
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
@@ -614,9 +621,9 @@
  if( sockt->queue != NULL) 
    goto LABEL_POP;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
  // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
  //   return EBUS_EXCEED_LIMIT;
  // }
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
@@ -646,15 +653,6 @@
  
LABEL_POP:
  // 检查key标记的状态
  // auto shmQueueMapIter =  shmQueueStMap->find(sockt->key);
  // if(shmQueueMapIter != shmQueueStMap->end()) {
  //   stRecord = shmQueueMapIter->second;
  //   if(stRecord.status = SHM_QUEUE_ST_CLOSED) {
  //     // key对应的状态是关闭的
  //     goto ERR_CLOSED;
  //   }
  // }
 
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0) {