Fu Juntang
2021-08-30 b861de29176891657cc96631ddbfb4ea9e114a42
re-structure the communication work flow.
26个文件已修改
5个文件已添加
3430 ■■■■■ 已修改文件
CMakeLists.txt 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build.sh 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 1619 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_def.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_proxy_start.cpp 127 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proc_def.h 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/shm_mm.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 534 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.cpp 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 150 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/CMakeLists.txt 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/bus_test_inter.cpp 502 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/bus_test_server_mode.cpp 122 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -15,6 +15,8 @@
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
add_compile_options(-fPIC)
option(BUILD_DOC "Build doc" OFF)
@@ -30,5 +32,9 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
#    add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
      include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
    #add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
add_definitions("-DPROTOBUF_USS_DLLS")
build.sh
@@ -2,7 +2,7 @@
BUILD_TYPE="Debug"
BUILD_DOC="OFF"
BUILD_SHARED_LIBS="OFF"
BUILD_SHARED_LIBS="ON"
function usage() {
    echo "build.sh [release | debug | doc]"
src/CMakeLists.txt
@@ -5,6 +5,9 @@
# to the source code
configure_file(bus_config.h.in bus_config.h)
#set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
add_compile_options(-fPIC)
#target_compile_options(shm_queue PRIVATE -fPIC)
list(APPEND _SOURCES_ 
./logger_factory.cpp
@@ -16,31 +19,33 @@
./bus_error.cpp
./futex_sem.cpp
./svsem.cpp
./bh_api.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
./net/net_mod_socket.cpp
./net/net_mod_socket_io.cpp
./net/net_mod_server_socket.cpp
./proto/bhome_msg_api.pb.cc
./proto/bhome_msg.pb.cc
./proto/error_msg.pb.cc
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
./shm/shm_mm.cpp
./bh_api.cc
./proto/bhome_msg.pb.cc
./proto/bhome_msg_api.pb.cc
./proto/error_msg.pb.cc
)
if (BUILD_SHARED_LIBS)
  add_library(shm_queue SHARED ${_SOURCES_})
  target_compile_options(shm_queue PRIVATE -fPIC)
  set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
else()
 add_library(shm_queue STATIC ${_SOURCES_})
endif()
# STATIC SHARED
# add_library(shm_queue ${_SOURCES_})
#add_library(shm_queue ${_SOURCES_})
target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
@@ -48,11 +53,14 @@
                                                     ${PROJECT_BINARY_DIR}/src
                           ${CMAKE_CURRENT_SOURCE_DIR}
                           ${CMAKE_CURRENT_SOURCE_DIR}/shm
                           ${CMAKE_CURRENT_SOURCE_DIR}/proto
                           ${CMAKE_CURRENT_SOURCE_DIR}/queue
                           ${CMAKE_CURRENT_SOURCE_DIR}/socket
                           ${CMAKE_CURRENT_SOURCE_DIR}/net
                           )
add_executable(bus_proxy_start bus_proxy_start.cpp)
target_link_libraries(bus_proxy_start PRIVATE shm_queue  ${EXTRA_LIBS} )
target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
@@ -74,6 +82,7 @@
./bus_def.h
./logger_factory.h
./sole.h
./proc_def.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
@@ -91,7 +100,6 @@
./shm/shm_mm_wrapper.h
./shm/shm_allocator.h
./shm/shm_mm.h
./bh_api.h
  DESTINATION include)
src/bh_api.cpp
New file
@@ -0,0 +1,1619 @@
#include "net_mod_socket_wrapper.h"
#include "net_mod_server_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wrapper.h"
#include "proc_def.h"
#include "usg_common.h"
#include "bh_api.h"
#include <pthread.h>
#include <getopt.h>
#include "bhome_msg_api.pb.h"
#include "bhome_msg.pb.h"
#include "error_msg.pb.h"
#include "proto/bhome_msg.pb.h"
#include "proto/bhome_msg_api.pb.h"
static Logger *logger = LoggerFactory::getLogger();
static int gRun_stat = 0;
static void *gNetmod_socket = NULL;
static pthread_mutex_t mutex;
static char errString[100] = { 0x00 };
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  int key;
  int count = 0;
  void *buf = NULL;
  int min = 0;
  ProcInfo pData;
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
    {
        const char *proc_id;
        const char *name;
        const char *public_info;
        const char *private_info;
    }_input;
  ::bhome_msg::ProcInfo input;
    if(!input.ParseFromArray(proc_info, proc_info_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
    _input.proc_id = input.proc_id().c_str();
    _input.name = input.name().c_str();
    _input.public_info = input.public_info().c_str();
    _input.private_info = input.private_info().c_str();
#else
  if ((proc_info == NULL) || (proc_info_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x90, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
#endif
  memset(&pData, 0x00, sizeof(ProcInfo));
  if (gRun_stat == 0) {
    pthread_mutex_init(&mutex, NULL);
  } else {
    logger->error("the process has already registered!\n");
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    gRun_stat = 1;
    shm_mm_wrapper_init(SHM_RES_SIZE);
#if defined(PRO_DE_SERIALIZE)
    if (_input.proc_id != NULL) {
      count = strlen(_input.proc_id) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.proc_id, _input.proc_id, min);
    }
    if (_input.name != NULL) {
      count = strlen(_input.name) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.name, _input.name, min);
    }
    if (_input.public_info != NULL) {
      count = strlen(_input.public_info) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.public_info, _input.public_info, min);
    }
    if (_input.private_info != NULL) {
      count = strlen(_input.private_info) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.private_info, _input.private_info, min);
    }
#else
    if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) {
      count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min);
    }
    if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) {
      count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.name, ((ProcInfo *)proc_info)->name, min);
    }
    if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) {
      count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min);
    }
    if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) {
      count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1;
      min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
      strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min);
    }
#endif
    gNetmod_socket = net_mod_socket_open();
    hashtable_t *hashtable = mm_get_hashtable();
    key = hashtable_alloc_key(hashtable);
    net_mod_socket_bind(gNetmod_socket, key);
    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
    ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len = mcr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply, *reply_len);
#else
    min = strlen(errString) + 1;
    buf = malloc(min) ;
    memcpy(buf, errString, strlen(errString));
    *((char *)buf + min - 1) = '\0';
    *reply = buf;
    *reply_len = min;
#endif
  if (rv == 0)
    return true;
  return false;
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  int min;
  void *buf = NULL;
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
    {
        const char *proc_id;
        const char *name;
        const char *public_info;
        const char *private_info;
    }_input;
  ::bhome_msg::ProcInfo input;
    if(!input.ParseFromArray(proc_info, proc_info_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
    _input.proc_id = input.proc_id().c_str();
    _input.name = input.name().c_str();
    _input.public_info = input.public_info().c_str();
    _input.private_info = input.private_info().c_str();
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG);
    if (rv == 0) {
      net_mod_socket_close(gNetmod_socket);
      gNetmod_socket = NULL;
      gRun_stat = 0;
    }
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len = mcr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply, *reply_len);
#else
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *reply = buf;
  *reply_len = min;
#endif
  if (rv == 0)
    return true;
  return false;
}
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  int min, i;
  void *buf = NULL;
  int total = 0;
  int count = 0;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
    {
        int amount;
        const char *topics[MAX_STR_LEN];
    }_input;
  ::bhome_msg::MsgTopicList input;
    if(!input.ParseFromArray(topics, topics_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
    _input.amount = input.topic_list_size();
  if (_input.amount > MAX_STR_LEN) {
    _input.amount = MAX_STR_LEN;
  }
    for(int i = 0; i < _input.amount; i++) {
        _input.topics[i] = input.topic_list(i).c_str();
  }
#else
  if ((topics == NULL) || (topics_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      }
    }
    logger->debug("the parsed compound register topics: %s!\n", topics_buf);
#else
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
#endif
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len = mcr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply, *reply_len);
#else
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *reply = buf;
  *reply_len = min;
#endif
  if (rv == 0)
    return true;
  return false;
}
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  int min;
  void *buf = NULL;
  int size;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
  ProcInfo_query *ptr = NULL;
  ProcInfo *Proc_ptr = NULL;
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
    {
        unsigned long long mq_id;
        long long abs_addr;
        const char *ip;
        int port;
    }_input0;
    const char *_input1;
  ::bhome_msg::BHAddress input0;
  ::bhome_msg::MsgQueryTopic input1;
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
  _input0.mq_id = input0.mq_id();
    _input0.abs_addr = input0.abs_addr();
    _input0.ip = input0.ip().c_str();
    _input0.port = input0.port();
    _input1 = input1.topic().c_str();
#else
  if ((topic == NULL) || (topic_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1));
    strncpy(topics_buf, _input1, min);
#else
    min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len);
    buf = const_cast<void *>(topic);
    strncpy(topics_buf, (const char *)buf, min);
#endif
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
    struct _MsgQueryTopicReply
    {
        std::string proc_id;
        unsigned long long mq_id;
        long long abs_addr;
        std::string ip;
        int port;
    }mtr_list[128];
    int mtr_list_num = 0;
  if (rv == 0) {
    ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
    mtr_list_num = ptr->num;
    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
    }
    for(int i = 0; i < mtr_list_num; i++) {
      mtr_list[i].proc_id = ptr->procData.proc_id;
      mtr_list[i].mq_id = 0x00;
      mtr_list[i].abs_addr = 0x00;
      mtr_list[i].ip = "192.168.1.1";
      mtr_list[i].port = 5000;
    }
  }
  ::bhome_msg::MsgQueryTopicReply mtr;
    mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mtr.mutable_errmsg()->set_errstring(errString);
    for(int i = 0; i < mtr_list_num; i++)
    {
      ::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address();
        mtrb->set_proc_id(mtr_list[i].proc_id);
        mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id);
        mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr);
        mtrb->mutable_addr()->set_ip(mtr_list[i].ip);
        mtrb->mutable_addr()->set_port(mtr_list[i].port);
    }
    *reply_len = mtr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mtr.SerializePartialToArray(*reply, *reply_len);
