fujuntang
2021-12-09 73689afc09ce346f9eb00e02faf7f242e55dc7ee
src/socket/bus_server_socket.cpp
@@ -2,9 +2,11 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
#include "msg_mgr.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
static pthread_mutex_t gMutex;
list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
@@ -83,11 +85,13 @@
int  BusServerSocket::start(){
  int rv;
   topic_sub_map =   shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   rv = _run_proxy_();
  topic_sub_map =   shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   return rv;
  pthread_mutex_init(&gMutex, NULL);
  rv = _run_proxy_();
  return rv;
}
@@ -200,7 +204,7 @@
   int rv;
   struct timespec timeout = {1,0};
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
@@ -303,7 +307,7 @@
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
 
  pNew = new(LinkNode);
  pNew = (LinkNode *)malloc(sizeof(LinkNode));
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
@@ -340,7 +344,7 @@
    
    head = pCur->next;
    
    delete(pCur);
    free(pCur);
    
    pCur = head;
    
@@ -353,7 +357,7 @@
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
      free(pNext);
    } else {
    
      pCur = pNext;
@@ -462,6 +466,7 @@
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char data_buf[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
@@ -514,9 +519,11 @@
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
    if (flag == PROC_REG) {
      pthread_mutex_lock(&gMutex);
      if ((proc_iter = proc->find(key)) == proc->end()) {
        proc->insert({key, Data_stru});
      }
      pthread_mutex_unlock(&gMutex);
      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
        procPart->insert({key, Data_stru.proc_id});
@@ -535,16 +542,20 @@
        SvrSub_ele->erase(key);
      }
      pthread_mutex_lock(&gMutex);
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        BusServerSocket::_data_remove(data1);
        BusServerSocket::_data_remove(data2);
        BusServerSocket::_data_remove(key);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
        proc->erase(key);
      }
      pthread_mutex_unlock(&gMutex);
      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
@@ -556,7 +567,13 @@
        procQuePart->erase(buf_temp);
      }
      pthread_mutex_lock(&gMutex);
      BusServerSocket::buf_data_remove(key);
      pthread_mutex_unlock(&gMutex);
      find_mm_data(key);
    }
  } else if (flag == PROC_REG_TCS) {
    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -573,6 +590,7 @@
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      data_ptr = trim(data_ptr, 0);
      TcsSub_ele->insert(data_ptr);
      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
        SvrSub_ele = svr_tcs_iter->second;
@@ -602,6 +620,7 @@
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      data_ptr = trim(data_ptr, 0);
      ret = Qurey_object(data_ptr, &len);
      if (ret != NULL) {
    
@@ -687,7 +706,7 @@
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
    if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
      SvrSub_ele = svr_tcs_iter->second;
    
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
@@ -702,11 +721,11 @@
      count = 0;
    }
    memset(buf_temp, 0x00, sizeof(buf_temp));
    sprintf(buf_temp, "%d", count);
    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
    memset(data_buf, 0x00, sizeof(data_buf));
    sprintf(data_buf, "%d", count);
    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
  } else if (flag == PROC_QUE_ATCS) {
    int val;
    int temp = 0;
@@ -850,6 +869,17 @@
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  } else {
    char *ptr = NULL;
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data1 = atoi(buf_temp);
    ptr = strstr(buf_temp, STR_MAGIC);
    if (ptr != NULL) {
      data2 = atoi(ptr + 1);
    }
    BusServerSocket::buf_data_set(data1, data2);
  }
}
@@ -858,9 +888,12 @@
  ProcZone::iterator proc_iter;
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  pthread_mutex_lock(&gMutex);
  if ((proc_iter = proc->find(val)) != proc->end()) {
    pthread_mutex_unlock(&gMutex);
    return true;
  }
  pthread_mutex_unlock(&gMutex);
  
  return false;
  
@@ -885,17 +918,17 @@
   int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
   char * action, *topic, *topics, *buf, *content;
   char *action, *topic, *topics, *buf, *content;
   size_t head_len;
   bus_head_t head;
    int val;
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
  int rv;
  char send_buf[512] = { 0x00 };
    int rv;
    char send_buf[512] = { 0x00 };
  const char *topic_delim = ",";
    const char *topic_delim = ",";
   while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
@@ -932,7 +965,8 @@
      } 
    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
            || (strcmp(action, "bufreg") == 0)) {
      content = topics + head.topic_size;
      if (strcmp(action, "reg") == 0) {
        
@@ -954,15 +988,19 @@
        
        flag = PROC_QUE_STCS; 
      } else {
      } else if (strcmp(action, "atcsque") == 0) {
        
        flag = PROC_QUE_ATCS;
      } else {
        flag = PROC_REG_BUF;
      }
        
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
@@ -973,39 +1011,30 @@
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
      else if (strncmp(buf, "request", strlen("request")) == 0) {
      sprintf(send_buf, "%4d", key);
      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
      if(rv != 0) {
        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
      }
    }
    else if(strcmp(action, "stop") == 0) {
         free(buf);
         break;
      } else {
         logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
      }
      free(buf);
   }
      free(buf);
      break;
    } else {
      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
    }
    free(buf);
  }
   return rv;
  return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
void BusServerSocket::_data_remove(int val) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
  void *data_ptr = hashtable_get(hashtable, val);
  if (data_ptr != NULL) {
    if (data_ptr != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
@@ -1013,21 +1042,46 @@
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
    hashtable_remove(hashtable, val);
  }
}
void BusServerSocket::buf_data_set(int data, int val) {
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
  } else {
    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
    val_buf = new(set_ptr) recvbuf_val;
    recvBuf_data.insert({data, val_buf});
  }
  val_buf->insert(val);
}
void BusServerSocket::buf_data_remove(int data) {
  int val;
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
      val = *val_iter;
      BusServerSocket::_data_remove(val);
    }
    recvBuf_data.erase(data);
  }
}