fujuntang
2021-09-17 97428da8d339d3d0c4d00aa328d76a32c7858412
src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
   SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   SHMKeySet *subscripter_set;
@@ -296,12 +297,176 @@
  return dataBuf;
}
void list::Insert(int aData, int bData)
{
  LinkNode *pHead = NULL;
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
  pNew = new(LinkNode);
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL) {
    head = pNew;
    pNew->next = NULL;
  } else {
    while(pCur->next != NULL) {
      pCur = pCur->next;
    }
    pCur->next = pNew;
    pNew->next = NULL;
  }
}
void list::Delete(int data)
{
  LinkNode *pHead;
  LinkNode *pCur;
  LinkNode *pNext;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL)
    return;
  while((pCur != NULL) && (pCur->data == data)) {
    head = pCur->next;
    delete(pCur);
    pCur = head;
  }
  while((pCur != NULL) && (pCur->next != NULL)) {
    pNext = pCur->next;
    if(pNext->data == data) {
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
    } else {
      pCur = pNext;
    }
  }
}
void list::dataSet(int data, int val)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return;
  while(pCur != NULL) {
    if(pCur->data == data) {
      pCur->count = val;
    }
    pCur = pCur->next;
  }
}
int list::dataGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->count;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::dataFixGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->data_fix;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::NodeNum(void)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while(pCur != NULL) {
    ++count;
    pCur = pCur->next;
  }
  return count;
}
int list::nodeGet(int index)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while((pCur != NULL) && (count <= index)) {
    if (count == index) {
      return pCur->data;
    }
    ++count;
    pCur = pCur->next;
  }
  return 0;
}
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
  int len = 0;
  int data1, data2;
  char *data_ptr;
  ProcInfo Data_stru;
  ProcZone::iterator proc_iter;
@@ -333,6 +498,16 @@
      
      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      if (flag == PROC_REG) {
        gLinkedList.Insert(key, atoi(Data_stru.int_info));
      }
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +537,9 @@
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
@@ -504,7 +682,9 @@
    free(last_buf);
  } else if (flag == PROC_QUE_STCS) {
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +692,9 @@
    
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
        count = *svr_proc_iter;
        if ((proc_iter = proc->find(count)) != proc->end()) {
          count = atoi((proc_iter->second).svr_info);
        }
        break;
      }
@@ -666,7 +849,34 @@
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  }
}
int BusServerSocket::get_data(int val) {
  ProcZone::iterator proc_iter;
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  if ((proc_iter = proc->find(val)) != proc->end()) {
    return true;
  }
  return false;
}
int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag) {
  int ret;
  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
  return ret;
}
void BusServerSocket::remove_proc(int val) {
  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
}
// 运行代理
@@ -674,10 +884,13 @@
   int size;
   int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
   char * action, *topic, *topics, *buf, *content;
   size_t head_len;
   char resp_buf[128];
   bus_head_t head;
    int val;
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
  int rv;
  char send_buf[512] = { 0x00 };
@@ -747,6 +960,16 @@
      }
        
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
        }
      }
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
@@ -771,3 +994,40 @@
   return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
  }
}