#else
  if (rv == 0) {
    *reply = buf;
    *reply_len = size;
  } else {
    min = strlen(errString) + 1;
    buf = malloc(min) ;
    memcpy(buf, errString, strlen(errString));
    *((char *)buf + min - 1) = '\0';
    *reply = buf;
    *reply_len = min;
  }
#endif
  if (rv == 0)
    return true;
  return false;
}
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  void *buf = NULL;
  int size;
  int min;
  ProcInfo_sum *Proc_ptr = NULL;
  char data_buf[MAX_STR_LEN] = { 0x00 };
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
    {
        unsigned long long mq_id;
        long long abs_addr;
        const char *ip;
        int port;
    }_input0;
    const char *_input1;
  ::bhome_msg::BHAddress input0;
    ::bhome_msg::MsgQueryProc input1;
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
    _input0.mq_id = input0.mq_id();
    _input0.abs_addr = input0.abs_addr();
    _input0.ip = input0.ip().c_str();
    _input0.port = input0.port();
    _input1 = input1.proc_id().c_str();
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    if (query != NULL) {
      strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1));
    }
    rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
  struct _MsgQueryProcReply
    {
        std::string proc_id;
        std::string name;
        std::string public_info;
        std::string private_info;
        bool online;
        std::string topic_list[128];
        int topic_list_num;
    } mpr_list[128];
    int mpr_list_num = 0;
  if (rv == 0) {
    mpr_list_num = *(int *)buf;
    if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
      mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
    }
    Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
    for(int i = 0; i < mpr_list_num; i++) {
      mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
      mpr_list[i].name = (Proc_ptr + i)->procData.name;
      mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
      mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
      mpr_list[i].online = (Proc_ptr + i)->stat;
      mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
      {
        if (j == 0) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
        } else if (j == 1) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
        } else if (j == 2) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
        }
      }
    }
    ::bhome_msg::MsgQueryProcReply mpr;
    mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mpr.mutable_errmsg()->set_errstring(errString);
    for(int i = 0; i < mpr_list_num; i++)
    {
      ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
      mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
      mpri->mutable_proc()->set_name(mpr_list[i].name);
      mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
      mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
      mpri->set_online(mpr_list[i].online);
      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
      {
        mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
      }
    }
    *reply_len = mpr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mpr.SerializePartialToArray(*reply,*reply_len);
  }
#else
  if (rv == 0) {
    *reply = buf;
    *reply_len = size;
  } else {
    min = strlen(errString) + 1;
    buf = malloc(min) ;
    memcpy(buf, errString, strlen(errString));
    *((char *)buf + min - 1) = '\0';
    *reply = buf;
    *reply_len = min;
  }
#endif
    if (rv == 0)
    return true;
  return false;
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  int sec, nsec;
  int total = 0;
  int count = 0;
  int min, i;
  void *buf = NULL;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
#if defined(PRO_DE_SERIALIZE)
  struct _MsgTopicList
    {
        int amount;
        const char *topics[MAX_STR_LEN];
    }_input;
  ::bhome_msg::MsgTopicList input;
    if(!input.ParseFromArray(topics, topics_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
  _input.amount = input.topic_list_size();
  if (_input.amount > MAX_STR_LEN) {
    _input.amount = MAX_STR_LEN;
  }
    for(int i = 0; i < _input.amount; i++)
        _input.topics[i] = input.topic_list(i).c_str();
#else
  if ((topics == NULL) || (topics_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        goto exit_entry;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    goto exit_entry;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    total = sizeof(topics_buf) / sizeof(char);
    for (i = 0; i < _input.amount; i++) {
      min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
      if (min > 0) {
        strncpy(topics_buf + count, _input.topics[i], min);
        count += min;
        if (total >= strlen(_input.topics[i])) {
          total -= strlen(_input.topics[i]);
        }
        if ((_input.amount > 1) && (i < (_input.amount - 1))) {
          strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
          total -= 1;
          count++;
        }
      } else {
        topics_buf[strlen(topics_buf) - 1] = '\0';
      }
    }
    logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
#else
    memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
#endif
    if (timeout_ms > 0) {
      sec = timeout_ms / 1000;
      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
      rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec);
    } else if (timeout_ms == 0) {
      rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
    } else {
      rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
    }
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
exit_entry:
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len=mcr.ByteSizeLong();
    *reply=malloc(*reply_len);
    mcr.SerializePartialToArray(*reply,*reply_len);
#else
  min = strlen(errString) + 1;
  buf = malloc(min) ;
  memcpy(buf, errString, strlen(errString));
  *((char *)buf + min - 1) = '\0';
  *reply = buf;
  *reply_len = min;
#endif
  if (rv == 0)
    return true;
  return false;
}
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
  return rv;
}
int BHHeartbeatEasy(const int timeout_ms)
{
  return true;
}
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
#if defined(PRO_DE_SERIALIZE)
  struct _ProcInfo_proto
    {
        const char *proc_id;
        const char *name;
        const char *public_info;
        const char *private_info;
    }_input;
  ::bhome_msg::ProcInfo input;
    if(!input.ParseFromArray(proc_info,proc_info_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
    _input.proc_id = input.proc_id().c_str();
    _input.name = input.name().c_str();
    _input.public_info = input.public_info().c_str();
    _input.private_info = input.private_info().c_str();
  rv = 0;
  memset(errString, 0x00, sizeof(errString));
  strncpy(errString, bus_strerror(rv), sizeof(errString));
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len=mcr.ByteSizeLong();
    *reply=malloc(*reply_len);
    mcr.SerializePartialToArray(*reply,*reply_len);
#endif
  return true;
}
#if defined(PRO_DE_SERIALIZE)
int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms)
#else
int BHPublish(const char *topic, const char *content, const int timeout_ms)
#endif
{
  int rv;
  int min;
  void *buf = NULL;
  net_node_t node_arr;
  int node_arr_len = 0;
#if defined(PRO_DE_SERIALIZE)
  struct _MsgPublish
    {
        const char *topic;
        const char *data;
    }_input;
  ::bhome_msg::MsgPublish input;
    if(!input.ParseFromArray(msgpub, msgpub_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
    _input.topic = input.topic().c_str();
    _input.data = input.data().c_str();
#else
  if ((topic == NULL) || (content == NULL)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    if (timeout_ms > 0) {
      rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms);
    } else if (timeout_ms == 0) {
      rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
    } else {
      rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
    }
#else
    if (timeout_ms > 0) {
      rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms);
    } else if (timeout_ms == 0) {
      rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
    } else {
      rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
    }
#endif
    pthread_mutex_unlock(&mutex);
    if (rv > 0)
      return true;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
  return false;
}
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
{
  int rv;
  int len;
  void *buf;
  int key;
  int size;
  int sec, nsec;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
  char data_buf[MAX_STR_LEN * 3] = { 0x00 };
  struct _ReadSubReply
  {
    std::string proc_id;
    std::string topic;
    std::string data;
  } rsr;
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  if (timeout_ms > 0) {
    sec = timeout_ms / 1000;
    nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
  } else if (timeout_ms == 0) {
    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
  } else {
    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
  }
  if (rv == 0) {
    len = strlen((char *)buf);
    if (len > size) {
      len = size;
    }
    strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len);
    if (len < size) {
      len = strlen(topics_buf) + 1;
      strncpy(data_buf, (char *)buf + len, size - len);
    }
    free(buf);
#if defined(PRO_DE_SERIALIZE)
    rsr.topic = topics_buf;
    rsr.data = data_buf;
    memset(topics_buf, 0x00, sizeof(topics_buf));
    sprintf(topics_buf, "%d", key);
    rsr.proc_id = topics_buf;
    *proc_id_len = rsr.proc_id.size();
    *proc_id = malloc(*proc_id_len);
    memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
    ::bhome_msg::MsgPublish Mp;
    Mp.set_topic(rsr.topic);
    Mp.set_data(rsr.data.data());
    *msgpub_len = Mp.ByteSizeLong();
    *msgpub = malloc(*msgpub_len);
    Mp.SerializePartialToArray(*msgpub, *msgpub_len);
#else
    void *ptr;
    if (len < size) {
      ptr = malloc(size - len);
      len = size - len;
      memcpy(ptr, data_buf, len);
    } else {
      ptr = malloc(len);
      memcpy(ptr, topics_buf, len);
    }
    *msgpub = ptr;
    *msgpub_len = len;
    memset(topics_buf, 0x00, sizeof(topics_buf));
    sprintf(topics_buf, "%d", key);
    *proc_id_len = strlen(topics_buf);
    *proc_id = malloc(*proc_id_len);
    memcpy(*proc_id, topics_buf, *proc_id_len);
#endif
    pthread_mutex_unlock(&mutex);
  } else {
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
  if (rv == 0)
    return true;
  return false;
}
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
{
  int rv;
  void *buf;
  int size;
  int val;
  int len;
  int min;
  int sec, nsec;
  std::string MsgID;
  int timeout_ms = 3000;
  char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
    {
        unsigned long long mq_id;
        long long abs_addr;
        const char *ip;
        int port;
    }_input0;
  struct MsgRequestTopic
    {
        const char *topic;
        const char *data;
    }_input1;
  ::bhome_msg::BHAddress input0;
    ::bhome_msg::MsgRequestTopic input1;
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
    _input0.mq_id = input0.mq_id();
    _input0.abs_addr = input0.abs_addr();
    _input0.ip = input0.ip().c_str();
    _input0.port = input0.port();
    _input1.topic = input1.topic().c_str();
    _input1.data = input1.data().c_str();
#else
  if ((request == NULL) || (request_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
#else
    strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1));
#endif
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
    if (rv == 0) {
      val = atoi((char *)buf);
      free(buf);
      if (val > 0) {
        len = strlen(topics_buf);
#if defined(PRO_DE_SERIALIZE)
        min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
        strncpy(topics_buf + len + 1, _input1.data, min);
        len += (min + 1);
#endif
        if (timeout_ms > 0) {
          sec = timeout_ms / 1000;
          nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec);
        } else if (timeout_ms == 0) {
          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val);
        } else {
          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val);
        }
      } else {
        rv = EBUS_RES_UNSUPPORT;
      }
    }
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
    if((msg_id == NULL) || (msg_id_len == NULL)) {
      if (rv == 0)
        return true;
      return false;
    }
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
  if (rv == 0) {
    memset(topics_buf, 0x00, sizeof(topics_buf));
    sprintf(topics_buf, "%d", val);
    MsgID = topics_buf;
    *msg_id_len = MsgID.size();
      *msg_id = malloc(*msg_id_len);
      memcpy(*msg_id, MsgID.data(), *msg_id_len);
    return true;
  }
  return false;
}
int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len,
              void **reply, int *reply_len, const int timeout_ms)
{
  int rv;
  void *buf;
  int size;
  int val;
  int min, len;
  net_node_t node;
  int node_size;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  int sec, nsec;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
#if defined(PRO_DE_SERIALIZE)
  struct _BHAddress
    {
        unsigned long long mq_id;
        long long abs_addr;
        const char *ip;
        int port;
    }_input0;
    struct _MsgRequestTopic
    {
        const char *topic;
        const char *data;
    }_input1;
  ::bhome_msg::BHAddress input0;
    ::bhome_msg::MsgRequestTopic input1;
    if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
  _input0.mq_id = input0.mq_id();
    _input0.abs_addr = input0.abs_addr();
    _input0.ip = input0.ip().c_str();
    _input0.port = input0.port();
    _input1.topic = input1.topic().c_str();
    _input1.data = input1.data().c_str();
#else
  if ((request == NULL) || (request_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
        return false;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
#if defined(PRO_DE_SERIALIZE)
    strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic)  : (sizeof(topics_buf) - 1));
#else
    strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1));
