Fu Juntang
2021-09-17 5c912c70e9333298ff48f7ea15424f72ca977b99
Add the heartbeat logic feature.
11个文件已修改
694 ■■■■ 已修改文件
src/bh_api.cpp 267 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_proxy_start.cpp 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proc_def.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 207 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.cpp 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -13,6 +13,8 @@
#include "../proto/source/bhome_msg.pb.h"
#include "../proto/source/bhome_msg_api.pb.h"
#define TIME_WAIT       3
static Logger *logger = LoggerFactory::getLogger();
static int gRun_stat = 0;
@@ -20,7 +22,43 @@
static pthread_mutex_t mutex;
static char errString[100] = { 0x00 };
static pthread_t gTids;
static void *client_run_check(void *skptr) {
  pthread_detach(pthread_self());
  int data;
  int sec, nsec;
  int rv;
  int key;
  char buf[MAX_STR_LEN] = { 0x00 };
  void *buf_temp = NULL;
  int size;
  sec = TIME_WAIT;
  nsec = 0;
  sprintf(buf, "%s", "Success");
  data = net_mod_socket_int_get(gNetmod_socket);
  while(true) {
    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data);
    if (rv == 0) {
      BHFree(buf_temp, size);
      rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
      if (rv != 0) {
        logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
      }
    } else {
      logger->error("the process check failed with error: %s!\n", bus_strerror(rv));
    }
  }
}
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
@@ -29,6 +67,7 @@
  int count = 0;
  void *buf = NULL;
  int min = 0;
  char *errString = NULL;
  ProcInfo pData;
  
#if defined(PRO_DE_SERIALIZE)
@@ -43,9 +82,7 @@
  ::bhome_msg::ProcInfo input;
    if ((!input.ParseFromArray(proc_info, proc_info_len)) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -58,9 +95,7 @@
#else   
  if ((proc_info == NULL) || (proc_info_len == 0) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x90, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -74,8 +109,7 @@
    logger->error("the process has already registered!\n");
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -136,6 +170,16 @@
    }
#endif 
    if (pData.proc_id == NULL) {
      rv = EBUS_INVALID_PARA;
      bus_errorset(rv);
      pthread_mutex_unlock(&mutex);
      return false;
    }
    gNetmod_socket = net_mod_socket_open();
    hashtable_t *hashtable = mm_get_hashtable();
    key = hashtable_alloc_key(hashtable);
@@ -149,19 +193,19 @@
  
    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
  errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
@@ -181,6 +225,8 @@
    
#endif 
  pthread_create(&gTids, NULL, client_run_check, NULL);
  return true;
}
@@ -190,6 +236,7 @@
  int rv;
  int min;
  void *buf = NULL;
  char *errString = NULL;
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
@@ -205,9 +252,7 @@
    if(!input.ParseFromArray(proc_info, proc_info_len) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -219,9 +264,7 @@
#else 
  if ((reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -232,8 +275,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -250,21 +292,19 @@
      gRun_stat = 0;
      
    }
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
  
  errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -293,6 +333,7 @@
  void *buf = NULL;
  int total = 0;
  int count = 0; 
  char *errString = NULL;
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
@@ -306,9 +347,7 @@
    if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -327,9 +366,7 @@
  if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -341,8 +378,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -351,8 +387,7 @@
  if (topics_buf == NULL) {
    
    rv = EBUS_NO_MEM;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    logger->error("in BHRegisterTopics: Out of memory!\n");
@@ -382,10 +417,10 @@
  rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS);
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  free(topics_buf);
  bus_errorset(rv);
  errString = bus_strerror(0, 1);
  
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
@@ -414,6 +449,7 @@
  int min;
  void *buf = NULL;
  int size;
  char *errString = NULL;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
  ProcInfo_query *ptr = NULL;
  ProcInfo *Proc_ptr = NULL;
@@ -433,9 +469,7 @@
  ::bhome_msg::MsgQueryTopic input1;
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -449,9 +483,7 @@
#else 
  if ((topic == NULL) || (topic_len == 0) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -461,8 +493,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -477,9 +508,7 @@
#endif 
  rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS);
 
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
#if defined(PRO_DE_SERIALIZE)
    struct _MsgQueryTopicReply
@@ -510,6 +539,8 @@
      mtr_list[i].port = 5000;
    }
  }
  errString = bus_strerror(0, 1);
  
  ::bhome_msg::MsgQueryTopicReply mtr;
    mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -554,6 +585,7 @@
  void *buf = NULL;
  int size;
  int min;
  char *errString = NULL;
  ProcInfo_sum *Proc_ptr = NULL;
  char data_buf[MAX_STR_LEN] = { 0x00 };
