fujuntang
2021-11-10 c479ef57baaaa28964fc3ec8d80ff99dffa7d49f
src/socket/bus_server_socket.cpp
@@ -2,6 +2,7 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
#include "msg_mgr.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -303,7 +304,7 @@
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
 
  pNew = new(LinkNode);
  pNew = (LinkNode *)malloc(sizeof(LinkNode));
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
@@ -340,7 +341,7 @@
    
    head = pCur->next;
    
    delete(pCur);
    free(pCur);
    
    pCur = head;
    
@@ -353,7 +354,7 @@
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
      free(pNext);
    } else {
    
      pCur = pNext;
@@ -559,7 +560,10 @@
        procQuePart->erase(buf_temp);
      }
      BusServerSocket::buf_data_remove(key);
      find_mm_data(key);
    }
  } else if (flag == PROC_REG_TCS) {
    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -709,7 +713,7 @@
    sprintf(data_buf, "%d", count);
    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
  } else if (flag == PROC_QUE_ATCS) {
    int val;
    int temp = 0;
@@ -853,6 +857,17 @@
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  } else {
    char *ptr = NULL;
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data1 = atoi(buf_temp);
    ptr = strstr(buf_temp, STR_MAGIC);
    if (ptr != NULL) {
      data2 = atoi(ptr + 1);
    }
    BusServerSocket::buf_data_set(data1, data2);
  }
}
@@ -888,7 +903,7 @@
   int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
   char * action, *topic, *topics, *buf, *content;
   char *action, *topic, *topics, *buf, *content;
   size_t head_len;
   bus_head_t head;
    int val;
@@ -935,7 +950,8 @@
      } 
    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
            || (strcmp(action, "bufreg") == 0)) {
      content = topics + head.topic_size;
      if (strcmp(action, "reg") == 0) {
        
@@ -957,15 +973,19 @@
        
        flag = PROC_QUE_STCS; 
      } else {
      } else if (strcmp(action, "atcsque") == 0) {
        
        flag = PROC_QUE_ATCS;
      } else {
        flag = PROC_REG_BUF;
      }
        
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
@@ -996,6 +1016,7 @@
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr = hashtable_get(hashtable, val);
  if (data_ptr != NULL) {
    if (data_ptr != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
@@ -1011,3 +1032,41 @@
}
void BusServerSocket::buf_data_set(int data, int val) {
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
  } else {
    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
    val_buf = new(set_ptr) recvbuf_val;
    recvBuf_data.insert({data, val_buf});
  }
  val_buf->insert(val);
}
void BusServerSocket::buf_data_remove(int data) {
  int val;
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
      val = *val_iter;
      BusServerSocket::_data_remove(val);
    }
    recvBuf_data.erase(data);
  }
}