wangzhengquan
2021-02-08 e1bea2c6430ddadd589fd3377b69ca06226bb872
src/socket/shm_socket.cpp
@@ -75,57 +75,57 @@
}
//删除包含在keys内的queue
size_t shm_socket_remove_keys(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  LockFreeQueue<shm_packet_t> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
  }
  return count;
}
// size_t shm_socket_remove_keys(int keys[], size_t length) {
//   hashtable_t *hashtable = mm_get_hashtable();
//   LockFreeQueue<shm_packet_t> *mqueue;
//   size_t count = 0;
//   for(int i = 0; i< length; i++) {
//     // 销毁共享内存的queue
//     mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]);
//     delete mqueue;
//     hashtable_remove(hashtable, keys[i]);
//     count++;
//   }
//   return count;
// }
 
// 删除不在keys内的queue
size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  LockFreeQueue<shm_packet_t> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
    found = false;
    for (size_t i = 0; i < length; i++) {
      if (*keyItr == keys[i]) {
        found = true;
        break;
      }
    }
    // 100内的是bus内部自己用的
    if (!found && *keyItr > 100) {
      // 销毁共享内存的queue
      mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
    }
  }
  delete keyset;
  return count;
}
// // 删除不在keys内的queue
// size_t shm_socket_remove_keys_exclude(int keys[], size_t length) {
//   hashtable_t *hashtable = mm_get_hashtable();
//   std::set<int> *keyset = hashtable_keyset(hashtable);
//   std::set<int>::iterator keyItr;
//   LockFreeQueue<shm_packet_t> *mqueue;
//   bool found;
//   size_t count = 0;
//   for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
//     found = false;
//     for (size_t i = 0; i < length; i++) {
//       if (*keyItr == keys[i]) {
//         found = true;
//         break;
//       }
//     }
//     // 100内的是bus内部自己用的
//     if (!found && *keyItr > 100) {
//       // 销毁共享内存的queue
//       mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr);
//       delete mqueue;
//       hashtable_remove(hashtable, *keyItr);
//       count++;
//     }
//   }
//   delete keyset;
//   return count;
// }
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
shm_socket_t *shm_socket_open(shm_socket_type_t socket_type) {
  int s, type;
  pthread_mutexattr_t mtxAttr;
  logger->debug("shm_open_socket\n");
  logger->debug("shm_socket_open\n");
  // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  shm_socket_t *sockt = new shm_socket_t;
  sockt->socket_type = socket_type;
@@ -151,10 +151,14 @@
  return sockt;
}
int shm_close_socket(shm_socket_t *sockt) {
int shm_socket_close(shm_socket_t *sockt) {
}
int _shm_socket_close_(shm_socket_t *sockt) {
  
  int rv;
  logger->debug("shm_close_socket\n");
  logger->debug("shm_socket_close\n");
  if(sockt->queue != NULL) {
    delete sockt->queue;
    sockt->queue = NULL;
@@ -162,7 +166,7 @@
  rv =  pthread_mutex_destroy(&(sockt->mutex) );
  if(rv != 0) {
    err_exit(rv, "shm_close_socket");
    err_exit(rv, "shm_socket_close");
  }
  free(sockt);
@@ -320,7 +324,7 @@
    return;
  logger->debug("%d destroy tmp socket\n", pthread_self()); 
  shm_close_socket((shm_socket_t *)tmp_socket);
  shm_socket_close((shm_socket_t *)tmp_socket);
  rv =  pthread_setspecific(_perthread_socket_key_, NULL);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -414,12 +418,13 @@
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *recv_buf = _buf; 
    mm_free(recvpak.buf);
  }
 
  if(recv_size != NULL)
    *recv_size = recvpak.size;
  mm_free(recvpak.buf);
  return 0;
@@ -453,7 +458,7 @@
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
    tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_perthread_socket_key_, tmp_socket);
    if ( rv != 0) {
@@ -461,36 +466,6 @@
      exit(1);
    }
  }
   // int rv;
  // int tryn = 0;
  // int recv_key;
  // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags);
  // if (() == 0) {
  //   while(tryn < 3) {
  //     tryn++;
  //     rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
  //     if(rv != 0) {
  //       logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
  //       return rv;
  //     }
  //      // 超时导致接发送对象,与返回对象不对应的情况
  //     if(send_key != recv_key) {
  //       logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
  //       // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
  //       // exit(1);
  //       continue;
  //       // return EBUS_RECVFROM_WRONG_END;
  //     }
  //     return 0;
  //   }
  //   return EBUS_RECVFROM_WRONG_END;
  // }
 
  sendpak.key = tmp_socket->key;
@@ -550,12 +525,13 @@
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *recv_buf = _buf; 
    mm_free(recvpak.buf);
  }
 
  if(recv_size != NULL)
    *recv_size = recvpak.size;
  mm_free(recvpak.buf);
  return 0;
}
@@ -569,7 +545,7 @@
  shm_socket_t *tmp_socket;
 
 
  tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
@@ -595,7 +571,7 @@
    return EBUS_RECVFROM_WRONG_END;
  } 
   
  shm_close_socket(tmp_socket);
  shm_socket_close(tmp_socket);
  return rv;
 
}