#endif
    rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
    if (rv == 0) {
      val = atoi((char *)buf);
      free(buf);
      if (val > 0) {
        memset(&node, 0x00, sizeof(node));
        len = strlen(topics_buf);
#if defined(PRO_DE_SERIALIZE)
        min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
        strncpy(topics_buf + len + 1, _input1.data, min);
        len += (min + 1);
#endif
        node.key = val;
        rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size);
        if (rv > 0) {
          if (recv_arr_size > 0) {
            node.key = recv_arr[0].key;
            memset(topics_buf, 0x00, sizeof(topics_buf));
            size = recv_arr[0].content_length;
            buf = (char *)malloc(size);
            strncpy((char *)buf, (char *)recv_arr[0].content, size);
#if !defined(PRO_DE_SERIALIZE)
            *reply = buf;
            *reply_len = size;
#endif
          }
          net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
          if(errarr_size > 0) {
            free(errarr);
          }
          rv = 0;
        } else {
          rv = EBUS_TIMEOUT;
        }
      } else {
        rv = EBUS_RES_UNSUPPORT;
      }
    }
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    struct _RequestReply
    {
      std::string proc_id;
      std::string data;
    }rr;
    if (rv == 0) {
      memset(topics_buf, 0x00, sizeof(topics_buf));
      sprintf(topics_buf, "%d", node.key);
      rr.proc_id = topics_buf;
      *proc_id_len = rr.proc_id.size();
      *proc_id = malloc(*proc_id_len);
      memcpy(*proc_id, rr.proc_id.data(), *proc_id_len);
      memset(topics_buf, 0x00, sizeof(topics_buf));
      memcpy(topics_buf, buf, size);
      rr.data = topics_buf;
#if defined(PRO_DE_SERIALIZE)
      ::bhome_msg::MsgRequestTopicReply mrt;
      mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
      mrt.mutable_errmsg()->set_errstring(errString);
      mrt.set_data(rr.data.data());
      *reply_len = mrt.ByteSizeLong();
      *reply = malloc(*reply_len);
      mrt.SerializePartialToArray(*reply, *reply_len);
#endif
    }
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
   if (rv == 0)
    return true;
  return false;
}
int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms)
{
  int rv;
  void *buf;
  int key;
  int size;
  int sec, nsec;
  char topics_buf[MAX_STR_LEN] = { 0x00 };
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    if (timeout_ms > 0) {
      sec = timeout_ms / 1000;
      nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
      rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
    } else if (timeout_ms == 0) {
      rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
    } else {
      rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
    }
    if (rv == 0) {
      struct _ReadRequestReply
      {
        std::string proc_id;
        std::string topic;
        std::string data;
        void *src;
      } rrr;
      sprintf(topics_buf, "%d", key);
      rrr.proc_id = topics_buf;
      *proc_id_len = rrr.proc_id.size();
      *proc_id = malloc(*proc_id_len);
      memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len);
      memset(topics_buf, 0x00, sizeof(topics_buf));
      memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size);
      rrr.topic = topics_buf;
      rrr.data = topics_buf;
#if defined(PRO_DE_SERIALIZE)
      ::bhome_msg::MsgRequestTopic mrt;
      mrt.set_topic(rrr.topic);
      mrt.set_data(rrr.data.data());
      *request_len = mrt.ByteSizeLong();
      *request = malloc(*request_len);
      mrt.SerializePartialToArray(*request,*request_len);
#else
      *request = buf;
      *request_len = size;
