wangzhengquan
2021-02-05 607ac3ae8bfc017e10a7907e69dcbc3ab2a0fb63
src/socket/shm_socket.cpp
@@ -112,23 +112,31 @@
int shm_close_socket(shm_socket_t *sockt) {
  
  int s;
  int rv;
  logger->debug("shm_close_socket\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
  }
  s =  pthread_mutex_destroy(&(sockt->mutex) );
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
  }
  free(sockt);
  return 0;
}
int shm_socket_stop(shm_socket_t *sockt) {
  struct timespec timeout = {5, 0};
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.action = BUS_ACTION_STOP;
  sendpak.size = 0;
  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
}
int shm_socket_bind(shm_socket_t *sockt, int key) {
  sockt->key = key;
@@ -153,11 +161,14 @@
  int rv;
 
  shm_packet_t sendpak;
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.size = size;
  sendpak.buf = mm_malloc(size);
  memcpy(sendpak.buf, buf, size);
  if(buf != NULL) {
    sendpak.buf = mm_malloc(size);
    memcpy(sendpak.buf, buf, size);
  }
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag);
  return rv;
}
@@ -172,12 +183,15 @@
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  
  sendpak.key = sockt->key;
  sendpak.size = send_size;
  sendpak.buf = mm_malloc(send_size);
  memcpy(sendpak.buf, send_buf, send_size);
  if(send_buf != NULL) {
    sendpak.buf = mm_malloc(send_size);
    memcpy(sendpak.buf, send_buf, send_size);
  }
  memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1);
  // uuid.copy(sendpak.uuid, sizeof sendpak.uuid);
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags);
@@ -186,7 +200,7 @@
    return rv;
  }
  while(true) {
  while(tryn > 0) {
    tryn--;
    recvbufIter = sockt->recvbuf.find(uuid);
    if(recvbufIter != sockt->recvbuf.end()) {
@@ -194,36 +208,37 @@
logger->debug("get from recvbuf: %s", uuid.c_str());
      recvpak = recvbufIter->second;
      sockt->recvbuf.erase(recvbufIter);
      break;
      goto LABLE_SUC;
    }
    rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags);
    if (rv != 0) {
      if(rv == ETIMEDOUT)
      if(rv == ETIMEDOUT) {
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    } 
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
    if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
    if(strlen(recvpak.uuid) == 0) {
      continue;
    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
      // 发送与接受的UUID匹配成功
      break;
      goto LABLE_SUC;
    } else {
      // 答非所问,放到缓存里
      sockt->recvbuf.insert({recvpak.uuid, recvpak});
      continue;
    }
    if(tryn == 0) {
      // 尝试了tryn次都没有成功
      return EBUS_RECVFROM_WRONG_END;
    }
  }
LABLE_FAIL:
  return EBUS_RECVFROM_WRONG_END;
  // return rv;
 
LABLE_SUC:
 if(recv_buf != NULL) {
@@ -239,6 +254,8 @@
  return 0;
}
/**
@@ -251,35 +268,41 @@
  
  int rv;
 
  void *sendbuf = NULL;
  void *sendbuf, *recvbuf = NULL;
  int sendsize = 0;
  shm_packet_t recvpak;
  shm_packet_t recvpak = {0};
  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
  if (rv != 0) {
    if(rv == ETIMEDOUT){
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
      logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    }
    
    logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    return rv;
  }
   
  void *recvbuf = malloc(recvpak.size);
  memcpy(recvbuf, recvpak.buf, recvpak.size);
  mm_free(recvpak.buf);
  if(recvpak.buf != NULL) {
    recvbuf = malloc(recvpak.size);
    memcpy(recvbuf, recvpak.buf, recvpak.size);
    mm_free(recvpak.buf);
  }
  callback(recvbuf, recvpak.size, recvpak.key, &sendbuf, &sendsize, user_data);
  shm_packet_t sendpak;
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.size = sendsize;
  memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid);
  if(sendbuf !=NULL && sendsize > 0) {
    sendpak.buf = mm_malloc(sendsize);
    memcpy(sendpak.buf, sendbuf, sendsize);
  } else {
    logger->warn("%d shm_recvandsend : sendbuf is null", shm_socket_get_key(sockt));
    // return -1;
  }
 
  rv = shm_sendpakto(sockt, &sendpak, recvpak.key, timeout, flag);
@@ -307,7 +330,7 @@
  } 
 if(buf != NULL) {
 if(buf != NULL && recvpak.buf != NULL) {
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *buf = _buf; 
@@ -493,7 +516,7 @@
  
 
 LABEL_PUSH: 
  if (key == sockt->key) {
  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
    logger->error( "can not send to your self!");
    return EBUS_SENDTO_SELF;
  }
@@ -513,10 +536,11 @@
}
// 短连接方式接受
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
  int rv;
  
  hashtable_t *hashtable = mm_get_hashtable();
  shm_packet_t recvpak;
  if( sockt->queue != NULL) 
    goto LABEL_POP;
@@ -543,10 +567,18 @@
  
LABEL_POP:
 //
  // printf("%p start recv.....\n", sockt);
 
  rv = sockt->queue->pop(*recvpak, timeout, flag);
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0)
    return rv;
  if(recvpak.action == BUS_ACTION_STOP) {
    return EBUS_STOPED;
  }
  *_recvpak = recvpak;
  return rv;
}
// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,