fujuntang
2021-10-23 bae3a4fd9406635608edf0c0d16c52cf7ca06a66
src/socket/bus_server_socket.cpp
@@ -462,6 +462,7 @@
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char data_buf[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
@@ -539,7 +540,9 @@
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        BusServerSocket::_data_remove(data1);
        BusServerSocket::_data_remove(data2);
        BusServerSocket::_data_remove(key);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
@@ -702,9 +705,9 @@
      count = 0;
    }
    memset(buf_temp, 0x00, sizeof(buf_temp));
    sprintf(buf_temp, "%d", count);
    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
    memset(data_buf, 0x00, sizeof(data_buf));
    sprintf(data_buf, "%d", count);
    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
@@ -849,6 +852,7 @@
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  }
}
@@ -891,10 +895,10 @@
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
  int rv;
  char send_buf[512] = { 0x00 };
    int rv;
    char send_buf[512] = { 0x00 };
  const char *topic_delim = ",";
    const char *topic_delim = ",";
   while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
@@ -972,39 +976,29 @@
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
      else if (strncmp(buf, "request", strlen("request")) == 0) {
      sprintf(send_buf, "%4d", key);
      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
      if(rv != 0) {
        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
      }
    }
    else if(strcmp(action, "stop") == 0) {
         free(buf);
         break;
      } else {
         logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
      }
      free(buf);
   }
      free(buf);
      break;
    } else {
      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
    }
    free(buf);
  }
   return rv;
  return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
void BusServerSocket::_data_remove(int val) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
  void *data_ptr = hashtable_get(hashtable, val);
  if (data_ptr != NULL) {
    if (data_ptr != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
@@ -1012,20 +1006,7 @@
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
    hashtable_remove(hashtable, val);
  }
}