Fu Juntang
2021-09-01 8dead3c23da957fce542d8483c7bce7d5a7232ed
src/bh_api.cpp
@@ -274,11 +274,11 @@
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv; 
  int min, i;
  int len, i;
  void *buf = NULL;
  int total = 0;
  int count = 0; 
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -305,6 +305,8 @@
  
   for(int i = 0; i < _input.amount; i++) {
      _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -316,6 +318,8 @@
    
      goto exit_entry;
  }
  total = topics_len;
#endif 
  if (gRun_stat == 0) {
@@ -330,38 +334,48 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHRegisterTopics: Out of memory!\n");
      pthread_mutex_unlock(&mutex);
      goto exit_entry;
    }
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], len);
      count += len;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
    logger->debug("the parsed compound register topics: %s!\n", topics_buf);
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
    count = topics_len;
#endif 
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  
  } else {
@@ -379,13 +393,13 @@
   *reply = malloc(*reply_len);
   mcr.SerializePartialToArray(*reply, *reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
@@ -464,7 +478,7 @@
    buf = const_cast<void *>(topic);
    strncpy(topics_buf, (const char *)buf, min);
#endif 
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS);
   
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
@@ -714,9 +728,9 @@
  int sec, nsec;
  int total = 0;
  int count = 0;
  int min, i;
  int len, i;
  void *buf = NULL;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -741,8 +755,11 @@
    _input.amount = MAX_STR_LEN;
  }
  
   for(int i = 0; i < _input.amount; i++)
   for(int i = 0; i < _input.amount; i++) {
      _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -766,51 +783,61 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHSubscribeTopics: Out of memory!\n");
      pthread_mutex_unlock(&mutex);
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      }
      goto exit_entry;
    }
    logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    for (i = 0; i < _input.amount; i++) {
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], len);
      count += len;
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
    count = topics_len;
#endif 
    if (timeout_ms > 0) {
    
      sec = timeout_ms / 1000;
      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec);
      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, count, sec, nsec);
    } else if (timeout_ms == 0) {
    
      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, count);
    
    } else {
    
      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, count);
    
    }
   
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
   
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  } else {
@@ -829,13 +856,13 @@
   *reply=malloc(*reply_len);
   mcr.SerializePartialToArray(*reply,*reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
@@ -915,7 +942,7 @@
  int node_arr_len = 0;
#if defined(PRO_DE_SERIALIZE)
  struct _MsgPublish
  struct MsgPublish
   {
      const char *topic;
      const char *data;
@@ -1094,8 +1121,6 @@
    memcpy(*proc_id, topics_buf, *proc_id_len);
    
#endif 
    pthread_mutex_unlock(&mutex);
  } else {