Fu Juntang
2021-09-01 8f30fea9909a6e519e802876ed8bc9c10c571863
Add the data dynamic parse feature
4个文件已修改
131 ■■■■■ 已修改文件
src/bh_api.cpp 123 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -274,11 +274,11 @@
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv; 
  int min, i;
  int len, i;
  void *buf = NULL;
  int total = 0;
  int count = 0; 
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -305,6 +305,8 @@
  
    for(int i = 0; i < _input.amount; i++) {
        _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -316,6 +318,8 @@
    
        goto exit_entry;
  }
  total = topics_len;
#endif 
  if (gRun_stat == 0) {
@@ -330,29 +334,37 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHRegisterTopics: Out of memory!\n");
      goto exit_entry;
    }
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], len);
      count += len;
        total -= min;
        if ((total > 1) && (_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
    //logger->debug("the parsed compound register topics: %s!\n", topics_buf);
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
#endif 
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
@@ -360,6 +372,7 @@
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  
  } else {
@@ -377,13 +390,13 @@
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply, *reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
@@ -712,9 +725,9 @@
  int sec, nsec;
  int total = 0;
  int count = 0;
  int min, i;
  int len, i;
  void *buf = NULL;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -739,8 +752,11 @@
    _input.amount = MAX_STR_LEN;
  }
  
    for(int i = 0; i < _input.amount; i++)
    for(int i = 0; i < _input.amount; i++) {
        _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -764,28 +780,36 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
        total -= min;
        if ((total > 1) && (_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      }
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHSubscribeTopics: Out of memory!\n");
      goto exit_entry;
    }
    //logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    for (i = 0; i < _input.amount; i++) {
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], min);
      count += len;
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
#endif 
    if (timeout_ms > 0) {
@@ -807,6 +831,7 @@
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
   
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  } else {
@@ -825,13 +850,13 @@
    *reply=malloc(*reply_len);
    mcr.SerializePartialToArray(*reply,*reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
src/net/net_mod_server_socket.cpp
@@ -64,7 +64,6 @@
    /* Wait for listening/connected descriptor(s) to become ready */
    pool.ready_set = pool.read_set;
    pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
 // LoggerFactory::getLogger()->debug("select return \n");
    /* If listening descriptor ready, add new client to pool */
    if (FD_ISSET(listenfd, &pool.ready_set))
    {
@@ -142,8 +141,6 @@
  request_head = NetModSocket::decode_request_head(request_head_bs);
  
// printf("server received request from host = %s:%d, key = %d, timeout=%d,\n",
//   request_head.host, request_head.port , request_head.key, request_head.timeout);
  if(request_head.content_length > max_buf) {
   
@@ -229,7 +226,7 @@
    if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
      return -1;
    }
// LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
    memcpy(response_head.host, request_head.host, NI_MAXHOST);
    response_head.port = request_head.port;
    // response_head.key = request_head.key;
src/queue/lock_free_queue.h
@@ -212,7 +212,6 @@
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
  // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
  if (sem_destroy(&slots) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
src/socket/bus_server_socket.cpp
@@ -298,7 +298,7 @@
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
  int len = 0;