#endif
      buf = malloc(sizeof(int));
      *(int *)buf = key;
      *src = buf;
    }
    pthread_mutex_unlock(&mutex);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
  if (rv == 0)
    return true;
  return false;
}
int BHSendReply(void *src, const void *reply, const int reply_len)
{
  int rv;
#if defined(PRO_DE_SERIALIZE)
  ::bhome_msg::MsgRequestTopicReply input;
  if (!input.ParseFromArray(reply, reply_len)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  const char *_input;
  _input = input.data().data();
#else
  if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
    rv = EBUS_INVALID_PARA;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
#endif
  if (gRun_stat == 0) {
    logger->error("the process has not been registered yet!\n");
    rv = EBUS_RES_NO;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    return false;
  }
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
    pthread_mutex_unlock(&mutex);
  } else {
    rv = EBUS_RES_BUSY;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
  }
  if (rv == 0)
    return true;
  return false;
}
int BHCleanup() {
  return true;
}
void BHFree(void *buf, int size) {
  free(buf);
}
int BHGetLastError(void **msg, int *msg_len)
{
  void *buf = NULL;
  buf = malloc(strlen(errString) + 1);
  memset(buf, 0x00, strlen(errString) + 1);
  memcpy(buf, errString, strlen(errString));
  if ((msg != NULL) && (msg_len != NULL)) {
    *msg = buf;
    *msg_len = strlen(errString);
    return true;
  }
  return false;
}
src/bh_api.h
@@ -1,9 +1,11 @@
#ifndef BH_API
#define BH_API
#ifndef _BH_API_WRAPPER_
#define _BH_API_WRAPPER_
#ifdef __cplusplus
extern "C" {
#endif
#define PRO_DE_SERIALIZE    1
int BHRegister(const void *proc_info,
               const int proc_info_len,
@@ -17,15 +19,19 @@
                 int *reply_len,
                 const int timeout_ms);
int BHRegisterTopics(const void *topics,
                     const int topics_len,
                     void **reply,
                     int *reply_len,
                     const int timeout_ms);
int BHQueryTopicAddress(const void *remote, const int remote_len,
                        const void *topic, const int topic_len,
                        void **reply, int *reply_len,
int BHQueryTopicAddress(const void *remote,
                        const int remote_len,
                        const void *topics,
                        const int topics_len,
                        void **reply,
                        int *reply_len,
                        const int timeout_ms);
int BHQueryProcs(const void *remote,
@@ -41,6 +47,7 @@
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
int BHSubscribeNetTopics(const void *topics,
                         const int topics_len,
                         void **reply,
@@ -54,9 +61,13 @@
                int *reply_len,
                const int timeout_ms);
#if defined(PRO_DE_SERIALIZE)
int BHPublish(const void *msgpub,
              const int msgpub_len,
              const int timeout_ms);
#else
int BHPublish(const char *topic, const char *content, const int timeout_ms);
#endif
int BHReadSub(void **proc_id,
              int *proc_id_len,
@@ -96,7 +107,12 @@
void BHFree(void *buf, int size);
int BHGetLastError(void **msg, int *msg_len);
#ifdef __cplusplus
}
#endif
#endif
#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */
src/bus_def.h
@@ -4,4 +4,11 @@
#define BUS_TIMEOUT_FLAG  1
#define BUS_NOWAIT_FLAG  1 << 1
#define SHM_RES_SIZE        512
#define SHM_BUS_PROC_MAP_KEY    10
#define SHM_BUS_PROC_TCS_MAP_KEY    11
#define SHM_BUS_TCS_MAP_KEY    20
#define SHM_BUS_PROC_PART_MAP_KEY   30
#endif
src/bus_error.cpp
@@ -20,7 +20,11 @@
  "Send to self error",
  "Receive from wrong end",
  "Service stoped",
  "Exceed resource limit"
  "Exceed resource limit",
  "Service not supported",
  "Resource busy",
  "Resource not provide",
  "Invalid parameters"
};
@@ -50,6 +54,10 @@
  char *buf;
  /* Make first caller allocate key for thread-specific data */
  if (err == 0) {
    err = EBUS_BASE;
  }
  s = pthread_once(&once, createKey);
  if (s != 0)
    err_exit(s, "pthread_once");
src/bus_error.h
@@ -13,6 +13,10 @@
#define EBUS_RECVFROM_WRONG_END 506
#define EBUS_STOPED 507
#define EBUS_EXCEED_LIMIT 508
#define EBUS_RES_UNSUPPORT  509
#define EBUS_RES_BUSY  510
#define EBUS_RES_NO  511
#define EBUS_INVALID_PARA  512
extern int bus_errno;
src/bus_proxy_start.cpp
New file
@@ -0,0 +1,127 @@
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <signal.h>
#include <limits.h>
#include <stdio.h>
#include <errno.h>
#include <getopt.h>
#include <stdlib.h>
#define SVR_PORT            5000
#define TOTAL_THREADS       2
static void *gBusServer_socket = NULL;
static void *gServer_socket = NULL;
static int gShm_size = -1;
static int gPort = -1;
static int gBusServer_act = 0;
static int gBusServer_stat = 0;
pthread_t tids[2];
void *res[2];
void *bus_start(void *skptr) {
  gBusServer_act = 1;
  gBusServer_socket = bus_server_socket_wrapper_open();
  if (bus_server_socket_wrapper_start_bus(gBusServer_socket) != 0) {
    printf("start bus failed\n");
    gBusServer_stat = -1;
  }
  return NULL;
}
void *svr_start(void *skptr) {
  int port = *(int *)skptr;
  gServer_socket  = net_mod_server_socket_open(port);
  if(net_mod_server_socket_start(gServer_socket) != 0) {
    printf("start net mod server failed\n");
  }
  return NULL;
}
int main(int argc, char *argv[])
{
  char *endptr;
  char i;
  int val;
  sigset_t mask_all, pre;
  sigfillset(&mask_all);
  sigprocmask(SIG_BLOCK, &mask_all, &pre);
  if (argc >= 4) {
    fprintf(stderr, "Usage: %s [shm size] [server port]\n", argv[0]);
    exit(0);
  };
  if (argc >= 2) {
    argc -= 2;
    for (i = 0; i <= argc; i++) {
      errno = 0;
      val = strtol(argv[i + 1], &endptr, 10);
      if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
             || (errno != 0 && val == 0)) {
        fprintf(stderr, "invalid parameter: %s\n", argv[i + 1]);
        exit(0);
      }
      if (endptr == argv[i + 1]) {
        fprintf(stderr, "invalid parameter %s: No digits were found\n", argv[i + 1]);
        exit(0);
      }
      if (i == 0) {
        gShm_size = val;
      } else {
        gPort = val;
      }
    }
  }
  if (gShm_size == -1) {
    gShm_size = SHM_RES_SIZE;
  }
  shm_mm_wrapper_init(SHM_RES_SIZE);
  pthread_create(&tids[0], NULL, bus_start, NULL);
  if (gPort == -1) {
    gPort = SVR_PORT;
  }
  while(gBusServer_act == 0) {
    sleep(1);
  }
  if (gBusServer_stat >= 0) {
    pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
  }
  for (i = 0; i< TOTAL_THREADS; i++) {
    if(pthread_join(tids[i], &res[i]) != 0) {
      perror("bus_proxy pthread_join");
    }
  }
  bus_server_socket_wrapper_close(gBusServer_socket);
  net_mod_socket_close(gServer_socket);
  shm_mm_wrapper_destroy();
  return 0;
}
src/net/net_mod_server_socket.cpp
@@ -181,7 +181,7 @@
    }
    if( ret != 0) {
      logger->error("转发失败 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key,  bus_strerror(ret));
      logger->error("fail: NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key,  bus_strerror(ret));
      // 转发失败
      response_head.code = ret;
      response_head.content_length = 0;
@@ -277,7 +277,6 @@
        FD_CLR(connfd, &pool.read_set); 
        pool.clientfd[i] = -1;
        logger->debug("===server close client %d\n", connfd);
    // printf("===server close client %d\n", connfd);
      }
    }
src/net/net_mod_socket.cpp
@@ -22,7 +22,7 @@
NetModSocket::~NetModSocket() {
}
@@ -46,6 +46,15 @@
  return shmModSocket.force_bind(key);
}
int NetModSocket::bind_proc_id(char *buf, int len) {
  return shmModSocket.bind_proc_id(buf, len);
}
int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
  return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
}
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
//   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
//   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -67,7 +76,7 @@
  NetConnPool *mpool = (NetConnPool *)_pool;
  delete mpool;
  logger->debug("destory connPool");
}
 /* One-time key creation function */
@@ -343,9 +352,6 @@
  return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
 int content_size, int  msec) {
  int i, connfd;
@@ -363,7 +369,7 @@
  net_mod_err_t err_msg;
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
  if ((node_arr == NULL) || (arrlen == 0)) {
    if(msec == 0) {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY,  NULL, BUS_NOWAIT_FLAG);
    } else if(msec > 0) {
@@ -525,7 +531,6 @@
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
                              const struct timespec *timeout , int flag, void * user_data ) {
@@ -764,23 +769,23 @@
  head.mod = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(uint32_t);
  memcpy(head.host, tmp_ptr, sizeof(head.host));
 
  tmp_ptr += sizeof(head.host);
  head.port = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(uint32_t);
  head.key = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(uint32_t);
  head.content_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(uint32_t);
  head.topic_length = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(uint32_t);
  head.timeout = ntohl(GET_INT32(tmp_ptr));
 
  return head;
src/net/net_mod_socket.h
@@ -3,6 +3,7 @@
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
#include "proc_def.h"
#include <poll.h>
#include "socket_def.h"
#include "net_conn_pool.h"
@@ -17,7 +18,7 @@
    int key;
};
#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 6 * sizeof(uint32_t))
#define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t)
 
// 请求头
@@ -118,8 +119,8 @@
  */
  int force_bind( int key);
  int bind_proc_id(char *buf, int len);
  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
  
  /**
   * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
@@ -166,7 +167,6 @@
  // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
  int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
  int recvfrom_nowait( void **buf, int *size, int *key);
  /**
   * 本地发送请求信息并等待接收应答
   * @key 发送给谁
src/net/net_mod_socket_wrapper.cpp
@@ -44,6 +44,14 @@
    // return sockt->bind(key);
}
int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
{
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->reg(pData, len, buf, size, timeout_ms, flag);
}
/**
 * 发送信息
 * @key 发送给谁
@@ -51,20 +59,17 @@
 */
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_timeout(buf, size, key, sec, nsec);
    // return sockt->sendto(buf, size, key);
}
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_nowait(buf, size, key);
}
@@ -77,22 +82,19 @@
    int rv;
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
    rv = sockt->recvfrom(buf, size, key);
    logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
    return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    // return sockt->recvfrom(buf, size, key);
    return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom_nowait(buf, size, key);
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -101,6 +103,11 @@
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
}
int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->bind_proc_id(proc_id, len);
}
/**
 * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
 * @timeout 等待时间,单位是千分之一秒
src/net/net_mod_socket_wrapper.h
@@ -12,6 +12,7 @@
#define __NET_MOD_SOCKET_H__
#include "net_mod_socket.h"
#include "proc_def.h"
#ifdef __cplusplus
extern "C" {
@@ -55,6 +56,8 @@
 */
