fujuntang
2021-12-07 22cd4140502e67d32967160bee56375eaa285011
src/bh_api.cpp
@@ -3,6 +3,7 @@
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wrapper.h"
#include "proc_def.h"
#include "mm.h"
#include "usg_common.h"
#include "bh_api.h"
#include <pthread.h>
@@ -17,11 +18,13 @@
static Logger *logger = LoggerFactory::getLogger();
static int gRun_stat = 0;
static int gRun_flag = true;
static void *gNetmod_socket = NULL;
static pthread_mutex_t mutex;
static pthread_t gTids;
static void *client_run_check(void *skptr) { 
  pthread_detach(pthread_self());
@@ -36,19 +39,24 @@
  sec = TIME_WAIT;
  nsec = 0;
  sprintf(buf, "%s", "Success");
  sprintf(buf, "%s", STR_EXEC);
  data = net_mod_socket_int_get(gNetmod_socket);
  while(true) {
  while(gRun_flag == true) {
    
    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data);
    if (rv == 0) {
      
      BHFree(buf_temp, size);
      if (strncmp((char *)buf_temp, STR_RET, strlen(STR_RET)) != 0) {
      rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
      if (rv != 0) {
        logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
        rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
        if (rv != 0) {
          logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
        }
      } else {
        gRun_flag = false;
      }
      BHFree(buf_temp, size);
      
    } else {
      
@@ -123,25 +131,25 @@
    shm_mm_wrapper_init(SHM_RES_SIZE);
    
#if defined(PRO_DE_SERIALIZE)
    if (_input.proc_id != NULL) {
    if (strlen(_input.proc_id) > 0) {
      count = strlen(_input.proc_id) + 1;
      min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count;
      strncpy(pData.proc_id, _input.proc_id, min);
    }
    if (_input.name != NULL) {
    if (strlen(_input.name) > 0) {
      count = strlen(_input.name) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count;
      strncpy(pData.name, _input.name, min); 
    }
    if (_input.public_info != NULL) {
    if (strlen(_input.public_info) > 0) {
      count = strlen(_input.public_info) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count;
      strncpy(pData.public_info, _input.public_info, min);
    }
 
    if (_input.private_info != NULL) {
    if (strlen(_input.private_info) > 0) {
      count = strlen(_input.private_info) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count;
      strncpy(pData.private_info, _input.private_info, min);
@@ -172,11 +180,12 @@
    }
#endif 
    if (pData.proc_id == NULL) {
    if (strlen(pData.proc_id) == 0) {
      rv = EBUS_INVALID_PARA;
      bus_errorset(rv);
      gRun_stat = 0;
      pthread_mutex_unlock(&mutex);
      
      goto exit_entry;
@@ -185,13 +194,13 @@
    gNetmod_socket = net_mod_socket_open();
    hashtable_t *hashtable = mm_get_hashtable();
    key = hashtable_alloc_key(hashtable);
    net_mod_socket_bind(gNetmod_socket, key);
    count = hashtable_alloc_key(hashtable);
    rv = hashtable_alloc_key(hashtable);
    net_mod_socket_int_set(gNetmod_socket, count);
    net_mod_socket_svr_set(gNetmod_socket, rv);
    sprintf(pData.int_info, "%d", count);
    sprintf(pData.svr_info, "%d", rv);
    net_mod_socket_bind(gNetmod_socket, key);
  
    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -228,6 +237,7 @@
#endif 
  
  if (rv == 0) {
    gRun_flag = true;
    pthread_create(&gTids, NULL, client_run_check, NULL);
    
    return true;
@@ -240,8 +250,11 @@
{
  int rv;
  int min;
  int data;
  int diff;
  void *buf = NULL;
  char *errString = NULL;
  struct timeval start, end;
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
@@ -289,7 +302,23 @@
  if (rv == 0) {
    rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG);
    if (rv == 0) {
      gettimeofday(&start, NULL);
      data = net_mod_socket_int_get(gNetmod_socket);
      rv = net_mod_socket_sendto_timeout(gNetmod_socket, STR_RET, strlen(STR_RET), data, 3, 0);
      if (rv != 0) {
        logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
      }
      while(gRun_flag == true) {
        sleep(1);
        gettimeofday(&end, NULL);
        diff = end.tv_sec - start.tv_sec;
        if (diff >= TIME_DUR)
          break;
      };
      net_mod_socket_close(gNetmod_socket);
      
      gNetmod_socket = NULL;
@@ -539,21 +568,28 @@
  if (rv == 0) {
    
    ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
    mtr_list_num = ptr->num;
    min = *(int *)buf;
    if (min > 0) {
      ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
      mtr_list_num = ptr->num;
      if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
        mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
      }
    
    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
      Proc_ptr = &(ptr->procData);
      for(int i = 0; i < mtr_list_num; i++) {
        mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
        mtr_list[i].mq_id = ID_RSV;
        mtr_list[i].abs_addr = ABS_ID_RSV;
        mtr_list[i].ip = "127.0.0.1";
        mtr_list[i].port = 5000;
      }
    } else {
      mtr_list_num = 0;
    }
    Proc_ptr = &(ptr->procData);
    for(int i = 0; i < mtr_list_num; i++) {
      mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
      mtr_list[i].mq_id = ID_RSV;
      mtr_list[i].abs_addr = ABS_ID_RSV;
      mtr_list[i].ip = "127.0.0.1";
      mtr_list[i].port = 5000;
    }
    free(buf);
  }
  
exit_entry:
@@ -684,27 +720,31 @@
    if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
      mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
    }
    Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
    for(int i = 0; i < mpr_list_num; i++) {
      mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
      mpr_list[i].name = (Proc_ptr + i)->procData.name;
      mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
      mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
      mpr_list[i].online = (Proc_ptr + i)->stat;
      mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
      {
        if (j == 0) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
        } else if (j == 1) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
        } else if (j == 2) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
    if (mpr_list_num > 0) {
      Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
      for(int i = 0; i < mpr_list_num; i++) {
        mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
        mpr_list[i].name = (Proc_ptr + i)->procData.name;
        mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
        mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
        mpr_list[i].online = (Proc_ptr + i)->stat;
        mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
        for(int j = 0; j < mpr_list[i].topic_list_num; j++)
        {
          if (j == 0) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
          } else if (j == 1) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
          } else if (j == 2) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
          }
        }
      }
    }
    free(buf);
  }
    
  errString = bus_strerror(0, 1);
@@ -866,8 +906,8 @@
  ::bhome_msg::MsgCommonReply mcr;
   mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
   mcr.mutable_errmsg()->set_errstring(errString);
   *reply_len=mcr.ByteSizeLong();
   *reply=malloc(*reply_len);
   *reply_len = mcr.ByteSizeLong();
   *reply = malloc(*reply_len);
   mcr.SerializePartialToArray(*reply,*reply_len);
#else 
  len = strlen(errString) + 1;
@@ -1204,7 +1244,8 @@
  char data_buf[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
   {
@@ -1261,14 +1302,19 @@
#else 
  strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(buf_temp) - 1));
#endif 
  str = buf_temp;
  val = net_mod_socket_buf_data_get(gNetmod_socket, str);
  if(val > 0) {
  if ((val > 0) && (hashtable_get(hashtable, val) != NULL)) {
    rv = 0;
  } else {
    if ((val > 0) && (hashtable_get(hashtable, val) == NULL)) {
      net_mod_socket_buf_data_del(gNetmod_socket, str);
    }
    rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
    if (rv == 0) {
@@ -1380,7 +1426,8 @@
  char *errString = NULL;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  struct _RequestReply
  {
    std::string proc_id;
@@ -1447,11 +1494,13 @@
 
  str = buf_temp;
  val = net_mod_socket_buf_data_get(gNetmod_socket, str);
  if(val > 0) {
  if ((val > 0) && (hashtable_get(hashtable, val) != NULL)) {
    rv = 0;
  } else {
    if ((val > 0) && (hashtable_get(hashtable, val) == NULL)) {
      net_mod_socket_buf_data_del(gNetmod_socket, str);
    }
    rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
    if (rv == 0) {
    
@@ -1583,6 +1632,13 @@
exit_entry:
  errString = bus_strerror(0, 1);
  
  if (rv != 0) {
    if ((proc_id != NULL) && (proc_id_len != NULL)) {
      *proc_id_len = 0;
      *proc_id = NULL;
    }
  }
#if defined(PRO_DE_SERIALIZE) 
  ::bhome_msg::MsgRequestTopicReply mrt; 
  mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -1707,6 +1763,12 @@
  if (rv != 0) { 
    rrr.topic = STR_RSV;
    rrr.data = STR_RSV;   
    if ((proc_id != NULL) && (proc_id_len != NULL)) {
      *proc_id_len = 0;
      *proc_id = NULL;
    }
  }
#if defined(PRO_DE_SERIALIZE)
@@ -1814,13 +1876,21 @@
}
#if defined(MSG_HANDLER)
int inter_key_get(void)
{
  if (gNetmod_socket != NULL)
    return net_mod_socket_get_key(gNetmod_socket);
  return 0;
  return SHM_BUS_KEY;
}
#endif
void *socket_data_get(void)
{
  return gNetmod_socket;
}
void inter_key_set(int key)
{
  net_mod_socket_bind(gNetmod_socket, key);
}