wangzhengquan
2021-01-13 940bb9e9238488025bf41eb2b2d3df077274004f
src/socket/shm_socket.cpp
@@ -386,9 +386,9 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    if(errno == EAGAIN) {
    if(errno == ETIMEDOUT) {
      bus_errno = EBUS_TIMEOUT;
      logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
      logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    } else {
      logger->error(errno, "sendto key %d failed!", key);
@@ -498,8 +498,8 @@
}
int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf,
// use thread local
int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  int recv_key;
@@ -547,6 +547,35 @@
  return -1;
}
int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  int recv_key;
  int rv;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  /* If first call from this thread, allocate buffer for thread, and save its location */
  // logger->debug("%d create tmp socket\n", pthread_self() );
  tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
  }
  shm_close_socket(tmp_socket);
  return rv;
}
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
@@ -572,7 +601,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  return  shm_sendandrecv_safe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
  return  shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}