int net_mod_socket_force_bind(void * _socket, int key);
int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
/**
 * @brief 发送信息,发送完成才返回
 *
src/proc_def.h
New file
@@ -0,0 +1,70 @@
#ifndef __PROC_DEF_
#define __PROC_DEF_
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_STR_LEN     128 //keep the same with serializer in proc check
#define MIN_STR_LEN      10
#define MAX_PROC_NUM    128
#define MAX_TOPICS_NUN  60
#define PROC_REG        1
#define PROC_UNREG      2
#define PROC_REG_TCS    3
#define PROC_QUE_TCS    4
#define PROC_QUE_STCS   5
#define PROC_QUE_ATCS   6
#define STR_MAGIC       ","
typedef struct _ProcInfo {
#if 0
  char ServerID[MAX_STR_LEN];  // 机器ID
  char BoardID[MAX_STR_LEN]; // 板卡ID
  char ServerIP[MAX_STR_LEN];  // 机器IP
  char ProcID[MAX_STR_LEN]; // 进程唯一标识
  char ProcName[MAX_STR_LEN];  // 进程名称
  char ProcLabel[MAX_STR_LEN];  // 进程的描述信息,用于区分同一进程名称下多个进程
#else
  char proc_id[MAX_STR_LEN];
  char name[MAX_STR_LEN];
  char public_info[MAX_STR_LEN];
  char private_info[MAX_STR_LEN];
#endif
} ProcInfo;
typedef struct _ProcInfo_sum {
  ProcInfo procData;
  int stat;
  char reg_info[MAX_STR_LEN];
  char local_info[MAX_STR_LEN];
  char net_info[MAX_STR_LEN];
  int list_num;
} ProcInfo_sum;
typedef struct _ProcInfo_query {
  char name[MAX_STR_LEN];
  int num;
  ProcInfo procData;
} ProcInfo_query;
#ifdef __cplusplus
}
#endif
#endif  //end of file
src/queue/lock_free_queue.h
@@ -15,7 +15,7 @@
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
#define LOCK_FREE_Q_DEFAULT_SIZE 320
#define LOCK_FREE_Q_ST_OPENED 0
@@ -177,6 +177,7 @@
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
  //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
  if (sem_init(&slots, 1, qsize) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  if (sem_init(&items, 1, 0) == -1)
@@ -211,6 +212,7 @@
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
  // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
  if (sem_destroy(&slots) == -1) {
    err_exit(errno, "LockFreeQueue sem_destroy");
  }
@@ -249,10 +251,10 @@
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  sigset_t mask_all, pre;
  sigfillset(&mask_all);
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  sigprocmask(SIG_BLOCK, &mask_all, &pre);
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    if (psem_trywait(&slots) == -1) {
@@ -271,12 +273,12 @@
  if (m_qImpl.push(a_data)) {
    psem_post(&items);
    // sigprocmask(SIG_SETMASK, &pre, NULL);
    sigprocmask(SIG_SETMASK, &pre, NULL);
    return 0;
  }
  LABEL_FAILTURE:
  // sigprocmask(SIG_SETMASK, &pre, NULL);
  sigprocmask(SIG_SETMASK, &pre, NULL);
  return errno;
}
@@ -285,10 +287,10 @@
  template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  sigset_t mask_all, pre;
  sigfillset(&mask_all);
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  sigprocmask(SIG_BLOCK, &mask_all, &pre);
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    if (psem_trywait(&items) == -1) {
@@ -306,13 +308,13 @@
  if (m_qImpl.pop(a_data)) {
    psem_post(&slots);
    // sigprocmask(SIG_SETMASK, &pre, NULL);
    sigprocmask(SIG_SETMASK, &pre, NULL);
    return 0;
  }
  LABEL_FAILTURE:
  // sigprocmask(SIG_SETMASK, &pre, NULL);
  sigprocmask(SIG_SETMASK, &pre, NULL);
  return errno;
}
src/queue/shm_queue.h
@@ -40,8 +40,8 @@
  bool full();
  bool empty();
  int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
  int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
  ELEM_T &operator[](unsigned i);
src/shm/hashtable.h
@@ -5,7 +5,7 @@
#include <functional>
#include <set>
#define MAPSIZE 1024
#define MAPSIZE 4096
// 创建Queue数量的上限
#define QUEUE_COUNT_LIMIT 300
src/shm/mm.cpp
@@ -256,6 +256,7 @@
    first = false;
    shmid  = shmget(SHM_KEY, 0, 0);
  }
  if (shmid == -1)
    err_exit(errno, "mm_init shmget");
  shmp = shmat(shmid, key_addr, 0);
@@ -338,8 +339,6 @@
      else 
         LoggerFactory::getLogger()->debug("shared memory destroy\n");
      LoggerFactory::getLogger()->debug( "mm_destroy: real destroy.");
      SemUtil::inc(mutex);
      SemUtil::remove(mutex);
      return true;
@@ -363,6 +362,7 @@
void mm_free_by_key(int key) {
  void *ptr;
  ptr = hashtable_get(hashtable, key);
  if(ptr != NULL) {
    mm_free(ptr);
src/shm/shm_mm.h
@@ -8,6 +8,8 @@
#define SHM_QUEUE_ST_CLOSED 1
#define SHM_QUEUE_ST_RECYCLED 2
#define SHM_QUEUE_ST_SET    50
struct shm_queue_status_t {
  int status;
@@ -49,4 +51,6 @@
int shm_mm_alloc_key();
typedef std::map<SHMString, int, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, int> > > ProcDataZone;
#endif 
src/socket/bus_server_socket.cpp
@@ -1,6 +1,7 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -12,10 +13,10 @@
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
                for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
                    cb(subscripter_set, *set_iter);
                }
            }
@@ -35,7 +36,7 @@
        SHMTopicSubMap::iterator map_iter;
        if(topic_sub_map != NULL) {
            for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
                subscripter_set = map_iter->second;
                if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
                    subscripter_set->erase(set_iter);
@@ -50,7 +51,6 @@
BusServerSocket::BusServerSocket() {
    logger->debug("BusServerSocket Init");
    shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    topic_sub_map = NULL;
@@ -80,10 +80,13 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int  BusServerSocket::start(){
  int rv;
    topic_sub_map =    shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
    _run_proxy_();
    return 0;
    rv = _run_proxy_();
    return rv;
}
@@ -114,7 +117,7 @@
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                subscripter_set->clear();
@@ -126,8 +129,8 @@
        shm_mm_free_by_key(SHM_BUS_MAP_KEY);
    }
    shm_socket_close(shm_socket);
    logger->debug("BusServerSocket destory 3");
    return 0;
  return 0;
}
/*
@@ -135,10 +138,10 @@
*/
void BusServerSocket::_proxy_sub( char *topic, int key) {
    SHMKeySet *subscripter_set;
  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
//printf("_proxy_sub topic = %s\n", topic);
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
@@ -147,6 +150,7 @@
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(key);
}
/*
@@ -173,7 +177,7 @@
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(key);
    }
@@ -182,7 +186,7 @@
/*
 * 处理发布,代理转发
*/
void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
@@ -198,7 +202,7 @@
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
            send_key = *set_iter;
            rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
            if(rv == 0) {
@@ -209,7 +213,7 @@
        }
        // 删除已关闭的端
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
            if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
                subscripter_set->erase(set_iter);
                logger->debug("remove closed subscripter %d \n", send_key);
