wangzhengquan
2021-02-05 14c345b38d57fd814f217eb8465963a08ca79f7e
src/socket/shm_socket.cpp
@@ -65,7 +65,7 @@
  return queue;
}
//删除包含在keys内的queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
@@ -79,6 +79,38 @@
  }
  return count;
}
// 删除不在keys内的queue
size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  int s, type;
@@ -112,23 +144,31 @@
int shm_close_socket(shm_socket_t *sockt) {
  
  int s;
  int rv;
  logger->debug("shm_close_socket\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
  }
  s =  pthread_mutex_destroy(&(sockt->mutex) );
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
  }
  free(sockt);
  return 0;
}
int shm_socket_stop(shm_socket_t *sockt) {
  struct timespec timeout = {5, 0};
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  sendpak.action = BUS_ACTION_STOP;
  sendpak.size = 0;
  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
}
int shm_socket_bind(shm_socket_t *sockt, int key) {
  sockt->key = key;
@@ -175,6 +215,7 @@
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<std::string, shm_packet_t>::iterator recvbufIter;
  std::string uuid = sole::uuid4().str();
  
  sendpak.key = sockt->key;
@@ -484,11 +525,15 @@
  if( sockt->queue != NULL) 
    goto LABEL_PUSH;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
 
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
    err_exit(rv, "shm_sendto : pthread_mutex_lock");
    if (sockt->queue == NULL) {
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
@@ -507,7 +552,7 @@
  
 
 LABEL_PUSH: 
  if (key == sockt->key) {
  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
    logger->error( "can not send to your self!");
    return EBUS_SENDTO_SELF;
  }
@@ -527,13 +572,18 @@
}
// 短连接方式接受
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
  int rv;
  
  hashtable_t *hashtable = mm_get_hashtable();
  shm_packet_t recvpak;
  if( sockt->queue != NULL) 
    goto LABEL_POP;
  if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
    return EBUS_EXCEED_LIMIT;
  }
  {
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
@@ -557,11 +607,18 @@
  
LABEL_POP:
 //
  // printf("%p start recv.....\n", sockt);
 
  printf("%p start recv.....\n", sockt);
  rv = sockt->queue->pop(*recvpak, timeout, flag);
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0)
    return rv;
  if(recvpak.action == BUS_ACTION_STOP) {
    return EBUS_STOPED;
  }
  *_recvpak = recvpak;
  return rv;
}
// int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,