Fu Juntang
2021-09-13 0cc00936da93b7003b882c58af01c1345f9cac38
src/socket/bus_server_socket.cpp
@@ -298,10 +298,11 @@
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
  int len = 0;
  int data1, data2;
  char *data_ptr;
  ProcInfo Data_stru;
  ProcZone::iterator proc_iter;
@@ -333,6 +334,13 @@
      
      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +370,9 @@
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
@@ -504,7 +515,9 @@
    free(last_buf);
  } else if (flag == PROC_QUE_STCS) {
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +525,9 @@
    
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
        count = *svr_proc_iter;
        if ((proc_iter = proc->find(count)) != proc->end()) {
          count = atoi((proc_iter->second).svr_info);
        }
        break;
      }
@@ -676,7 +692,6 @@
  int flag;
   char * action, *topic, *topics, *buf, *content;
   size_t head_len;
   char resp_buf[128];
   bus_head_t head;
  int rv;
@@ -771,3 +786,40 @@
   return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
  }
}