@@ -573,8 +605,7 @@
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -587,8 +618,7 @@
#else 
  if ((reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -598,8 +628,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -609,9 +638,7 @@
  }
  
  rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  
#if defined(PRO_DE_SERIALIZE)
  struct _MsgQueryProcReply
@@ -657,6 +684,8 @@
        }
      }
    }
    errString = bus_strerror(0, 1);
    ::bhome_msg::MsgQueryProcReply mpr;
    mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -707,6 +736,7 @@
  int count = 0;
  int len, i;
  void *buf = NULL;
  char *errString = NULL;
  char *topics_buf = NULL;
  
#if defined(PRO_DE_SERIALIZE)
@@ -720,8 +750,7 @@
    if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -741,8 +770,7 @@
#else 
  if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -752,8 +780,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -762,8 +789,7 @@
  if (topics_buf == NULL) {
    
    rv = EBUS_NO_MEM;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    logger->error("in BHSubscribeTopics: Out of memory!\n");
@@ -806,8 +832,7 @@
  
  }
 
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  errString = bus_strerror(0, 1);
 
  free(topics_buf);
@@ -848,6 +873,7 @@
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  char *errString = NULL;
  
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
@@ -862,9 +888,7 @@
    if(!input.ParseFromArray(proc_info,proc_info_len)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -875,8 +899,8 @@
    _input.private_info = input.private_info().c_str();
  rv = 0;
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  errString = bus_strerror(0, 1);
  
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -912,8 +936,7 @@
    if(!input.ParseFromArray(msgpub, msgpub_len)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -924,8 +947,7 @@
  if ((topic == NULL) || (content == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -935,8 +957,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -966,8 +987,7 @@
  if (rv > 0)
    return true;
  
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  
  return false;
}
@@ -995,16 +1015,14 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
  
  if ((msgpub == NULL) || (msgpub_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -1035,8 +1053,7 @@
    if (topics_buf == NULL) {
      
      rv = EBUS_NO_MEM;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      bus_errorset(rv);
      
      logger->error("in BHRequest: Out of memory!\n");
@@ -1052,8 +1069,7 @@
      if (data_buf == NULL) {
        
        rv = EBUS_NO_MEM;
        memset(errString, 0x00, sizeof(errString));
        strncpy(errString, bus_strerror(rv), sizeof(errString));
        bus_errorset(rv);
        
        logger->error("in BHRequest: Out of memory!\n");
        
@@ -1124,8 +1140,8 @@
  } else {
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
  }
  if (rv == 0)
@@ -1169,8 +1185,7 @@
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
  
        return false;
  }
@@ -1186,8 +1201,7 @@
  if ((request == NULL) || (request_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
  
        return false;
  }
@@ -1197,8 +1211,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -1227,8 +1240,7 @@
      if (topics_buf == NULL) {
        
        rv = EBUS_NO_MEM;
        memset(errString, 0x00, sizeof(errString));
        strncpy(errString, bus_strerror(rv), sizeof(errString));
        bus_errorset(rv);
        
        logger->error("in BHRequest: Out of memory!\n");
@@ -1267,9 +1279,7 @@
    }
  }
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  if((msg_id == NULL) || (msg_id_len == NULL)) { 
    if (rv == 0)
      return true;
@@ -1309,6 +1319,7 @@
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  char *errString = NULL;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
  
@@ -1338,8 +1349,7 @@
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len) || (reply == NULL) || (reply_len == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -1355,8 +1365,7 @@
  if ((request == NULL) || (request_len == 0) || (reply == NULL) || (reply_len == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -1366,8 +1375,7 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
@@ -1398,8 +1406,7 @@
      if (topics_buf == NULL) {
        
        rv = EBUS_NO_MEM;
        memset(errString, 0x00, sizeof(errString));
        strncpy(errString, bus_strerror(rv), sizeof(errString));
        bus_errorset(rv);
        
        logger->error("in BHRequest: Out of memory!\n");
        
@@ -1463,9 +1470,7 @@
    free(topics_buf);
  }
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  if (rv == 0) {
    if ((proc_id != NULL) && (proc_id_len != NULL)) {
      memset(buf_temp, 0x00, sizeof(buf_temp));
@@ -1481,8 +1486,7 @@
    if (topics_buf == NULL) {
      
      rv = EBUS_NO_MEM;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      bus_errorset(rv);
      
      logger->error("in BHRequest: Out of memory!\n");
@@ -1496,6 +1500,8 @@
    free(buf);
    free(topics_buf);
  }
  errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE) 
  if (rv == 0) {
@@ -1541,16 +1547,14 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
  
  if ((request == NULL) || (request_len == 0) || (src == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
        return false;
  }
@@ -1594,8 +1598,7 @@
    if (topics_buf == NULL) {
      
      rv = EBUS_NO_MEM;
      memset(errString, 0x00, sizeof(errString));
      strncpy(errString, bus_strerror(rv), sizeof(errString));
      bus_errorset(rv);
      
      logger->error("in BHReadRequest: Out of memory!\n");
      
@@ -1637,8 +1640,7 @@
    *src = buf;
  }
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
    
  if (rv == 0)
    return true;
@@ -1650,6 +1652,8 @@
{
  int rv;
  int data;
  int sec = 3;
  int nsec = 0;
  const char *_input;
  
#if defined(PRO_DE_SERIALIZE)
@@ -1657,8 +1661,7 @@
if (!input.ParseFromArray(reply, reply_len) || (src == NULL)) {
    
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -1669,8 +1672,7 @@
  if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    
    return false;
  }
@@ -1683,17 +1685,15 @@
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    bus_errorset(rv);
    return false;
  }
  data = net_mod_socket_svr_get(gNetmod_socket);
  rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data);
  rv = net_mod_socket_sendto_timeout(gNetmod_socket, _input, strlen(_input), *(int *)src, sec, nsec, SVR_STR, data);
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  bus_errorset(rv);
  
  if (rv == 0)
    return true;
@@ -1713,6 +1713,7 @@
int BHGetLastError(void **msg, int *msg_len)
{
  void *buf = NULL;
  char *errString = bus_strerror(0, 1);
  buf = malloc(strlen(errString) + 1);
src/bus_error.cpp
@@ -49,15 +49,11 @@
}
char *
bus_strerror(int err)
bus_strerror(int err, int flag)
{
  int s;
  char *buf;
  /* Make first caller allocate key for thread-specific data */
  if (err == 0) {
    err = EBUS_BASE;
  }
  s = pthread_once(&once, createKey);
  if (s != 0)
@@ -68,15 +64,23 @@
  {
    /* If first call from this thread, allocate
                                   buffer for thread, and save its location */
    buf = (char *)malloc(MAX_ERROR_LEN);
    buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int));
    if (buf == NULL)
      err_exit(errno, "malloc");
    memset(buf, 0x00, MAX_ERROR_LEN + sizeof(int));
    s = pthread_setspecific(strerrorKey, buf);
    if (s != 0)
      err_exit(s, "pthread_setspecific");
  }
  if (flag != 0) {
    err = *(int *)(buf + MAX_ERROR_LEN);
  }
  if (err == 0) {
    err = EBUS_BASE;
  }
  if(err < EBUS_BASE) {
    // libc错误
@@ -106,3 +110,33 @@
  return buf;
}
void bus_errorset(int err)
{
  int s;
  char *buf;
  /* Make first caller allocate key for thread-specific data */
  s = pthread_once(&once, createKey);
  if (s != 0)
    err_exit(s, "pthread_once");
  buf = (char *)pthread_getspecific(strerrorKey);
  if (buf == NULL)
  {
    /* If first call from this thread, allocate
                                   buffer for thread, and save its location */
    buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int));
    if (buf == NULL)
      err_exit(errno, "malloc");
    s = pthread_setspecific(strerrorKey, buf);
    if (s != 0)
      err_exit(s, "pthread_setspecific");
  }
  *(int *)(buf + MAX_ERROR_LEN) = err;
}
src/bus_error.h
@@ -21,6 +21,7 @@
extern int bus_errno;
char *bus_strerror(int eno) ;
char *bus_strerror(int eno, int flag = 0);
void bus_errorset(int err);
#endif
src/bus_proxy_start.cpp
@@ -11,9 +11,13 @@
#include <getopt.h>
#include <stdlib.h>
using namespace std;
#define SVR_PORT            5000
#define TOTAL_THREADS       2
#define TOTAL_THREADS       3
#define MAX_RETRIES         3
static void *gBusServer_socket = NULL;
static void *gServer_socket = NULL;
@@ -24,8 +28,10 @@
static int gBusServer_act = 0;
static int gBusServer_stat = 0;
pthread_t tids[2];
void *res[2];
pthread_t tids[TOTAL_THREADS];
void *res[TOTAL_THREADS];
extern list gLinkedList;
void *bus_start(void *skptr) {
@@ -48,6 +54,58 @@
  gServer_socket  = net_mod_server_socket_open(port);
  if(net_mod_server_socket_start(gServer_socket) != 0) {
    printf("start net mod server failed\n");
  }
  return NULL;
}
void *check_start(void *skptr) {
  int i;
  int ret;
  int val;
  int thres;
  int data;
  int data_ret;
  int total;
  void *buf;
  int size;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  struct timespec timeout = {.tv_sec = 3, .tv_nsec = 0};
  while(true) {
    total = gLinkedList.NodeNum();
    for (i = 0; i < total; i++) {
      val = gLinkedList.nodeGet(i);
      if (val > 0) {
        data_ret = bus_server_socket_wrapper_data_get(gBusServer_socket, val);
        thres = gLinkedList.dataGet(val);
        if ((data_ret == true) && (thres < MAX_RETRIES)) {
          data = gLinkedList.dataFixGet(val);
          sprintf(buf_temp, "%d", i + 1);
          ret = bus_server_socket_wrapper_proc_check(gBusServer_socket, data, buf_temp, strlen(buf_temp), &buf, &size, &timeout, BUS_TIMEOUT_FLAG);
          if (ret == 0) {
            gLinkedList.dataSet(val, 0x00);
            free(buf);
          } else {
            gLinkedList.dataSet(val, ++thres);
          }
        } else {
          gLinkedList.Delete(val);
          if (thres >= MAX_RETRIES) {
            bus_server_socket_wrapper_proc_release(gBusServer_socket, val);
          }
        }
      }
    }
    sleep(10);
  }
  return NULL;
@@ -110,6 +168,8 @@
  if (gBusServer_stat >= 0) { 
    pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
    pthread_create(&tids[0], NULL, check_start, NULL);
  }
  for (i = 0; i< TOTAL_THREADS; i++) {
src/proc_def.h
@@ -67,7 +67,6 @@
}
#endif
#define INT_STR     0x01
#define SVR_STR     0x02
#endif  //end of file
src/queue/array_lock_free_queue.h
@@ -235,7 +235,9 @@
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  if (m_count < Q_SIZE) {
  AtomicAdd(&m_count, 1);
  }
#endif
  return true;
}
@@ -275,7 +277,9 @@
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      if (m_count > 0) {
      AtomicSub(&m_count, 1);
      }
#endif
      return true;
    }
@@ -295,6 +299,7 @@
template<typename ELEM_T, typename Allocator>
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount) {
@@ -302,6 +307,9 @@
              << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
#else
  uint32_t currentReadIndex = m_readIndex;
#endif
  return m_theQueue[countToIndex(currentReadIndex + i)];
}
src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
    SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
    SHMKeySet *subscripter_set;
@@ -296,6 +297,169 @@
  return dataBuf;
}
void list::Insert(int aData, int bData)
{
  LinkNode *pHead = NULL;
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
  pNew = new(LinkNode);
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL) {
    head = pNew;
    pNew->next = NULL;
  } else {
    while(pCur->next != NULL) {
      pCur = pCur->next;
    }
    pCur->next = pNew;
    pNew->next = NULL;
  }
}
void list::Delete(int data)
{
  LinkNode *pHead;
  LinkNode *pCur;
  LinkNode *pNext;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL)
    return;
  while((pCur != NULL) && (pCur->data == data)) {
    head = pCur->next;
    delete(pCur);
    pCur = head;
  }
  while((pCur != NULL) && (pCur->next != NULL)) {
    pNext = pCur->next;
    if(pNext->data == data) {
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
    } else {
      pCur = pNext;
    }
  }
}
void list::dataSet(int data, int val)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return;
  while(pCur != NULL) {
    if(pCur->data == data) {
      pCur->count = val;
    }
    pCur = pCur->next;
  }
}
int list::dataGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->count;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::dataFixGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->data_fix;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::NodeNum(void)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while(pCur != NULL) {
    ++count;
    pCur = pCur->next;
  }
  return count;
}
int list::nodeGet(int index)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while((pCur != NULL) && (count <= index)) {
    if (count == index) {
      return pCur->data;
    }
    ++count;
    pCur = pCur->next;
  }
  return 0;
}
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
@@ -341,6 +505,9 @@
      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
      count += strlen(buf + count) + 1;
      if (flag == PROC_REG) {
        gLinkedList.Insert(key, atoi(Data_stru.int_info));
      }
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -685,14 +852,44 @@
  }
}
int BusServerSocket::get_data(int val) {
  ProcZone::iterator proc_iter;
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  if ((proc_iter = proc->find(val)) != proc->end()) {
    return true;
  }
  return false;
}
int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag) {
  int ret;
  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
  return ret;
}
void BusServerSocket::remove_proc(int val) {
  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
}
// 运行代理
int BusServerSocket::_run_proxy_() {
    int size;
    int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
    char * action, *topic, *topics, *buf, *content;
    size_t head_len;
    bus_head_t head;
    int val;
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
  int rv;
  char send_buf[512] = { 0x00 };
@@ -762,6 +959,16 @@
      }
        
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
        }
      }
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
src/socket/bus_server_socket.h
@@ -18,6 +18,44 @@
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
typedef struct _LinkNode
{
  int data;
  int data_fix;
  int count;
  _LinkNode *next;
} LinkNode;
class list
{
private:
  LinkNode *head;
public:
  list() {head = NULL;};
  void Insert(int aDate, int bDate);
  void Delete(int Data);
  int dataFixGet(int data);
  int dataGet(int data);
  void dataSet(int data, int val);
  int NodeNum(void);
  int nodeGet(int index);
  LinkNode *getHead() {return head;};
};
class BusServerSocket {
private:
    shm_socket_t *shm_socket;
@@ -66,6 +104,7 @@
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start();
    int get_data(int val);
    /**
     * 停止bus
@@ -73,8 +112,9 @@
     * @return 0 成功, 其他值 失败的错误码
    */
    int  stop();
    int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag);
    void remove_proc(int val);
    /**
     * 获取soket key
src/socket/bus_server_socket_wrapper.cpp
@@ -41,3 +41,37 @@
    }
    
}
int bus_server_socket_wrapper_data_get(void * _socket, int val) {
  int ret;
  BusServerSocket *sockt = (BusServerSocket *)_socket;
  ret = sockt->get_data(val);
  return ret;
}
int bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag) {
  int ret;
  BusServerSocket *sockt = (BusServerSocket *)_socket;
  ret = sockt->check_proc(val, buf, len, buf_ret, len_ret, timeout, flag);
  return ret;
}
void bus_server_socket_wrapper_proc_release(void * _socket, int val) {
  BusServerSocket *sockt = (BusServerSocket *)_socket;
  sockt->remove_proc(val);
}
src/socket/bus_server_socket_wrapper.h
@@ -40,7 +40,12 @@
*/
int  bus_server_socket_wrapper_start_bus(void * _socket);
int  bus_server_socket_wrapper_data_get(void * _socket, int val);
int  bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag);
void bus_server_socket_wrapper_proc_release(void * _socket, int val);
#ifdef __cplusplus
}
src/socket/shm_socket.cpp
@@ -46,7 +46,7 @@
  void *tmp_ptr = hashtable_get(hashtable, key);
  if (tmp_ptr == NULL || tmp_ptr == (void *)1  ) {
    queue = new LockFreeQueue<shm_packet_t>(32);
    queue = new LockFreeQueue<shm_packet_t>(LOCK_FREE_Q_DEFAULT_SIZE);
    hashtable_put(hashtable, key, (void *)queue);
    return queue;
  } else if(force) {
@@ -76,7 +76,6 @@
  int s, type;
  pthread_mutexattr_t mtxAttr;
  logger->debug("shm_socket_open\n");
  // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  shm_socket_t *sockt = new shm_socket_t;
  sockt->socket_type = socket_type;
@@ -231,7 +230,7 @@
  if (rv != 0) {
    if(rv == ETIMEDOUT){
      logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
      logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    }
    
@@ -275,7 +274,7 @@
  if (rv != 0) {
    logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    return rv;
   
  } 
@@ -368,7 +367,6 @@
    recvbufIter = sockt->recvbuf.find(uuid);
    if(recvbufIter != sockt->recvbuf.end()) {
      // 在缓存里查到了UUID匹配成功的
logger->debug("get from recvbuf: %s", uuid.c_str());
      recvpak = recvbufIter->second;
      sockt->recvbuf.erase(recvbufIter);
      goto LABLE_SUC;
@@ -382,11 +380,10 @@
        return EBUS_TIMEOUT;
      }
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
      return rv;
    } 
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
    if(strlen(recvpak.uuid) == 0) {
      continue;
    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) {
@@ -474,7 +471,7 @@
    rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
    if (rv != 0) {
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
      logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
      return rv;
    }