wangzhengquan
2021-02-05 14c345b38d57fd814f217eb8465963a08ca79f7e
src/socket/shm_socket.cpp
@@ -65,7 +65,7 @@
  return queue;
}
//删除包含在keys内的queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
@@ -79,6 +79,38 @@
  }
  return count;
}
// 删除不在keys内的queue
size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  int s, type;
@@ -112,23 +144,31 @@
int shm_close_socket(shm_socket_t *sockt) {
  
  int s;
  int rv;
  logger->debug("shm_close_socket\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
  }
  s =  pthread_mutex_destroy(&(sockt->mutex) );
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
  }
  free(sockt);
  return 0;
}
int shm_socket_stop(shm_socket_t *sockt) {
  struct timespec timeout = {5, 0};
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.action = BUS_ACTION_STOP;
  sendpak.size = 0;
  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
}
int shm_socket_bind(shm_socket_t *sockt, int key) {
  sockt->key = key;
@@ -175,6 +215,7 @@
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  
  sendpak.key = sockt->key;
@@ -191,7 +232,7 @@
    return rv;
  }
  while(true) {
  while(tryn > 0) {
    tryn--;
    recvbufIter = sockt->recvbuf.find(uuid);
    if(recvbufIter != sockt->recvbuf.end()) {
@@ -199,36 +240,37 @@
logger->debug("get from recvbuf: %s", uuid.c_str());
      recvpak = recvbufIter->second;
      sockt->recvbuf.erase(recvbufIter);
      break;
      goto LABLE_SUC;
    }
    rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags);
    if (rv != 0) {
      if(rv == ETIMEDOUT)
      if(rv == ETIMEDOUT) {
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    } 
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
    if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
    if(strlen(recvpak.uuid) == 0) {
      continue;
    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
      // 发送与接受的UUID匹配成功
      break;
      goto LABLE_SUC;
    } else {
      // 答非所问,放到缓存里
      sockt->recvbuf.insert({recvpak.uuid, recvpak});
      continue;
    }
    if(tryn == 0) {
      // 尝试了tryn次都没有成功
      return EBUS_RECVFROM_WRONG_END;
    }
  }
LABLE_FAIL:
  return EBUS_RECVFROM_WRONG_END;
  // return rv;
 
LABLE_SUC:
 if(recv_buf != NULL) {
@@ -243,6 +285,8 @@
  mm_free(recvpak.buf);
  return 0;
}
@@ -481,11 +525,15 @@
  if( sockt->queue != NULL) 
    goto LABEL_PUSH;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
 
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
    err_exit(rv, "shm_sendto : pthread_mutex_lock");
    if (sockt->queue == NULL) {
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
@@ -504,7 +552,7 @@
  
 
 LABEL_PUSH: 
  if (key == sockt->key) {
  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
    logger->error( "can not send to your self!");
    return EBUS_SENDTO_SELF;
  }
@@ -524,13 +572,18 @@
}
// 短连接方式接受
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
  int rv;
  
  hashtable_t *hashtable = mm_get_hashtable();
  shm_packet_t recvpak;
  if( sockt->queue != NULL) 
    goto LABEL_POP;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
@@ -554,11 +607,18 @@
  
LABEL_POP:
 //
  // printf("%p start recv.....\n", sockt);
 
  printf("%p start recv.....\n", sockt);
  rv = sockt->queue->pop(*recvpak, timeout, flag);
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0)
    return rv;
  if(recvpak.action == BUS_ACTION_STOP) {
    return EBUS_STOPED;
  }
  *_recvpak = recvpak;
  return rv;
}
// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,