@@ -218,13 +222,458 @@
        subscripter_to_del.clear();
    }
}
ProcInfo_query *Qurey_object(const char *object, int *length) {
  int flag = 0;
  int val;
  int len;
  int total = 0;
  ProcInfo *Proc_ptr = NULL;
  ProcInfo Data_stru;
  ProcInfo_query *dataBuf = NULL;
  SvrProc *SvrSub_ele;
  SvrTcs::iterator svr_tcs_iter;
  SvrProc::iterator svr_proc_iter;
  ProcZone::iterator proc_iter;
  SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
    SvrSub_ele = svr_tcs_iter->second;
    for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
      val = *svr_proc_iter;
      if ((proc_iter = proc->find(val)) != proc->end()) {
        if (dataBuf == NULL) {
          dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
          if (dataBuf == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          total = sizeof(ProcInfo_query);
        }
        if (flag == 0) {
          memset(dataBuf, 0x00, sizeof(ProcInfo_query));
          dataBuf->num = 1;
          strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
          flag = 1;
        } else {
          dataBuf->num++;
          len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
          dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
          if (dataBuf == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          total += sizeof(ProcInfo);
          memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
        }
        memset(&Data_stru, 0x00, sizeof(ProcInfo));
        Data_stru = proc_iter->second;
        Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
        strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
        strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
        strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
        strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
        if (length != NULL)
          *length = total;
      }
    }
  }
  return dataBuf;
}
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  int count = 0;
  int i = 0;
  int len = 0;
  char *data_ptr;
  ProcInfo Data_stru;
  ProcZone::iterator proc_iter;
  TcsZone *TcsSub_ele;
  ProcDataZone::iterator proc_que_iter;
  ProcTcsMap::iterator proc_tcs_iter;
  SvrProc *SvrSub_ele;
  SvrProc::iterator svr_proc_iter;
  SvrTcs::iterator svr_tcs_iter;
  TcsZone::iterator tcssub_iter;
  ProcPartZone::iterator proc_part_iter;
  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    memset(&Data_stru, 0x00, sizeof(ProcInfo));
    if (buf != NULL) {
      memcpy(Data_stru.proc_id, buf, strlen(buf) + 1);
      count = strlen(buf) + 1;
      memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
    if (flag == PROC_REG) {
      if ((proc_iter = proc->find(key)) == proc->end()) {
        proc->insert({key, Data_stru});
      }
      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
        procPart->insert({key, Data_stru.proc_id});
      }
      if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
        procQuePart->insert({Data_stru.proc_id, key});
      }
    } else {
      SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
      for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
        SvrSub_ele = svr_tcs_iter->second;
        SvrSub_ele->erase(key);
      }
      if ((proc_iter = proc->find(key)) != proc->end()) {
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
      }
      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
        procPart->erase(key);
      }
      if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
        procQuePart->erase(buf_temp);
      }
    }
  } else if (flag == PROC_REG_TCS) {
    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
      TcsSub_ele = proc_tcs_iter->second;
    } else {
      void *ptr_set = mm_malloc(sizeof(TcsZone));
      TcsSub_ele = new(ptr_set) TcsZone;
      proc->insert({key, TcsSub_ele});
    }
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      TcsSub_ele->insert(data_ptr);
      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
        SvrSub_ele = svr_tcs_iter->second;
      } else {
        void *ptr_set = mm_malloc(sizeof(SvrProc));
        SvrSub_ele = new(ptr_set) SvrProc;
        SvrData->insert({data_ptr, SvrSub_ele});
      }
      SvrSub_ele->insert(key);
      data_ptr = strtok(NULL, STR_MAGIC);
    }
  } else if (flag == PROC_QUE_TCS) {
    struct _temp_store {
      void *ptr;
      int total;
    } *temp_store = NULL;
    int num = 0;
    int sum = 0;
    ProcInfo_query *ret = NULL;
    ProcInfo_query *ret_store = NULL;
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      ret = Qurey_object(data_ptr, &len);
      if (ret != NULL) {
        if (temp_store == NULL) {
          temp_store = (_temp_store *)malloc(sizeof(_temp_store));
          if (temp_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          temp_store->ptr = ret;
          temp_store->total = len;
          num = 1;
        } else {
          num++;
          temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
          if (temp_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          (temp_store + num - 1)->ptr = ret;
          (temp_store + num - 1)->total = len;
        }
      }
      data_ptr = strtok(NULL, STR_MAGIC);
    }
    if (num > 0) {
      for (count = 0; count < num; count++) {
        if (ret_store == NULL) {
          ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
          if (ret_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          sum = (temp_store + count)->total;
          memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
        } else {
          ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
          if (ret_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
          sum += (temp_store + count)->total;
        }
        free((temp_store + count)->ptr);
      }
      free(temp_store);
    }
    void *last_buf = malloc(sum + sizeof(int));
    if (last_buf == NULL) {
      logger->error("in proxy_reg: Out of memory!\n");
      exit(1);
    }
    *(int *)last_buf = num;
    if (num > 0) {
      memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
      free(ret_store);
    }
    shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  } else if (flag == PROC_QUE_STCS) {
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
      SvrSub_ele = svr_tcs_iter->second;
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
        count = *svr_proc_iter;
        break;
      }
    } else {
      count = 0;
    }
    memset(buf_temp, 0x00, sizeof(buf_temp));
    sprintf(buf_temp, "%d", count);
    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
    int val;
    int temp = 0;
    int pos = 0;
    int size = 0;
    ProcInfo_sum *Data_sum = NULL;
    SHMKeySet *subs_proc;
    SHMKeySet::iterator subs_proc_iter;
    SHMTopicSubMap::iterator subs_iter;
    ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
      memset(&Data_stru, 0x00, sizeof(Data_stru));
      if (count == 0) {
        Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
        if (Data_sum == NULL) {
          logger->error("in proxy_reg: Out of memory!\n");
          exit(1);
        }
        count++;
        memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
      } else {
        count++;
        len = sizeof(ProcInfo_sum) * count;
        Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
        if (Data_sum == NULL) {
          logger->error("in proxy_reg: Out of memory!\n");
          exit(1);
        }
        memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
      }
      Data_stru = proc_iter->second;
      memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
      memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
      memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
      memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
      (Data_sum + count - 1)->stat = 1;
      (Data_sum + count - 1)->list_num = 3;
      val = proc_iter->first;
      if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
        TcsSub_ele = proc_tcs_iter->second;
        temp = 0;
        pos = 0;
        len = sizeof(Data_sum->reg_info) - 1;
        for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
          if (temp == 0) {
            strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
            pos += strlen((Data_sum + count - 1)->reg_info);
            len -= strlen((Data_sum + count - 1)->reg_info);
            temp++;
          } else {
            if (len > 0) {
              strcat((Data_sum + count - 1)->reg_info, ",");
              pos += 1;
              len -= 1;
            }
            if (len > 0) {
              size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
              strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
              pos += size;
              len -= size;
            }
          }
        }
        pos = 0;
        temp = 0;
        len = sizeof(Data_sum->local_info) - 1;
        for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
          subs_proc = subs_iter->second;
          if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
            if ((temp == 0)) {
              strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
              pos += strlen((Data_sum + count - 1)->local_info);
              len -= strlen((Data_sum + count - 1)->local_info);
              temp++;
            } else {
              if (len > 0) {
                strcat((Data_sum + count - 1)->local_info, ",");
                pos += 1;
                len -= 1;
              }
              if (len > 0) {
                size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
                strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
                pos += size;
                len -= size;
              }
            }
          }
        }
      }
    }
    temp = count * sizeof(ProcInfo_sum);
    void *last_buf = malloc(temp + sizeof(int));
    if (last_buf == NULL) {
      logger->error("in proxy_reg: Out of memory!\n");
      exit(1);
    }
    *(int *)last_buf = count;
    if (count > 0) {
      memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
      free(Data_sum);
    }
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
  }
}
// 运行代理
void * BusServerSocket::_run_proxy_() {
int BusServerSocket::_run_proxy_() {
    int size;
    int key;
  int flag;
    char * action, *topic, *topics, *buf, *content;
    size_t head_len;
    char resp_buf[128];
@@ -233,25 +682,21 @@
  int rv;
  char send_buf[512] = { 0x00 };
    const char *topic_delim = ",";
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
  const char *topic_delim = ",";
    while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
        head = ShmModSocket::decode_bus_head(buf);
        topics = buf + BUS_HEAD_SIZE;
        action = head.action;
        if(strcmp(action, "sub") == 0) {
            // 订阅支持多主题订阅
            topic = strtok(topics, topic_delim);
          while(topic) {
       _proxy_sub(trim(topic, 0), key);
        topic =  strtok(NULL, topic_delim);
          }
        } 
        else if(strcmp(action, "desub") == 0) {
            if(strcmp(trim(topics, 0), "") == 0) {
                // 取消所有订阅
                _proxy_desub_all(key);
@@ -259,7 +704,6 @@
             
                topic = strtok(topics, topic_delim);
              while(topic) {
           _proxy_desub(trim(topic, 0), key);
            topic =  strtok(NULL, topic_delim);
              }
@@ -267,10 +711,45 @@
            
        } 
        else if(strcmp(action, "pub") == 0) {
             content = topics + head.topic_size;
            _proxy_pub(topics, content, head.content_size, key);
      topics[head.topic_size - 1] = '\0';
          content = topics + head.topic_size;
        }
            _proxy_pub(topics, topics, head.topic_size + head.content_size, key);
        }
    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
      content = topics + head.topic_size;
      if (strcmp(action, "reg") == 0) {
        flag = PROC_REG;
      } else if (strcmp(action, "unreg") == 0) {
        flag = PROC_UNREG;
      } else if (strcmp(action, "tcsreg") == 0) {
        flag = PROC_REG_TCS;
      } else if (strcmp(action, "tcsque") == 0) {
        flag = PROC_QUE_TCS;
      } else if (strcmp(action, "stcsque") == 0) {
        flag = PROC_QUE_STCS;
      } else {
        flag = PROC_QUE_ATCS;
      }
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
        else if (strncmp(buf, "request", strlen("request")) == 0) {
      sprintf(send_buf, "%4d", key);
      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
@@ -281,7 +760,6 @@
      }
    }
    else if(strcmp(action, "stop") == 0) {
            logger->info( "Stopping Bus...");
            free(buf);
            break;
        } else {
@@ -291,5 +769,5 @@
    }
    return NULL;
    return rv;
}
src/socket/bus_server_socket.h
@@ -18,7 +18,6 @@
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
class BusServerSocket {
private:
    shm_socket_t *shm_socket;
@@ -29,14 +28,16 @@
private:
    int  destroy();
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, void *buf, size_t size, int key);
    void *_run_proxy_();
    void _proxy_pub( char *topic, char *buf, size_t size, int key);
    int _run_proxy_();
    // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
      
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
  void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
  static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    // static bool include_in_keys(int key, int keys[], size_t length);
public:
src/socket/bus_server_socket_wrapper.cpp
@@ -7,7 +7,6 @@
 * 创建
 */
void * bus_server_socket_wrapper_open() {
    logger->debug("===bus_server_socket_wrapper_open\n");
    BusServerSocket *sockt = new BusServerSocket;
    return (void *)sockt;
}
@@ -19,7 +18,6 @@
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    delete sockt;
    logger->debug("===bus_server_socket_wrapper_close\n");
}
int bus_server_socket_wrapper_stop(void *_socket) {
@@ -35,7 +33,7 @@
    int ret;
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) {
    if( (ret = sockt->bind(SHM_BUS_KEY)) == 0) {
        return sockt->start();
    } else {
        logger->error("start bus failed");
src/socket/shm_mod_socket.cpp
@@ -38,6 +38,129 @@
    return shm_socket_force_bind(shm_socket, key);
}
int ShmModSocket::bind_proc_id(char *buf, int len) {
  return shm_socket_bind_proc_id(shm_socket, buf, len);
}
int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
{
  int ret;
  struct timespec ts;
  bus_head_t head = {};
  if (flag == PROC_REG) {
    memcpy(head.action, "reg", sizeof(head.action));
  } else if (flag == PROC_UNREG) {
    memcpy(head.action, "unreg", sizeof(head.action));
  } else if (flag == PROC_REG_TCS) {
    memcpy(head.action, "tcsreg", sizeof(head.action));
  } else if (flag == PROC_QUE_TCS) {
    memcpy(head.action, "tcsque", sizeof(head.action));
  } else if (flag == PROC_QUE_STCS) {
    memcpy(head.action, "stcsque", sizeof(head.action));
  } else  if (flag == PROC_QUE_ATCS) {
    memcpy(head.action, "atcsque", sizeof(head.action));
  } else {
    return -1;
  }
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    head.topic_size = 0;
    if (pData != NULL) {
      head.content_size = sizeof(ProcInfo);
    } else {
      head.content_size = 0;
    }
  } else {
    head.topic_size = len;
    head.content_size = 0;
  }
  void *buf_temp;
  int buf_size;
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
  } else {
    buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
  }
  if (timeout_ms > 0) {
    ts.tv_sec = timeout_ms /1000;
    ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
    }
  } else if (timeout_ms == 0) {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
    }
  } else {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
    }
  }
  free(buf_temp);
  return ret;
}
/**
 * 发送信息
 * @key 发送给谁
@@ -60,7 +183,8 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
    if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
@@ -77,7 +201,7 @@
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
    void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
    int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
@@ -183,8 +307,8 @@
    memcpy(head.action, "pub", sizeof(head.action));
    head.topic_size = topic_size = strlen(topic) + 1;
    head.content_size = content_size;
    void *buf;
  void *buf;
    int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -216,6 +340,7 @@
  char *buf;
  int  max_buf_size;
  void *buf_ptr;
  int count = 0;
  if((buf = (char *) malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
@@ -223,7 +348,7 @@
    max_buf_size = MAXBUF;
  }
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  buf_size = BUS_HEAD_SIZE + content_size + topic_size;
  if(max_buf_size < buf_size) {
    
    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -238,8 +363,19 @@
  memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
  if(topic_size != 0 ) 
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  if(content_size != 0)
      memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
                              (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
       memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  } else {
    if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
                                strlen("unreg")) == 0)) && (content_buf != NULL)) {
      proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
      request_head.content_size = count;
      buf_size -= (content_size - count);
    }
  }
 
  *retbuf = buf;
  free(buf_ptr);
src/socket/shm_mod_socket.h
@@ -5,6 +5,7 @@
#include "shm_allocator.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "proc_def.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
@@ -60,6 +61,9 @@
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int key);
  int bind_proc_id(char *buf, int len);
  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
    /**
     * 发送信息
     * @key 发送给谁
@@ -75,9 +79,7 @@
     * @key 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
    int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
    /**
     * 发送请求信息并等待接收应答
     * @key 发送给谁
@@ -128,7 +130,14 @@
     */
    int get_key() ;
  int get_procid(char *buf, int len);
};
typedef std::map<int, ProcInfo, std::less<int>, SHM_STL_Allocator<std::pair<int, ProcInfo> > > ProcZone;
typedef std::set<SHMString,  std::less<SHMString>, SHM_STL_Allocator<SHMString> > TcsZone;
typedef std::map<int, TcsZone *, std::less<int>, SHM_STL_Allocator<std::pair<const int, TcsZone *> > > ProcTcsMap;
#endif
src/socket/shm_socket.cpp
@@ -1,4 +1,5 @@
#include "shm_socket.h"
#include "socket_def.h"
#include "hashtable.h"
#include "logger_factory.h"
#include <map>
@@ -108,8 +109,6 @@
  
  int rv, i;
  hashtable_t *hashtable = mm_get_hashtable();
  logger->debug("shm_socket_close\n");
  // if(sockt->key != 0) {
  //   auto it =  shmQueueStMap->find(sockt->key);
