wangzhengquan
2021-02-05 31b6fc52f49449189e31e13626c392d4c576c833
update
1个文件已修改
176 ■■■■ 已修改文件
src/socket/shm_socket.cpp 176 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp
@@ -26,6 +26,15 @@
static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak,
               const int key, const struct timespec *timeout, const int flag);
static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags);
static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) ;
//  检查key是否已经被使用, 未被使用则绑定key
static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) {
  hashtable_t *hashtable = mm_get_hashtable();
@@ -205,90 +214,12 @@
  return rv;
}
int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
}
  int rv, tryn = 3;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  sendpak.key = sockt->key;
  sendpak.size = send_size;
  if(send_buf != NULL) {
    sendpak.buf = mm_malloc(send_size);
    memcpy(sendpak.buf, send_buf, send_size);
  }
  memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1);
  // uuid.copy(sendpak.uuid, sizeof sendpak.uuid);
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags);
  if(rv != 0) {
    return rv;
  }
  while(tryn > 0) {
    tryn--;
    recvbufIter = sockt->recvbuf.find(uuid);
    if(recvbufIter != sockt->recvbuf.end()) {
      // 在缓存里查到了UUID匹配成功的
logger->debug("get from recvbuf: %s", uuid.c_str());
      recvpak = recvbufIter->second;
      sockt->recvbuf.erase(recvbufIter);
      goto LABLE_SUC;
    }
    rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags);
    if (rv != 0) {
      if(rv == ETIMEDOUT) {
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    }
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
    if(strlen(recvpak.uuid) == 0) {
      continue;
    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
      // 发送与接受的UUID匹配成功
      goto LABLE_SUC;
    } else {
      // 答非所问,放到缓存里
      sockt->recvbuf.insert({recvpak.uuid, recvpak});
      continue;
    }
  }
LABLE_FAIL:
  return EBUS_RECVFROM_WRONG_END;
  // return rv;
LABLE_SUC:
 if(recv_buf != NULL) {
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *recv_buf = _buf;
  }
  if(recv_size != NULL)
    *recv_size = recvpak.size;
  mm_free(recvpak.buf);
  return 0;
}
/**
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize) 
@@ -413,8 +344,91 @@
}
static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int rv, tryn = 3;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  sendpak.key = sockt->key;
  sendpak.size = send_size;
  if(send_buf != NULL) {
    sendpak.buf = mm_malloc(send_size);
    memcpy(sendpak.buf, send_buf, send_size);
  }
  memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1);
  // uuid.copy(sendpak.uuid, sizeof sendpak.uuid);
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags);
  if(rv != 0) {
    return rv;
  }
  while(tryn > 0) {
    tryn--;
    recvbufIter = sockt->recvbuf.find(uuid);
    if(recvbufIter != sockt->recvbuf.end()) {
      // 在缓存里查到了UUID匹配成功的
logger->debug("get from recvbuf: %s", uuid.c_str());
      recvpak = recvbufIter->second;
      sockt->recvbuf.erase(recvbufIter);
      goto LABLE_SUC;
    }
    rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags);
    if (rv != 0) {
      if(rv == ETIMEDOUT) {
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    }
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
    if(strlen(recvpak.uuid) == 0) {
      continue;
    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
      // 发送与接受的UUID匹配成功
      goto LABLE_SUC;
    } else {
      // 答非所问,放到缓存里
      sockt->recvbuf.insert({recvpak.uuid, recvpak});
      continue;
    }
  }
LABLE_FAIL:
  return EBUS_RECVFROM_WRONG_END;
  // return rv;
LABLE_SUC:
 if(recv_buf != NULL) {
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *recv_buf = _buf;
  }
  if(recv_size != NULL)
    *recv_size = recvpak.size;
  mm_free(recvpak.buf);
  return 0;
}
// use thread local
int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int recv_key;