Fu Juntang
2021-09-01 8dead3c23da957fce542d8483c7bce7d5a7232ed
src/bh_api.cpp
@@ -7,11 +7,8 @@
#include "bh_api.h"
#include <pthread.h>
#include <getopt.h>
#include "bhome_msg_api.pb.h"
#include "bhome_msg.pb.h"
#include "error_msg.pb.h"
#include "proto/bhome_msg.pb.h"
#include "proto/bhome_msg_api.pb.h"
#include "../proto/source/bhome_msg.pb.h"
#include "../proto/source/bhome_msg_api.pb.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -277,11 +274,11 @@
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv; 
  int min, i;
  int len, i;
  void *buf = NULL;
  int total = 0;
  int count = 0; 
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -308,6 +305,8 @@
  
   for(int i = 0; i < _input.amount; i++) {
      _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -319,6 +318,8 @@
    
      goto exit_entry;
  }
  total = topics_len;
#endif 
  if (gRun_stat == 0) {
@@ -333,38 +334,48 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHRegisterTopics: Out of memory!\n");
      pthread_mutex_unlock(&mutex);
      goto exit_entry;
    }
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], len);
      count += len;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
    logger->debug("the parsed compound register topics: %s!\n", topics_buf);
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
    count = topics_len;
#endif 
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  
  } else {
@@ -382,13 +393,13 @@
   *reply = malloc(*reply_len);
   mcr.SerializePartialToArray(*reply, *reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
@@ -467,7 +478,7 @@
    buf = const_cast<void *>(topic);
    strncpy(topics_buf, (const char *)buf, min);
#endif 
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS);
   
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
@@ -717,9 +728,9 @@
  int sec, nsec;
  int total = 0;
  int count = 0;
  int min, i;
  int len, i;
  void *buf = NULL;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
@@ -744,8 +755,11 @@
    _input.amount = MAX_STR_LEN;
  }
  
   for(int i = 0; i < _input.amount; i++)
   for(int i = 0; i < _input.amount; i++) {
      _input.topics[i] = input.topic_list(i).c_str();
    total += strlen(_input.topics[i]) + 1;
  }
#else 
  if ((topics == NULL) || (topics_len == 0)) {
@@ -769,51 +783,61 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
    topics_buf = (char *)malloc(total);
    if (topics_buf == NULL) {
      rv = EBUS_RES_NO;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      logger->error("in BHSubscribeTopics: Out of memory!\n");
      pthread_mutex_unlock(&mutex);
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      }
      goto exit_entry;
    }
    logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
    memset(topics_buf, 0x00, total);
#if defined(PRO_DE_SERIALIZE)
    for (i = 0; i < _input.amount; i++) {
      len = strlen(_input.topics[i]);
      strncpy(topics_buf + count, _input.topics[i], len);
      count += len;
      if ((_input.amount > 1) && (i < (_input.amount - 1))) {
        strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
        count++;
      }
    }
#else 
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
    memcpy(topics_buf, topics, topics_len);
    count = topics_len;
#endif 
    if (timeout_ms > 0) {
    
      sec = timeout_ms / 1000;
      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec);
      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, count, sec, nsec);
    } else if (timeout_ms == 0) {
    
      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, count);
    
    } else {
    
      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, count);
    
    }
   
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
   
    free(topics_buf);
    pthread_mutex_unlock(&mutex);
  } else {
@@ -832,13 +856,13 @@
   *reply=malloc(*reply_len);
   mcr.SerializePartialToArray(*reply,*reply_len);
#else 
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  len = strlen(errString) + 1;
  buf = malloc(len) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *((char *)buf + len - 1) = '\0';
  
  *reply = buf;
  *reply_len = min;
  *reply_len = len;
#endif 
  
  if (rv == 0)
@@ -906,7 +930,7 @@
}
#if defined(PRO_DE_SERIALIZE)
int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms)
int BHPublish(const char *msgpub, const int msgpub_len, const int timeout_ms)
#else
int BHPublish(const char *topic, const char *content, const int timeout_ms)
#endif
@@ -918,7 +942,7 @@
  int node_arr_len = 0;
#if defined(PRO_DE_SERIALIZE)
  struct _MsgPublish
  struct MsgPublish
   {
      const char *topic;
      const char *data;
@@ -1098,8 +1122,6 @@
    
#endif 
    pthread_mutex_unlock(&mutex);
  } else {
    memset(errString, 0x00, sizeof(errString));
@@ -1275,6 +1297,12 @@
  int sec, nsec;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
  
  struct _RequestReply
  {
    std::string proc_id;
    std::string data;
  }rr;
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
   {
@@ -1289,7 +1317,7 @@
      const char *topic;
      const char *data;
   }_input1;
  ::bhome_msg::BHAddress input0;
   ::bhome_msg::MsgRequestTopic input1;
   if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
@@ -1389,12 +1417,6 @@
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    struct _RequestReply
    {
      std::string proc_id;
      std::string data;
    }rr;
    if (rv == 0) {
      memset(topics_buf, 0x00, sizeof(topics_buf));