wangzhengquan
2021-02-22 044e10574fa4e007be408d991861d34ecf22622a
src/socket/shm_socket.cpp
@@ -25,7 +25,7 @@
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
   
static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak,
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
               const int key, const struct timespec *timeout, const int flag);
@@ -114,14 +114,11 @@
  
  int rv;
  logger->debug("shm_socket_close\n");
  // hashtable_remove(hashtable, mkey);
  // if(sockt->queue != NULL) {
  //   delete sockt->queue;
  //   sockt->queue = NULL;
  // }
  // hashtable_remove(hashtable, mkey);
  if(sockt->key != 0) {
    auto it =  shmQueueStMap->find(sockt->key);
    if(it != shmQueueStMap->end()) {
@@ -257,12 +254,8 @@
  if (rv != 0) {
    if(rv == ETIMEDOUT)
      return EBUS_TIMEOUT;
    else {
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    }
    logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    return rv;
   
  } 
@@ -279,6 +272,9 @@
  if(key != NULL)
    *key = recvpak.key;
  if(recvpak.key == 0) {
    err_exit(0, "key = %d, pid= %d, recvpak.key == 0",  shm_socket_get_key(sockt), getpid());
  }
  mm_free(recvpak.buf);
  return 0;
}
@@ -402,15 +398,13 @@
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  
 
  int rv, tryn = 6;
  int rv = 0, tryn = 16;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<int, shm_packet_t>::iterator recvbufIter;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
 
  /* If first call from this thread, allocate buffer for thread, and save its location */
  // logger->debug("%d create tmp socket\n", pthread_self() );
  rv = pthread_once(&_once_, _create_socket_key_perthread);
  if (rv != 0) {
    logger->error(rv, "shm_sendandrecv pthread_once");
@@ -431,7 +425,6 @@
    }
  }
 
  sendpak.key = tmp_socket->key;
  sendpak.size = send_size;
  if(send_buf != NULL) {
@@ -458,11 +451,6 @@
    rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
    if (rv != 0) {
      if(rv == ETIMEDOUT) {
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
      return rv;
    } 
@@ -538,7 +526,7 @@
 
   
static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak,
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
               const int key, const struct timespec *timeout, const int flag) {
  int rv;
@@ -564,6 +552,8 @@
      sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
      if(sockt->queue  == NULL ) {
        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
        return EBUS_KEY_INUSED;
      }
@@ -601,7 +591,11 @@
    goto ERR_CLOSED;
  }
  sendpak->key = sockt->key;
  rv = remoteQueue->push(*sendpak, timeout, flag);
  if(rv == ETIMEDOUT) {
    return EBUS_TIMEOUT;
  }
  return rv;
ERR_CLOSED:
@@ -635,6 +629,8 @@
    sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
    if(sockt->queue  == NULL ) {
      logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
      if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
        err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
      return EBUS_KEY_INUSED;
    }
    
@@ -661,14 +657,18 @@
  // }
 
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0)
  if(rv != 0) {
    if(rv == ETIMEDOUT) {
      return EBUS_TIMEOUT;
    }
    return rv;
  }
  
  if(recvpak.action == BUS_ACTION_STOP) {
    return EBUS_STOPED;
  }
  *_recvpak = recvpak;
  return rv;
  return 0;
}