@@ -118,8 +117,6 @@
  //     it->second.closeTime = time(NULL);
  //   }
  // }
  if(sockt->queue != NULL) {
    sockt->queue->close();
@@ -169,8 +166,20 @@
  return 0;
}
int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
  strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
  return 0;
}
int shm_socket_get_key(shm_socket_t *sockt){
  return sockt->key;
}
int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
  strncpy(buf, sockt->proc_id, len);
  return 0;
}
// 短连接方式发送
@@ -297,8 +306,6 @@
{
  int rv;
  logger->debug("%lu destroy threadlocal socket\n", pthread_self());
  if(tmp_socket == NULL)
    return;
  
@@ -411,8 +418,6 @@
  
 
  int rv = 0, tryn = 16;
  static int Counter_suc = 0;
  static int Counter_fail = 0;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<int, shm_packet_t>::iterator recvbufIter;
@@ -430,12 +435,13 @@
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
    if ( rv != 0) {
      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
      exit(1);
    }
  }
  rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
  if ( rv != 0) {
    logger->error(rv, "shm_sendandrecv : pthread_setspecific");
    exit(1);
  }
 
  sendpak.key = tmp_socket->key;
@@ -473,6 +479,7 @@
    } else {
      // 答非所问,放到缓存里
      tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
      exit(0);
      continue;
    }
  }
@@ -481,7 +488,6 @@
  return EBUS_RECVFROM_WRONG_END;
 
