fujuntang
2021-09-23 82b028cf63953d8080b63d85468eae488d212194
Fix the data parsing when in multiple threads.
12个文件已修改
118 ■■■■ 已修改文件
src/bh_api.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_proxy_start.cpp 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -530,9 +530,10 @@
    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
    }
    Proc_ptr = &(ptr->procData);
    for(int i = 0; i < mtr_list_num; i++) {
      mtr_list[i].proc_id = ptr->procData.proc_id;
      mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
      mtr_list[i].mq_id = ID_RSV;
      mtr_list[i].abs_addr = ABS_ID_RSV;
      mtr_list[i].ip = "127.0.0.1";
@@ -1162,6 +1163,7 @@
  int sec, nsec;
  std::string MsgID;
  int timeout_ms = 3000;
  char data_buf[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
  
@@ -1225,7 +1227,9 @@
  rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
  if (rv == 0) {
    val = atoi((char *)buf);
    len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
    memcpy(data_buf, (char *)buf, len);
    val = atoi((char *)data_buf);
    free(buf);
@@ -1316,6 +1320,7 @@
  net_node_t node;
  int node_size;  
  int recv_arr_size;
  char data_buf[MAX_STR_LEN] = { 0x00 };
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
@@ -1389,7 +1394,9 @@
  rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
  if (rv == 0) {
    
    val = atoi((char *)buf);
    len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
    memcpy(data_buf, (char *)buf, len);
    val = atoi((char *)data_buf);
    free(buf);
@@ -1401,7 +1408,6 @@
      len += strlen(_input1.data);
#endif
      data = net_mod_socket_svr_get(gNetmod_socket);
      topics_buf = (char *)malloc(len);
      if (topics_buf == NULL) {
        
src/bus_proxy_start.cpp
@@ -45,9 +45,6 @@
  return NULL;
}
void *svr_start(void *skptr) {
  int port = *(int *)skptr;
src/net/net_mod_socket.cpp
@@ -46,10 +46,6 @@
  return shmModSocket.force_bind(key);
}
int NetModSocket::bind_proc_id(char *buf, int len) {
  return shmModSocket.bind_proc_id(buf, len);
}
int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
  
  return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
src/net/net_mod_socket.h
@@ -120,7 +120,6 @@
  */
  int force_bind( int key);
  int bind_proc_id(char *buf, int len);
  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
  
  /**
src/net/net_mod_socket_wrapper.cpp
@@ -103,11 +103,6 @@
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
}
int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->bind_proc_id(proc_id, len);
}
void net_mod_socket_int_set(void * _socket, int data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  sockt->int_set(data);
src/shm/hashtable.cpp
@@ -178,7 +178,6 @@
    goto suc;
  }
  val = _hashtable_get(hashtable, key);
  // val = 1是allockey的情况
  if(val != NULL && val != (void *)1) 
    goto fail;
src/socket/bus_server_socket.cpp
@@ -539,7 +539,9 @@
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        BusServerSocket::_data_remove(data1);
        BusServerSocket::_data_remove(data2);
        BusServerSocket::_data_remove(key);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
@@ -892,10 +894,10 @@
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
  int rv;
  char send_buf[512] = { 0x00 };
    int rv;
    char send_buf[512] = { 0x00 };
  const char *topic_delim = ",";
    const char *topic_delim = ",";
    while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
        head = ShmModSocket::decode_bus_head(buf);
        topics = buf + BUS_HEAD_SIZE;
@@ -973,39 +975,29 @@
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
        else if (strncmp(buf, "request", strlen("request")) == 0) {
      sprintf(send_buf, "%4d", key);
      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
      if(rv != 0) {
        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
      }
    }
    else if(strcmp(action, "stop") == 0) {
            free(buf);
            break;
        } else {
            logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
        }
        free(buf);
    }
      free(buf);
      break;
    } else {
      logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
    }
    free(buf);
  }
    return rv;
  return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
void BusServerSocket::_data_remove(int val) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
  void *data_ptr = hashtable_get(hashtable, val);
  if (data_ptr != NULL) {
    if (data_ptr != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
@@ -1013,20 +1005,7 @@
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
    hashtable_remove(hashtable, val);
  }
}
src/socket/bus_server_socket.h
@@ -121,7 +121,7 @@
     */
    int get_key() ;
  void _data_remove(int val1, int val2);
  void _data_remove(int val);
};
src/socket/shm_mod_socket.cpp
@@ -38,10 +38,6 @@
    return shm_socket_force_bind(shm_socket, key);
}
int ShmModSocket::bind_proc_id(char *buf, int len) {
  return shm_socket_bind_proc_id(shm_socket, buf, len);
}
int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
{
  int ret;
src/socket/shm_mod_socket.h
@@ -62,7 +62,6 @@
    */
    int force_bind(int key);
  int bind_proc_id(char *buf, int len);
  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
    
  int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
src/socket/shm_socket.cpp
@@ -166,20 +166,8 @@
  return 0;
}
int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
  strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
  return 0;
}
int shm_socket_get_key(shm_socket_t *sockt){
  return sockt->key;
}
int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
  strncpy(buf, sockt->proc_id, len);
  return 0;
}
// 短连接方式发送
@@ -462,9 +450,8 @@
    tryn--;
    recvbufIter = tmp_socket->recvbuf2.find(key);
    if(recvbufIter != tmp_socket->recvbuf2.end()) {
      // 在缓存里查到了key匹配成功的
      recvpak = recvbufIter->second;
      tmp_socket->recvbuf2.erase(recvbufIter);
      tmp_socket->recvbuf2.erase(key);
      goto LABLE_SUC;
    }
@@ -481,7 +468,6 @@
    } else {
      // 答非所问,放到缓存里
      tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
      exit(0);
      continue;
    }
  }
src/socket/shm_socket.h
@@ -23,7 +23,7 @@
    size_t size;
    void * buf;
    char uuid[64];
    char uuid[1];
    int action;
} shm_packet_t;
@@ -34,7 +34,6 @@
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    int key;
  char proc_id[MAX_STR_LEN];
    bool force_bind;
    pthread_mutex_t mutex;
@@ -62,7 +61,6 @@
int shm_socket_force_bind(shm_socket_t * socket, int key) ;
int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
/**
 * @flags : BUS_NOWAIT_FLAG
 */