LABLE_SUC:
  sockt->key = tmp_socket->key;
  if(recv_buf != NULL) {
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
@@ -520,8 +526,8 @@
     
      // 超时导致接发送对象,与返回对象不对应的情况
      if(send_key != recv_key) {
        // logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
        // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        
        continue;
      }
@@ -640,7 +646,6 @@
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
 
    if (sockt->key == 0) {
      sockt->key = hashtable_alloc_key(hashtable);
    }  
@@ -664,7 +669,6 @@
  
LABEL_POP:
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if(rv != 0) {
    if(rv == ETIMEDOUT) {
@@ -680,4 +684,23 @@
  *_recvpak = recvpak;
  return 0;
}
void proc_copy(char *dst, void *src, int *counter) {
  int count = 0;
  ProcInfo *ptr = static_cast<ProcInfo *>(src);
  memcpy(dst, ptr->proc_id, strlen(ptr->proc_id) + 1);
  count = strlen(ptr->proc_id) + 1;
  memcpy(dst + count, ptr->name, strlen(ptr->name) + 1);
  count += strlen(ptr->name) + 1;
  memcpy(dst + count, ptr->public_info, strlen(ptr->public_info) + 1);
  count += strlen(ptr->public_info) + 1;
  memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
  count += strlen(ptr->private_info) + 1;
  *counter = count;
}
src/socket/shm_socket.h
@@ -4,6 +4,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "proc_def.h"
#include "lock_free_queue.h"
#include <functional>
@@ -18,7 +19,8 @@
#define BUS_ACTION_STOP 1 
typedef struct shm_packet_t {
    int key;
  int key;
    size_t size;
    void * buf;
    char uuid[64];
@@ -31,8 +33,8 @@
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // 本地key
    int key;
  char proc_id[MAX_STR_LEN];
    bool force_bind;
    pthread_mutex_t mutex;
@@ -59,7 +61,8 @@
int shm_socket_bind(shm_socket_t * socket, int key) ;
int shm_socket_force_bind(shm_socket_t * socket, int key) ;
int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
/**
 * @flags : BUS_NOWAIT_FLAG
 */
@@ -70,6 +73,9 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
    const struct timespec * timeout = NULL,  int flags = 0);
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SvrProc;
typedef std::map<SHMString, SvrProc *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SvrProc *> > > SvrTcs;
typedef std::map<int, SHMString, std::less<int>, SHM_STL_Allocator<std::pair<int, const SHMString> > > ProcPartZone;
/**
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *                  @recvbuf 收到的数据
@@ -83,7 +89,6 @@
                    const struct timespec *timeout = NULL, int flag = 0,  void * user_data = NULL);
void proc_copy(char *dst, void *src, int *count);
#endif
src/svsem.cpp
@@ -11,7 +11,6 @@
    union semun arg;
    struct sembuf sop;
    arg.val = 0; /* So initialize it to 0 */
    if (semctl(semid, 0, SETVAL, arg) == -1)
      err_exit(errno, "semctl 1");
test_socket/CMakeLists.txt
@@ -6,7 +6,19 @@
                             ${EXTRA_INCLUDES}
                            )
add_executable(bus_test_inter bus_test_inter.cpp)
target_link_libraries(bus_test_inter PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(bus_test_inter PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
add_executable(bus_test_server_mode bus_test_server_mode.cpp)
target_link_libraries(bus_test_server_mode PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(bus_test_server_mode PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
add_custom_command(
  OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
test_socket/bus_test_inter.cpp
New file
@@ -0,0 +1,502 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
#include "logger_factory.h"
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "bus_error.h"
#include "bh_api.h"
#include "proc_def.h"
#define TOTAL_REG_UNREG         2
#define MAGIC_STR       "INTER_"
#define STR_LEN         30
pthread_t tids;
void *res;
static ProcInfo proc_desc;
char *genStr(int length, char *buf)
{
  int flag, i;
  char *str;
  if (length < (strlen(buf) + 10)) {
    length = strlen(buf) + 10;
  }
  srand((unsigned) time(NULL));
  if ((str = (char *)malloc(length + 1)) == NULL) {
    printf("out of memory!\n");
    exit(0);
  }
  memset(str, 0x00, length + 1);
  memcpy(str, MAGIC_STR, strlen(MAGIC_STR));
  strcat(str, buf);
  for (i = strlen(str); i < length; i++) {
    flag = rand() % 3;
    switch (flag) {
      case 0:
        str[i] = 'A' + rand() % 26;
        break;
      case 1:
        str[i] = 'a' + rand() % 26;
        break;
      default:
        str[i] = '0' + rand() % 10;
        break;
    }
  }
  str[length] = '\0';
  return str;
}
void *client_recv(void *skptr) {
  pthread_detach(pthread_self());
  void *recvbuf = NULL;
  int recv_len;
  void *proc_id = NULL;
  int id_len;
  int rv;
  void *errBuf = NULL;
  int len;
  char proc_data[200] = { 0x00 };
  char topics[200] = { 0x00 };
  struct timespec timeout = {2, 0};
  while (true) {
    rv = BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1);
    if(rv == true) {
      memset(topics, 0x00, sizeof(topics));
      memset(proc_data, 0x00, sizeof(proc_data));
      memcpy(proc_data, proc_id, (sizeof(proc_data) - 1) > id_len ? id_len : (sizeof(proc_data) - 1));
      memcpy(topics, recvbuf, (sizeof(topics) - 1) > recv_len ? recv_len : (sizeof(topics) - 1));
      printf("Get the sub topics data(%s) from proc id(%s)\n", (char *)topics, (char *)proc_id);
      BHFree(recvbuf, len);
      BHFree(proc_id, id_len);
    } else {
      BHGetLastError(&errBuf, &len);
      printf("the thread recv fail with error: %s\n", (char *)errBuf);
      BHFree(errBuf, len);
    }
  }
}
void parseQueryTopicsBuf(void *buf, int len)
{
  if (buf == NULL)
    return;
  int total_topics = *(int *)buf;
  int i, j;
  int buf_pos = 0;
  void *ptr_temp = NULL;
  ProcInfo_query *ptr = NULL;
  ProcInfo *Proc_ptr = NULL;
  buf_pos = sizeof(ProcInfo_query);
  ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
  ptr_temp = (void *)ptr;
  for (i = 0; i < total_topics; i++) {
    printf("topic %s:\n", ptr->name);
    for (j = 0; j < ptr->num; j++) {
      printf("the %dst process info:\n", j + 1);
      Proc_ptr = &(ptr->procData) +  j;
      printf("proc_id: %s\n", Proc_ptr->proc_id);
      printf("name: %s\n", Proc_ptr->name);
      printf("public_info: %s\n", Proc_ptr->public_info);
      printf("private_info: %s\n", Proc_ptr->private_info);
    }
    if (ptr->num > 1) {
      buf_pos += sizeof(ProcInfo) * (ptr->num - 1);
    }
    ptr = (ProcInfo_query *)((char *)ptr_temp + buf_pos);
  }
}
void parseQueryProcBuf(void *buf, int len)
{
  if (buf == NULL)
    return;
  int total = *(int *)buf;
  int i;
  ProcInfo_sum *Proc_ptr = NULL;
  Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
  for (i = 0; i < total; i++) {
    printf("proc_id: %s\n", (Proc_ptr + i)->procData.proc_id);
    printf("name: %s\n", (Proc_ptr + i)->procData.name);
    printf("public_info: %s\n", (Proc_ptr + i)->procData.public_info);
    printf("private_info: %s\n", (Proc_ptr + i)->procData.private_info);
    printf("service: %s\n", (Proc_ptr + i)->reg_info);
    printf("sub: %s\n", (Proc_ptr + i)->local_info);
  }
}
int main(int argc, char *argv[]) {
  int ret;
  char *ptr = NULL;
  char buf[] = "Process";
  void *buf_temp = NULL;
  void *errBuf = NULL;
  void *proc_id = NULL;
  int size;
  int id_len;
  int i;
  char data_buf[200] = { 0x00 };
  const int timeout_ms = 3000;
  memset(&proc_desc, 0x00, sizeof(ProcInfo));
  ptr = genStr(STR_LEN, buf);
  strncpy(proc_desc.proc_id, ptr, strlen(ptr) + 1);
  //strncpy(proc_desc.proc_id, "Hello", strlen("Hello") + 1);
  free(ptr);
  sleep(2); //make rand change
  ptr = genStr(STR_LEN, buf);
  strncpy(proc_desc.name, ptr, strlen(ptr) + 1);
  //strncpy(proc_desc.name, "World", strlen("World") + 1);
  free(ptr);
  sleep(2);
  ptr = genStr(STR_LEN, buf);
  //strncpy(proc_desc.public_info, ptr, strlen(ptr) + 1);
  strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
  free(ptr);
  sleep(2);
  ptr = genStr(STR_LEN, buf);
  //strncpy(proc_desc.private_info, ptr, strlen(ptr) + 1);
  //strncpy(proc_desc.private_info, "Bye", strlen("Bye") + 1);
  free(ptr);
  printf("before the registered, process info:\n");
  printf("proc_id: %s\n", proc_desc.proc_id);
  printf("name: %s\n", proc_desc.name);
  printf("public_info: %s\n", proc_desc.public_info);
  printf("private_info: %s\n", proc_desc.private_info);
  for (i = 0; i < TOTAL_REG_UNREG; i++) {
    ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
    if (ret == true) {
      printf("the process registered OKay\n");
    } else {
      BHGetLastError(&errBuf, &size);
      printf("the process registered fail with error: %s\n", (char *)errBuf);
      BHFree(errBuf, size);
      printf("the second way to get the error log: %s\n", buf_temp);
    };
    BHFree(buf_temp, size);
    ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
    if (ret == true) {
      printf("the process unregistered OKay\n");
    } else {
      BHGetLastError(&errBuf, &size);
      printf("the process unregistered fail with error: %s\n", (char *)errBuf);
      BHFree(errBuf, size);
      printf("the second way to get the error log: %s\n", buf_temp);
    };
    BHFree(buf_temp, size);
  }
  //const char *topics_reg_buf1[] = {"topics demo1"};
  const char *topics_reg_buf1 = "topics demo1";
  const char *topics_query_buf1 = "topics demo1";
  const char *topics_query_buf2 = "Hello World,So,Great,Good"; //No space between each other
  //const char *topics_reg_buf2[] = {"Hello World", "So", "Great", "Good"};
  const char *topics_reg_buf2 = "Hello World,So,Great,Good";
  //const char *topics_sub_buf1[] = {"news"};
  //const char *topics_sub_buf2[] = {"sports", "balls", "topics demo1"};
  const char *topics_sub_buf1 = "news";
  const char *topics_sub_buf2 = "sports,balls,topics demo1";
  const char *topics_pub_topic1 = "news";
  const char *topics_pub_topic1_data = "boob news";
  const char *topics_pub_topic2 = "balls";
  const char *topics_pub_topic2_data = "Great volleyballs and basketballs";
  ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered2 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process query topics OKay\n");
    parseQueryTopicsBuf(buf_temp, size);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process query3 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process query topics OKay\n");
    parseQueryTopicsBuf(buf_temp, size);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process query4 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  pthread_create(&tids, NULL, client_recv, NULL);
  ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process subscribe topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process sub1 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("tthe process subscribe topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process sub2 fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered10 fail with errorL %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
  const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
  void *msgID = NULL;
  int msg_id_len;
  ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len);
  if (ret == true) {
    printf("the process BHAsyncRequest topics OKay\n");
    BHFree(msgID, msg_id_len);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process BHAsyncRequest1 topics fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0);
  if (ret == true) {
    printf("the process BHAsyncRequest topics OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process BHAsyncRequest2 topics fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len,
              &buf_temp, &size, timeout_ms);
  if (ret == true) {
    memset(data_buf, 0x00, sizeof(data_buf));
    strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
    printf("the process BHRequest topics OKay\n");
    printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process BHRequest topics fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
  ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len,
              &buf_temp, &size, timeout_ms);
  if (ret == true) {
    memset(data_buf, 0x00, sizeof(data_buf));
    strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
    printf("the process BHRequest topics OKay\n");
    printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process BHRequest2 topics fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
#if !defined(PRO_DE_SERIALIZE)
  ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms);
  if (ret == true) {
    printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data);
  } else {
    printf("the process published1 fail\n");
  };
  ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms);
  if (ret == true) {
    printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data);
  } else {
    printf("the process published2 fail\n");
  };
#endif
  memset(data_buf, 0x00, sizeof(data_buf));
  strcpy(data_buf, "query the process");
  ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process query all the process data OKay\n");
    parseQueryProcBuf(buf_temp, size);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process query proc fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
#if 1
  while(1) {
    sleep(1);
  }
#else
  /*if the process will exit finally, we must call BHUnregister to release the resources*/
  ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process unregistered OKay\n");
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process unregistered fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  BHFree(buf_temp, size);
#endif
  return 0;
}
test_socket/bus_test_server_mode.cpp
New file
@@ -0,0 +1,122 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
#include "logger_factory.h"
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "bus_error.h"
#include "bh_api.h"
#include "proc_def.h"
static ProcInfo proc_desc;
int main(int argc, char *argv[]) {
  int ret;
  void *buf_temp = NULL;
  void *errBuf = NULL;
  void *proc_id = NULL;
  void *src = NULL;
  void *data_buf = NULL;
  char topics_buf[200] = { 0x00 };
  int size;
  int id_len;
  int count = 0;
  const int timeout_ms = 3000;
  memset(&proc_desc, 0x00, sizeof(ProcInfo));
  strncpy(proc_desc.proc_id, "Hello", strlen("Hello"));
  strncpy(proc_desc.name, "World", strlen("World"));
  strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
  const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
  const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
  ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered OKay\n");
    BHFree(buf_temp, size);
  } else {
      BHGetLastError(&errBuf, &size);
    printf("the process registered fail with error: %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  ret = BHRegisterTopics(topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
    BHFree(buf_temp, size);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  ret = BHRegisterTopics(topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &buf_temp, &size, timeout_ms);
  if (ret == true) {
    printf("the process registered topics OKay\n");
    BHFree(buf_temp, size);
  } else {
    BHGetLastError(&errBuf, &size);
    printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
    BHFree(errBuf, size);
  };
  while(true) {
    ret = BHReadRequest(&proc_id, &id_len, &buf_temp, &size, &src, -1);
    if (ret == true) {
      strncpy(topics_buf, (char *)buf_temp, (sizeof(topics_buf) - 1) > size ? size : (sizeof(topics_buf) - 1));
      printf("Get data(%s)", topics_buf);
      memset(topics_buf, 0x00, sizeof(topics_buf));
      strncpy(topics_buf, (char *)proc_id, (sizeof(topics_buf) - 1) > id_len ? id_len : (sizeof(topics_buf) - 1));
      printf("proc id(%s)", topics_buf);
      BHFree(buf_temp, size);
      BHFree(proc_id, id_len);
      memset(topics_buf, 0x00, sizeof(topics_buf));
      sprintf(topics_buf, "Get data count: %d", ++count);
      ret = BHSendReply(src, topics_buf, strlen(topics_buf));
      if (ret == true) {
        printf("the process send reply OKay\n");
      } else {
        BHGetLastError(&errBuf, &size);
        printf("the process send reply fail with errorL %s\n", (char *)errBuf);
        BHFree(errBuf, size);
      };
      BHFree(src, size);
    } else {
      BHGetLastError(&errBuf, &size);
      printf("BHReadRequest fail with error(%s)\n", (char *)errBuf);
      BHFree(errBuf, size);
    };
  }
  return 0;
}