From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期一, 30 八月 2021 17:52:23 +0800
Subject: [PATCH] re-structure the communication work flow.
---
src/shm/hashtable.h | 2
CMakeLists.txt | 8
src/bh_api.h | 28
src/socket/bus_server_socket.h | 9
src/proc_def.h | 70 +
src/socket/shm_mod_socket.cpp | 150 ++
src/bus_proxy_start.cpp | 127 ++
src/queue/lock_free_queue.h | 24
src/socket/bus_server_socket_wrapper.cpp | 4
src/shm/mm.cpp | 4
src/socket/shm_socket.h | 15
src/svsem.cpp | 1
build.sh | 2
src/bus_error.cpp | 10
src/net/net_mod_server_socket.cpp | 3
src/net/net_mod_socket.cpp | 29
src/socket/shm_socket.cpp | 63
src/bus_def.h | 7
test_socket/bus_test_inter.cpp | 502 ++++++++
test_socket/CMakeLists.txt | 14
src/socket/shm_mod_socket.h | 13
src/queue/shm_queue.h | 4
src/socket/bus_server_socket.cpp | 534 ++++++++
test_socket/bus_test_server_mode.cpp | 122 ++
src/shm/shm_mm.h | 4
src/net/net_mod_socket_wrapper.cpp | 27
src/net/net_mod_socket_wrapper.h | 3
src/net/net_mod_socket.h | 8
src/CMakeLists.txt | 20
src/bus_error.h | 4
src/bh_api.cpp | 1619 +++++++++++++++++++++++++++
31 files changed, 3,300 insertions(+), 130 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9af847a..049e0b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -15,6 +15,8 @@
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+add_compile_options(-fPIC)
+
option(BUILD_DOC "Build doc" OFF)
@@ -30,5 +32,9 @@
add_subdirectory(${PROJECT_SOURCE_DIR}/test)
add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
-# add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
+ include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
+ #add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
+
+add_definitions("-DPROTOBUF_USS_DLLS")
+
diff --git a/build.sh b/build.sh
index 455436e..ea2dcde 100755
--- a/build.sh
+++ b/build.sh
@@ -2,7 +2,7 @@
BUILD_TYPE="Debug"
BUILD_DOC="OFF"
-BUILD_SHARED_LIBS="OFF"
+BUILD_SHARED_LIBS="ON"
function usage() {
echo "build.sh [release | debug | doc]"
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 61fbc17..eb3f4c9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,9 @@
# to the source code
configure_file(bus_config.h.in bus_config.h)
+#set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
+add_compile_options(-fPIC)
+#target_compile_options(shm_queue PRIVATE -fPIC)
list(APPEND _SOURCES_
./logger_factory.cpp
@@ -16,31 +19,33 @@
./bus_error.cpp
./futex_sem.cpp
./svsem.cpp
+./bh_api.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
./net/net_mod_socket.cpp
./net/net_mod_socket_io.cpp
./net/net_mod_server_socket.cpp
+./proto/bhome_msg_api.pb.cc
+./proto/bhome_msg.pb.cc
+./proto/error_msg.pb.cc
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
./shm/shm_mm.cpp
-./bh_api.cc
-./proto/bhome_msg.pb.cc
-./proto/bhome_msg_api.pb.cc
-./proto/error_msg.pb.cc
)
if (BUILD_SHARED_LIBS)
add_library(shm_queue SHARED ${_SOURCES_})
+ target_compile_options(shm_queue PRIVATE -fPIC)
+ set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON)
else()
add_library(shm_queue STATIC ${_SOURCES_})
endif()
# STATIC SHARED
-# add_library(shm_queue ${_SOURCES_})
+#add_library(shm_queue ${_SOURCES_})
target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
@@ -48,11 +53,14 @@
${PROJECT_BINARY_DIR}/src
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/shm
+ ${CMAKE_CURRENT_SOURCE_DIR}/proto
${CMAKE_CURRENT_SOURCE_DIR}/queue
${CMAKE_CURRENT_SOURCE_DIR}/socket
${CMAKE_CURRENT_SOURCE_DIR}/net
)
+add_executable(bus_proxy_start bus_proxy_start.cpp)
+target_link_libraries(bus_proxy_start PRIVATE shm_queue ${EXTRA_LIBS} )
target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} )
@@ -74,6 +82,7 @@
./bus_def.h
./logger_factory.h
./sole.h
+./proc_def.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
@@ -91,7 +100,6 @@
./shm/shm_mm_wrapper.h
./shm/shm_allocator.h
./shm/shm_mm.h
-./bh_api.h
DESTINATION include)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
new file mode 100644
index 0000000..9875cfa
--- /dev/null
+++ b/src/bh_api.cpp
@@ -0,0 +1,1619 @@
+#include "net_mod_socket_wrapper.h"
+#include "net_mod_server_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+#include "shm_mm_wrapper.h"
+#include "proc_def.h"
+#include "usg_common.h"
+#include "bh_api.h"
+#include <pthread.h>
+#include <getopt.h>
+#include "bhome_msg_api.pb.h"
+#include "bhome_msg.pb.h"
+#include "error_msg.pb.h"
+#include "proto/bhome_msg.pb.h"
+#include "proto/bhome_msg_api.pb.h"
+
+static Logger *logger = LoggerFactory::getLogger();
+
+static int gRun_stat = 0;
+static void *gNetmod_socket = NULL;
+
+static pthread_mutex_t mutex;
+
+static char errString[100] = { 0x00 };
+
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ int key;
+ int count = 0;
+ void *buf = NULL;
+ int min = 0;
+ ProcInfo pData;
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _ProcInfo_proto
+ {
+ const char *proc_id;
+ const char *name;
+ const char *public_info;
+ const char *private_info;
+ }_input;
+
+ ::bhome_msg::ProcInfo input;
+ if(!input.ParseFromArray(proc_info, proc_info_len)) {
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input.proc_id = input.proc_id().c_str();
+ _input.name = input.name().c_str();
+ _input.public_info = input.public_info().c_str();
+ _input.private_info = input.private_info().c_str();
+
+#else
+ if ((proc_info == NULL) || (proc_info_len == 0)) {
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x90, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+#endif
+
+ memset(&pData, 0x00, sizeof(ProcInfo));
+ if (gRun_stat == 0) {
+ pthread_mutex_init(&mutex, NULL);
+
+ } else {
+ logger->error("the process has already registered!\n");
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+
+ gRun_stat = 1;
+ shm_mm_wrapper_init(SHM_RES_SIZE);
+
+#if defined(PRO_DE_SERIALIZE)
+ if (_input.proc_id != NULL) {
+ count = strlen(_input.proc_id) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.proc_id, _input.proc_id, min);
+ }
+
+ if (_input.name != NULL) {
+ count = strlen(_input.name) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.name, _input.name, min);
+ }
+
+ if (_input.public_info != NULL) {
+ count = strlen(_input.public_info) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.public_info, _input.public_info, min);
+ }
+
+ if (_input.private_info != NULL) {
+ count = strlen(_input.private_info) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.private_info, _input.private_info, min);
+ }
+#else
+ if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) {
+ count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min);
+ }
+
+ if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) {
+ count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.name, ((ProcInfo *)proc_info)->name, min);
+ }
+
+ if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) {
+ count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min);
+ }
+
+ if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) {
+ count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1;
+ min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
+ strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min);
+ }
+#endif
+
+ gNetmod_socket = net_mod_socket_open();
+ hashtable_t *hashtable = mm_get_hashtable();
+ key = hashtable_alloc_key(hashtable);
+ net_mod_socket_bind(gNetmod_socket, key);
+
+ rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgCommonReply mcr;
+ mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mcr.mutable_errmsg()->set_errstring(errString);
+ *reply_len = mcr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mcr.SerializePartialToArray(*reply, *reply_len);
+#else
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ int min;
+ void *buf = NULL;
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _ProcInfo_proto
+ {
+ const char *proc_id;
+ const char *name;
+ const char *public_info;
+ const char *private_info;
+ }_input;
+
+ ::bhome_msg::ProcInfo input;
+
+ if(!input.ParseFromArray(proc_info, proc_info_len)) {
+
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input.proc_id = input.proc_id().c_str();
+ _input.name = input.name().c_str();
+ _input.public_info = input.public_info().c_str();
+ _input.private_info = input.private_info().c_str();
+#endif
+
+ if (gRun_stat == 0) {
+
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+ rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG);
+ if (rv == 0) {
+
+ net_mod_socket_close(gNetmod_socket);
+
+ gNetmod_socket = NULL;
+
+ gRun_stat = 0;
+
+ }
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgCommonReply mcr;
+ mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mcr.mutable_errmsg()->set_errstring(errString);
+ *reply_len = mcr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mcr.SerializePartialToArray(*reply, *reply_len);
+#else
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ int min, i;
+ void *buf = NULL;
+ int total = 0;
+ int count = 0;
+ char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _MsgTopicList
+ {
+ int amount;
+ const char *topics[MAX_STR_LEN];
+ }_input;
+
+ ::bhome_msg::MsgTopicList input;
+ if(!input.ParseFromArray(topics, topics_len)) {
+
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input.amount = input.topic_list_size();
+ if (_input.amount > MAX_STR_LEN) {
+ _input.amount = MAX_STR_LEN;
+ }
+
+ for(int i = 0; i < _input.amount; i++) {
+ _input.topics[i] = input.topic_list(i).c_str();
+ }
+#else
+ if ((topics == NULL) || (topics_len == 0)) {
+
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+ total = sizeof(topics_buf) / sizeof(char);
+ for (i = 0; i < _input.amount; i++) {
+ min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
+ if (min > 0) {
+ strncpy(topics_buf + count, _input.topics[i], min);
+ count += min;
+
+ if (total >= strlen(_input.topics[i])) {
+ total -= strlen(_input.topics[i]);
+ }
+
+ if ((_input.amount > 1) && (i < (_input.amount - 1))) {
+ strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
+ total -= 1;
+ count++;
+ }
+ } else {
+ topics_buf[strlen(topics_buf) - 1] = '\0';
+ }
+ }
+
+ logger->debug("the parsed compound register topics: %s!\n", topics_buf);
+#else
+ memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
+#endif
+
+ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgCommonReply mcr;
+ mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mcr.mutable_errmsg()->set_errstring(errString);
+ *reply_len = mcr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mcr.SerializePartialToArray(*reply, *reply_len);
+#else
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ int min;
+ void *buf = NULL;
+ int size;
+ char topics_buf[MAX_STR_LEN] = { 0x00 };
+ ProcInfo_query *ptr = NULL;
+ ProcInfo *Proc_ptr = NULL;
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _BHAddress
+ {
+ unsigned long long mq_id;
+ long long abs_addr;
+ const char *ip;
+ int port;
+ }_input0;
+
+ const char *_input1;
+
+ ::bhome_msg::BHAddress input0;
+ ::bhome_msg::MsgQueryTopic input1;
+ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) {
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input0.mq_id = input0.mq_id();
+ _input0.abs_addr = input0.abs_addr();
+ _input0.ip = input0.ip().c_str();
+ _input0.port = input0.port();
+ _input1 = input1.topic().c_str();
+
+#else
+ if ((topic == NULL) || (topic_len == 0)) {
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+
+#if defined(PRO_DE_SERIALIZE)
+ min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1));
+ strncpy(topics_buf, _input1, min);
+#else
+ min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len);
+ buf = const_cast<void *>(topic);
+ strncpy(topics_buf, (const char *)buf, min);
+#endif
+ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+
+ struct _MsgQueryTopicReply
+ {
+ std::string proc_id;
+
+ unsigned long long mq_id;
+ long long abs_addr;
+ std::string ip;
+ int port;
+ }mtr_list[128];
+ int mtr_list_num = 0;
+
+ if (rv == 0) {
+
+ ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+ mtr_list_num = ptr->num;
+
+ if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
+ mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+ }
+
+ for(int i = 0; i < mtr_list_num; i++) {
+ mtr_list[i].proc_id = ptr->procData.proc_id;
+ mtr_list[i].mq_id = 0x00;
+ mtr_list[i].abs_addr = 0x00;
+ mtr_list[i].ip = "192.168.1.1";
+ mtr_list[i].port = 5000;
+ }
+ }
+
+ ::bhome_msg::MsgQueryTopicReply mtr;
+ mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mtr.mutable_errmsg()->set_errstring(errString);
+ for(int i = 0; i < mtr_list_num; i++)
+ {
+ ::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address();
+ mtrb->set_proc_id(mtr_list[i].proc_id);
+ mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id);
+ mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr);
+ mtrb->mutable_addr()->set_ip(mtr_list[i].ip);
+ mtrb->mutable_addr()->set_port(mtr_list[i].port);
+ }
+
+ *reply_len = mtr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mtr.SerializePartialToArray(*reply, *reply_len);
+#else
+ if (rv == 0) {
+ *reply = buf;
+ *reply_len = size;
+
+ } else {
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+ }
+
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ void *buf = NULL;
+ int size;
+ int min;
+ ProcInfo_sum *Proc_ptr = NULL;
+ char data_buf[MAX_STR_LEN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _BHAddress
+ {
+ unsigned long long mq_id;
+ long long abs_addr;
+ const char *ip;
+ int port;
+ }_input0;
+
+ const char *_input1;
+
+ ::bhome_msg::BHAddress input0;
+ ::bhome_msg::MsgQueryProc input1;
+ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input0.mq_id = input0.mq_id();
+ _input0.abs_addr = input0.abs_addr();
+ _input0.ip = input0.ip().c_str();
+ _input0.port = input0.port();
+ _input1 = input1.proc_id().c_str();
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+
+ if (query != NULL) {
+ strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1));
+ }
+
+ rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+ struct _MsgQueryProcReply
+ {
+ std::string proc_id;
+ std::string name;
+ std::string public_info;
+ std::string private_info;
+
+ bool online;
+
+ std::string topic_list[128];
+ int topic_list_num;
+
+ } mpr_list[128];
+ int mpr_list_num = 0;
+
+ if (rv == 0) {
+
+ mpr_list_num = *(int *)buf;
+
+ if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
+ mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
+ }
+
+ Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+ for(int i = 0; i < mpr_list_num; i++) {
+ mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
+ mpr_list[i].name = (Proc_ptr + i)->procData.name;
+ mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
+ mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
+ mpr_list[i].online = (Proc_ptr + i)->stat;
+ mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
+
+ for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+ {
+ if (j == 0) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
+ } else if (j == 1) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
+ } else if (j == 2) {
+ mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+ }
+ }
+ }
+
+ ::bhome_msg::MsgQueryProcReply mpr;
+ mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mpr.mutable_errmsg()->set_errstring(errString);
+
+ for(int i = 0; i < mpr_list_num; i++)
+ {
+ ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
+ mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
+ mpri->mutable_proc()->set_name(mpr_list[i].name);
+ mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
+ mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
+ mpri->set_online(mpr_list[i].online);
+ for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+ {
+ mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
+ }
+ }
+
+ *reply_len = mpr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mpr.SerializePartialToArray(*reply,*reply_len);
+ }
+#else
+ if (rv == 0) {
+ *reply = buf;
+ *reply_len = size;
+ } else {
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+ }
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+
+}
+
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ int sec, nsec;
+ int total = 0;
+ int count = 0;
+ int min, i;
+ void *buf = NULL;
+ char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _MsgTopicList
+ {
+ int amount;
+ const char *topics[MAX_STR_LEN];
+ }_input;
+
+ ::bhome_msg::MsgTopicList input;
+ if(!input.ParseFromArray(topics, topics_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ _input.amount = input.topic_list_size();
+
+ if (_input.amount > MAX_STR_LEN) {
+ _input.amount = MAX_STR_LEN;
+ }
+
+ for(int i = 0; i < _input.amount; i++)
+ _input.topics[i] = input.topic_list(i).c_str();
+
+#else
+ if ((topics == NULL) || (topics_len == 0)) {
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ goto exit_entry;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+ total = sizeof(topics_buf) / sizeof(char);
+ for (i = 0; i < _input.amount; i++) {
+ min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
+ if (min > 0) {
+ strncpy(topics_buf + count, _input.topics[i], min);
+ count += min;
+
+ if (total >= strlen(_input.topics[i])) {
+ total -= strlen(_input.topics[i]);
+ }
+
+ if ((_input.amount > 1) && (i < (_input.amount - 1))) {
+ strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
+ total -= 1;
+ count++;
+ }
+ } else {
+ topics_buf[strlen(topics_buf) - 1] = '\0';
+ }
+ }
+ logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
+#else
+ memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
+#endif
+
+ if (timeout_ms > 0) {
+
+ sec = timeout_ms / 1000;
+ nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+ rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec);
+
+ } else if (timeout_ms == 0) {
+
+ rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
+
+ } else {
+
+ rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
+
+ }
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+exit_entry:
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgCommonReply mcr;
+ mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mcr.mutable_errmsg()->set_errstring(errString);
+ *reply_len=mcr.ByteSizeLong();
+ *reply=malloc(*reply_len);
+ mcr.SerializePartialToArray(*reply,*reply_len);
+#else
+ min = strlen(errString) + 1;
+ buf = malloc(min) ;
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
+
+ *reply = buf;
+ *reply_len = min;
+#endif
+
+ if (rv == 0)
+ return true;
+
+ return false;
+
+}
+
+int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
+
+ return rv;
+}
+
+int BHHeartbeatEasy(const int timeout_ms)
+{
+
+ return true;
+}
+
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _ProcInfo_proto
+ {
+ const char *proc_id;
+ const char *name;
+ const char *public_info;
+ const char *private_info;
+ }_input;
+
+ ::bhome_msg::ProcInfo input;
+ if(!input.ParseFromArray(proc_info,proc_info_len)) {
+
+ rv = EBUS_INVALID_PARA;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ _input.proc_id = input.proc_id().c_str();
+ _input.name = input.name().c_str();
+ _input.public_info = input.public_info().c_str();
+ _input.private_info = input.private_info().c_str();
+
+ rv = 0;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ ::bhome_msg::MsgCommonReply mcr;
+ mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mcr.mutable_errmsg()->set_errstring(errString);
+ *reply_len=mcr.ByteSizeLong();
+ *reply=malloc(*reply_len);
+ mcr.SerializePartialToArray(*reply,*reply_len);
+#endif
+
+ return true;
+}
+
+#if defined(PRO_DE_SERIALIZE)
+int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms)
+#else
+int BHPublish(const char *topic, const char *content, const int timeout_ms)
+#endif
+{
+ int rv;
+ int min;
+ void *buf = NULL;
+ net_node_t node_arr;
+ int node_arr_len = 0;
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _MsgPublish
+ {
+ const char *topic;
+ const char *data;
+ }_input;
+
+ ::bhome_msg::MsgPublish input;
+ if(!input.ParseFromArray(msgpub, msgpub_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ _input.topic = input.topic().c_str();
+ _input.data = input.data().c_str();
+#else
+ if ((topic == NULL) || (content == NULL)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+ if (timeout_ms > 0) {
+ rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms);
+ } else if (timeout_ms == 0) {
+ rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
+
+ } else {
+
+ rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
+ }
+#else
+ if (timeout_ms > 0) {
+ rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms);
+
+ } else if (timeout_ms == 0) {
+ rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
+
+ } else {
+ rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
+ }
+#endif
+
+ pthread_mutex_unlock(&mutex);
+
+ if (rv > 0)
+ return true;
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ return false;
+}
+
+int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
+{
+ int rv;
+ int len;
+ void *buf;
+ int key;
+ int size;
+ int sec, nsec;
+ char topics_buf[MAX_STR_LEN] = { 0x00 };
+ char data_buf[MAX_STR_LEN * 3] = { 0x00 };
+
+ struct _ReadSubReply
+ {
+ std::string proc_id;
+ std::string topic;
+ std::string data;
+ } rsr;
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ if (timeout_ms > 0) {
+ sec = timeout_ms / 1000;
+ nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+
+ rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+
+ } else if (timeout_ms == 0) {
+
+ rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+
+ } else {
+
+ rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+ }
+
+ if (rv == 0) {
+
+ len = strlen((char *)buf);
+ if (len > size) {
+ len = size;
+ }
+ strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len);
+
+ if (len < size) {
+ len = strlen(topics_buf) + 1;
+
+ strncpy(data_buf, (char *)buf + len, size - len);
+ }
+
+ free(buf);
+
+#if defined(PRO_DE_SERIALIZE)
+ rsr.topic = topics_buf;
+ rsr.data = data_buf;
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ sprintf(topics_buf, "%d", key);
+
+ rsr.proc_id = topics_buf;
+ *proc_id_len = rsr.proc_id.size();
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
+
+ ::bhome_msg::MsgPublish Mp;
+ Mp.set_topic(rsr.topic);
+ Mp.set_data(rsr.data.data());
+ *msgpub_len = Mp.ByteSizeLong();
+ *msgpub = malloc(*msgpub_len);
+ Mp.SerializePartialToArray(*msgpub, *msgpub_len);
+#else
+ void *ptr;
+ if (len < size) {
+ ptr = malloc(size - len);
+ len = size - len;
+ memcpy(ptr, data_buf, len);
+ } else {
+ ptr = malloc(len);
+ memcpy(ptr, topics_buf, len);
+ }
+ *msgpub = ptr;
+ *msgpub_len = len;
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ sprintf(topics_buf, "%d", key);
+
+ *proc_id_len = strlen(topics_buf);
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, topics_buf, *proc_id_len);
+
+#endif
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
+{
+ int rv;
+ void *buf;
+ int size;
+ int val;
+ int len;
+ int min;
+ int sec, nsec;
+ std::string MsgID;
+ int timeout_ms = 3000;
+ char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _BHAddress
+ {
+ unsigned long long mq_id;
+ long long abs_addr;
+ const char *ip;
+ int port;
+ }_input0;
+
+ struct MsgRequestTopic
+ {
+ const char *topic;
+ const char *data;
+ }_input1;
+
+ ::bhome_msg::BHAddress input0;
+ ::bhome_msg::MsgRequestTopic input1;
+ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ _input0.mq_id = input0.mq_id();
+ _input0.abs_addr = input0.abs_addr();
+ _input0.ip = input0.ip().c_str();
+ _input0.port = input0.port();
+ _input1.topic = input1.topic().c_str();
+ _input1.data = input1.data().c_str();
+
+#else
+ if ((request == NULL) || (request_len == 0)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+ strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
+#else
+ strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1));
+#endif
+
+ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
+ if (rv == 0) {
+
+ val = atoi((char *)buf);
+
+ free(buf);
+
+ if (val > 0) {
+
+ len = strlen(topics_buf);
+#if defined(PRO_DE_SERIALIZE)
+ min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
+ strncpy(topics_buf + len + 1, _input1.data, min);
+ len += (min + 1);
+#endif
+ if (timeout_ms > 0) {
+
+ sec = timeout_ms / 1000;
+ nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+
+ rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec);
+
+ } else if (timeout_ms == 0) {
+
+ rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val);
+
+ } else {
+
+ rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val);
+ }
+ } else {
+
+ rv = EBUS_RES_UNSUPPORT;
+
+ }
+ }
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+
+ if((msg_id == NULL) || (msg_id_len == NULL)) {
+ if (rv == 0)
+ return true;
+
+ return false;
+ }
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ if (rv == 0) {
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ sprintf(topics_buf, "%d", val);
+ MsgID = topics_buf;
+
+ *msg_id_len = MsgID.size();
+ *msg_id = malloc(*msg_id_len);
+ memcpy(*msg_id, MsgID.data(), *msg_id_len);
+
+ return true;
+ }
+
+ return false;
+
+}
+
+int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len,
+ void **reply, int *reply_len, const int timeout_ms)
+{
+ int rv;
+ void *buf;
+ int size;
+ int val;
+ int min, len;
+ net_node_t node;
+ int node_size;
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
+ int sec, nsec;
+ char topics_buf[MAX_STR_LEN] = { 0x00 };
+
+#if defined(PRO_DE_SERIALIZE)
+ struct _BHAddress
+ {
+ unsigned long long mq_id;
+ long long abs_addr;
+ const char *ip;
+ int port;
+ }_input0;
+
+ struct _MsgRequestTopic
+ {
+ const char *topic;
+ const char *data;
+ }_input1;
+
+ ::bhome_msg::BHAddress input0;
+ ::bhome_msg::MsgRequestTopic input1;
+ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ _input0.mq_id = input0.mq_id();
+ _input0.abs_addr = input0.abs_addr();
+ _input0.ip = input0.ip().c_str();
+ _input0.port = input0.port();
+ _input1.topic = input1.topic().c_str();
+ _input1.data = input1.data().c_str();
+
+#else
+ if ((request == NULL) || (request_len == 0)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+#if defined(PRO_DE_SERIALIZE)
+ strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
+#else
+ strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1));
+#endif
+
+ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
+ if (rv == 0) {
+ val = atoi((char *)buf);
+
+ free(buf);
+
+ if (val > 0) {
+ memset(&node, 0x00, sizeof(node));
+
+ len = strlen(topics_buf);
+#if defined(PRO_DE_SERIALIZE)
+ min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
+ strncpy(topics_buf + len + 1, _input1.data, min);
+ len += (min + 1);
+#endif
+
+ node.key = val;
+ rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size);
+ if (rv > 0) {
+ if (recv_arr_size > 0) {
+
+ node.key = recv_arr[0].key;
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ size = recv_arr[0].content_length;
+ buf = (char *)malloc(size);
+ strncpy((char *)buf, (char *)recv_arr[0].content, size);
+#if !defined(PRO_DE_SERIALIZE)
+ *reply = buf;
+ *reply_len = size;
+#endif
+ }
+
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+ if(errarr_size > 0) {
+ free(errarr);
+ }
+
+ rv = 0;
+
+ } else {
+ rv = EBUS_TIMEOUT;
+ }
+
+ } else {
+ rv = EBUS_RES_UNSUPPORT;
+ }
+ }
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ struct _RequestReply
+ {
+ std::string proc_id;
+ std::string data;
+ }rr;
+
+ if (rv == 0) {
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ sprintf(topics_buf, "%d", node.key);
+
+ rr.proc_id = topics_buf;
+ *proc_id_len = rr.proc_id.size();
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, rr.proc_id.data(), *proc_id_len);
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ memcpy(topics_buf, buf, size);
+ rr.data = topics_buf;
+
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgRequestTopicReply mrt;
+ mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mrt.mutable_errmsg()->set_errstring(errString);
+ mrt.set_data(rr.data.data());
+ *reply_len = mrt.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mrt.SerializePartialToArray(*reply, *reply_len);
+#endif
+ }
+
+ pthread_mutex_unlock(&mutex);
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms)
+{
+ int rv;
+ void *buf;
+ int key;
+ int size;
+ int sec, nsec;
+ char topics_buf[MAX_STR_LEN] = { 0x00 };
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+ if (timeout_ms > 0) {
+
+ sec = timeout_ms / 1000;
+ nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
+
+ rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+
+ } else if (timeout_ms == 0) {
+
+ rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+
+ } else {
+
+ rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+ }
+
+ if (rv == 0) {
+ struct _ReadRequestReply
+ {
+ std::string proc_id;
+ std::string topic;
+ std::string data;
+ void *src;
+ } rrr;
+
+ sprintf(topics_buf, "%d", key);
+ rrr.proc_id = topics_buf;
+
+ *proc_id_len = rrr.proc_id.size();
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len);
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size);
+ rrr.topic = topics_buf;
+ rrr.data = topics_buf;
+
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgRequestTopic mrt;
+ mrt.set_topic(rrr.topic);
+ mrt.set_data(rrr.data.data());
+ *request_len = mrt.ByteSizeLong();
+ *request = malloc(*request_len);
+ mrt.SerializePartialToArray(*request,*request_len);
+#else
+ *request = buf;
+ *request_len = size;
+#endif
+
+ buf = malloc(sizeof(int));
+ *(int *)buf = key;
+ *src = buf;
+ }
+
+ pthread_mutex_unlock(&mutex);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHSendReply(void *src, const void *reply, const int reply_len)
+{
+ int rv;
+
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgRequestTopicReply input;
+ if (!input.ParseFromArray(reply, reply_len)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ const char *_input;
+ _input = input.data().data();
+
+#else
+ if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
+
+ rv = EBUS_INVALID_PARA;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+#endif
+
+ if (gRun_stat == 0) {
+ logger->error("the process has not been registered yet!\n");
+
+ rv = EBUS_RES_NO;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ return false;
+ }
+
+ rv = pthread_mutex_trylock(&mutex);
+ if (rv == 0) {
+
+ rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src);
+
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+
+ pthread_mutex_unlock(&mutex);
+ } else {
+
+ rv = EBUS_RES_BUSY;
+ memset(errString, 0x00, sizeof(errString));
+ strncpy(errString, bus_strerror(rv), sizeof(errString));
+ }
+
+ if (rv == 0)
+ return true;
+
+ return false;
+}
+
+int BHCleanup() {
+
+ return true;
+}
+
+void BHFree(void *buf, int size) {
+ free(buf);
+}
+
+int BHGetLastError(void **msg, int *msg_len)
+{
+ void *buf = NULL;
+
+ buf = malloc(strlen(errString) + 1);
+
+ memset(buf, 0x00, strlen(errString) + 1);
+ memcpy(buf, errString, strlen(errString));
+
+ if ((msg != NULL) && (msg_len != NULL)) {
+ *msg = buf;
+ *msg_len = strlen(errString);
+
+ return true;
+ }
+
+ return false;
+
+}
diff --git a/src/bh_api.h b/src/bh_api.h
index 75a9c17..40d9ffa 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -1,9 +1,11 @@
-#ifndef BH_API
-#define BH_API
+#ifndef _BH_API_WRAPPER_
+#define _BH_API_WRAPPER_
#ifdef __cplusplus
extern "C" {
#endif
+
+#define PRO_DE_SERIALIZE 1
int BHRegister(const void *proc_info,
const int proc_info_len,
@@ -17,15 +19,19 @@
int *reply_len,
const int timeout_ms);
+
int BHRegisterTopics(const void *topics,
const int topics_len,
void **reply,
int *reply_len,
const int timeout_ms);
-int BHQueryTopicAddress(const void *remote, const int remote_len,
- const void *topic, const int topic_len,
- void **reply, int *reply_len,
+int BHQueryTopicAddress(const void *remote,
+ const int remote_len,
+ const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
const int timeout_ms);
int BHQueryProcs(const void *remote,
@@ -41,6 +47,7 @@
void **reply,
int *reply_len,
const int timeout_ms);
+
int BHSubscribeNetTopics(const void *topics,
const int topics_len,
void **reply,
@@ -54,9 +61,13 @@
int *reply_len,
const int timeout_ms);
+#if defined(PRO_DE_SERIALIZE)
int BHPublish(const void *msgpub,
const int msgpub_len,
const int timeout_ms);
+#else
+int BHPublish(const char *topic, const char *content, const int timeout_ms);
+#endif
int BHReadSub(void **proc_id,
int *proc_id_len,
@@ -96,7 +107,12 @@
void BHFree(void *buf, int size);
+int BHGetLastError(void **msg, int *msg_len);
+
#ifdef __cplusplus
}
#endif
-#endif
+#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */
+
+
+
diff --git a/src/bus_def.h b/src/bus_def.h
index 78a7eb9..9634883 100644
--- a/src/bus_def.h
+++ b/src/bus_def.h
@@ -4,4 +4,11 @@
#define BUS_TIMEOUT_FLAG 1
#define BUS_NOWAIT_FLAG 1 << 1
+#define SHM_RES_SIZE 512
+
+#define SHM_BUS_PROC_MAP_KEY 10
+#define SHM_BUS_PROC_TCS_MAP_KEY 11
+#define SHM_BUS_TCS_MAP_KEY 20
+#define SHM_BUS_PROC_PART_MAP_KEY 30
+
#endif
\ No newline at end of file
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index 913a771..49244cd 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -20,7 +20,11 @@
"Send to self error",
"Receive from wrong end",
"Service stoped",
- "Exceed resource limit"
+ "Exceed resource limit",
+ "Service not supported",
+ "Resource busy",
+ "Resource not provide",
+ "Invalid parameters"
};
@@ -50,6 +54,10 @@
char *buf;
/* Make first caller allocate key for thread-specific data */
+ if (err == 0) {
+ err = EBUS_BASE;
+ }
+
s = pthread_once(&once, createKey);
if (s != 0)
err_exit(s, "pthread_once");
diff --git a/src/bus_error.h b/src/bus_error.h
index 769aa36..84f2e89 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -13,6 +13,10 @@
#define EBUS_RECVFROM_WRONG_END 506
#define EBUS_STOPED 507
#define EBUS_EXCEED_LIMIT 508
+#define EBUS_RES_UNSUPPORT 509
+#define EBUS_RES_BUSY 510
+#define EBUS_RES_NO 511
+#define EBUS_INVALID_PARA 512
extern int bus_errno;
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
new file mode 100644
index 0000000..a04edad
--- /dev/null
+++ b/src/bus_proxy_start.cpp
@@ -0,0 +1,127 @@
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include <signal.h>
+#include <limits.h>
+#include <stdio.h>
+#include <errno.h>
+#include <getopt.h>
+#include <stdlib.h>
+
+#define SVR_PORT 5000
+
+#define TOTAL_THREADS 2
+
+static void *gBusServer_socket = NULL;
+static void *gServer_socket = NULL;
+
+static int gShm_size = -1;
+static int gPort = -1;
+
+static int gBusServer_act = 0;
+static int gBusServer_stat = 0;
+
+pthread_t tids[2];
+void *res[2];
+
+void *bus_start(void *skptr) {
+
+ gBusServer_act = 1;
+ gBusServer_socket = bus_server_socket_wrapper_open();
+ if (bus_server_socket_wrapper_start_bus(gBusServer_socket) != 0) {
+ printf("start bus failed\n");
+ gBusServer_stat = -1;
+ }
+
+ return NULL;
+}
+
+
+
+
+void *svr_start(void *skptr) {
+ int port = *(int *)skptr;
+
+ gServer_socket = net_mod_server_socket_open(port);
+ if(net_mod_server_socket_start(gServer_socket) != 0) {
+ printf("start net mod server failed\n");
+ }
+
+ return NULL;
+}
+
+int main(int argc, char *argv[])
+{
+ char *endptr;
+ char i;
+ int val;
+
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
+
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
+
+ if (argc >= 4) {
+ fprintf(stderr, "Usage: %s [shm size] [server port]\n", argv[0]);
+ exit(0);
+ };
+
+ if (argc >= 2) {
+ argc -= 2;
+ for (i = 0; i <= argc; i++) {
+ errno = 0;
+ val = strtol(argv[i + 1], &endptr, 10);
+ if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
+ || (errno != 0 && val == 0)) {
+ fprintf(stderr, "invalid parameter: %s\n", argv[i + 1]);
+ exit(0);
+ }
+
+ if (endptr == argv[i + 1]) {
+ fprintf(stderr, "invalid parameter %s: No digits were found\n", argv[i + 1]);
+ exit(0);
+ }
+
+ if (i == 0) {
+ gShm_size = val;
+ } else {
+ gPort = val;
+ }
+ }
+ }
+
+ if (gShm_size == -1) {
+ gShm_size = SHM_RES_SIZE;
+ }
+ shm_mm_wrapper_init(SHM_RES_SIZE);
+
+ pthread_create(&tids[0], NULL, bus_start, NULL);
+
+ if (gPort == -1) {
+ gPort = SVR_PORT;
+ }
+
+ while(gBusServer_act == 0) {
+ sleep(1);
+ }
+
+ if (gBusServer_stat >= 0) {
+ pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
+ }
+
+ for (i = 0; i< TOTAL_THREADS; i++) {
+ if(pthread_join(tids[i], &res[i]) != 0) {
+ perror("bus_proxy pthread_join");
+ }
+ }
+
+ bus_server_socket_wrapper_close(gBusServer_socket);
+ net_mod_socket_close(gServer_socket);
+ shm_mm_wrapper_destroy();
+
+ return 0;
+}
+
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index 10b0aac..2f733f8 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -181,7 +181,7 @@
}
if( ret != 0) {
- logger->error("杞彂澶辫触 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret));
+ logger->error("fail: NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret));
// 杞彂澶辫触
response_head.code = ret;
response_head.content_length = 0;
@@ -277,7 +277,6 @@
FD_CLR(connfd, &pool.read_set);
pool.clientfd[i] = -1;
logger->debug("===server close client %d\n", connfd);
- // printf("===server close client %d\n", connfd);
}
}
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 6a541d6..ab065eb 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -22,7 +22,7 @@
NetModSocket::~NetModSocket() {
-
+
}
@@ -46,6 +46,15 @@
return shmModSocket.force_bind(key);
}
+int NetModSocket::bind_proc_id(char *buf, int len) {
+ return shmModSocket.bind_proc_id(buf, len);
+}
+
+int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) {
+
+ return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
+}
+
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -67,7 +76,7 @@
NetConnPool *mpool = (NetConnPool *)_pool;
delete mpool;
- logger->debug("destory connPool");
+
}
/* One-time key creation function */
@@ -343,9 +352,6 @@
return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
-
-// int pub(char *topic, int topic_size, void *content, int content_size, int port);
-
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
int content_size, int msec) {
int i, connfd;
@@ -363,7 +369,7 @@
net_mod_err_t err_msg;
// 鏈湴鍙戦��
- if(node_arr == NULL || arrlen == 0) {
+ if ((node_arr == NULL) || (arrlen == 0)) {
if(msec == 0) {
ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG);
} else if(msec > 0) {
@@ -525,7 +531,6 @@
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
-
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
const struct timespec *timeout , int flag, void * user_data ) {
@@ -764,23 +769,23 @@
head.mod = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(uint32_t);
memcpy(head.host, tmp_ptr, sizeof(head.host));
tmp_ptr += sizeof(head.host);
head.port = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(uint32_t);
head.key = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(uint32_t);
head.content_length = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(uint32_t);
head.topic_length = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(uint32_t);
head.timeout = ntohl(GET_INT32(tmp_ptr));
return head;
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 6289fa6..d8e53ae 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -3,6 +3,7 @@
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
+#include "proc_def.h"
#include <poll.h>
#include "socket_def.h"
#include "net_conn_pool.h"
@@ -17,7 +18,7 @@
int key;
};
-#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 6 * sizeof(uint32_t))
+#define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t)
// 璇锋眰澶�
@@ -118,8 +119,8 @@
*/
int force_bind( int key);
-
-
+ int bind_proc_id(char *buf, int len);
+ int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
/**
* 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
@@ -166,7 +167,6 @@
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
int recvfrom_nowait( void **buf, int *size, int *key);
-
/**
* 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index abdcbb7..ab4d59d 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -44,6 +44,14 @@
// return sockt->bind(key);
}
+int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+ NetModSocket *sockt = (NetModSocket *)_socket;
+
+ return sockt->reg(pData, len, buf, size, timeout_ms, flag);
+
+}
+
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
@@ -51,20 +59,17 @@
*/
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
NetModSocket *sockt = (NetModSocket *)_socket;
- logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key);
return sockt->sendto(buf, size, key);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
NetModSocket *sockt = (NetModSocket *)_socket;
- logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key);
return sockt->sendto_timeout(buf, size, key, sec, nsec);
// return sockt->sendto(buf, size, key);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
NetModSocket *sockt = (NetModSocket *)_socket;
- logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key);
return sockt->sendto_nowait(buf, size, key);
}
@@ -77,22 +82,19 @@
int rv;
NetModSocket *sockt = (NetModSocket *)_socket;
- logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
rv = sockt->recvfrom(buf, size, key);
- logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
return rv;
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
- NetModSocket *sockt = (NetModSocket *)_socket;
- // return sockt->recvfrom(buf, size, key);
- return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
- NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom_nowait(buf, size, key);
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
@@ -101,6 +103,11 @@
return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1);
}
+int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ return sockt->bind_proc_id(proc_id, len);
+}
+
/**
* 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
* @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉�
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index b4941c0..b869510 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -12,6 +12,7 @@
#define __NET_MOD_SOCKET_H__
#include "net_mod_socket.h"
+#include "proc_def.h"
#ifdef __cplusplus
extern "C" {
@@ -55,6 +56,8 @@
*/
int net_mod_socket_force_bind(void * _socket, int key);
+int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
+
/**
* @brief 鍙戦�佷俊鎭�,鍙戦�佸畬鎴愭墠杩斿洖
*
diff --git a/src/proc_def.h b/src/proc_def.h
new file mode 100644
index 0000000..cb4dc0a
--- /dev/null
+++ b/src/proc_def.h
@@ -0,0 +1,70 @@
+#ifndef __PROC_DEF_
+#define __PROC_DEF_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_STR_LEN 128 //keep the same with serializer in proc check
+#define MIN_STR_LEN 10
+#define MAX_PROC_NUM 128
+#define MAX_TOPICS_NUN 60
+
+#define PROC_REG 1
+#define PROC_UNREG 2
+#define PROC_REG_TCS 3
+#define PROC_QUE_TCS 4
+#define PROC_QUE_STCS 5
+#define PROC_QUE_ATCS 6
+
+#define STR_MAGIC ","
+
+typedef struct _ProcInfo {
+#if 0
+ char ServerID[MAX_STR_LEN]; // 鏈哄櫒ID
+ char BoardID[MAX_STR_LEN]; // 鏉垮崱ID
+ char ServerIP[MAX_STR_LEN]; // 鏈哄櫒IP
+ char ProcID[MAX_STR_LEN]; // 杩涚▼鍞竴鏍囪瘑
+ char ProcName[MAX_STR_LEN]; // 杩涚▼鍚嶇О
+ char ProcLabel[MAX_STR_LEN]; // 杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋�
+#else
+ char proc_id[MAX_STR_LEN];
+ char name[MAX_STR_LEN];
+ char public_info[MAX_STR_LEN];
+ char private_info[MAX_STR_LEN];
+#endif
+} ProcInfo;
+
+typedef struct _ProcInfo_sum {
+
+ ProcInfo procData;
+
+ int stat;
+ char reg_info[MAX_STR_LEN];
+ char local_info[MAX_STR_LEN];
+ char net_info[MAX_STR_LEN];
+
+ int list_num;
+
+} ProcInfo_sum;
+
+typedef struct _ProcInfo_query {
+
+ char name[MAX_STR_LEN];
+
+ int num;
+
+ ProcInfo procData;
+
+} ProcInfo_query;
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif //end of file
+
+
+
+
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index f536208..ada70c6 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -15,7 +15,7 @@
#include "bus_def.h"
// default Queue size
-#define LOCK_FREE_Q_DEFAULT_SIZE 16
+#define LOCK_FREE_Q_DEFAULT_SIZE 320
#define LOCK_FREE_Q_ST_OPENED 0
@@ -177,6 +177,7 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) {
+ //std::cout << "LockFreeQueue init reference=" << reference << std::endl;
if (sem_init(&slots, 1, qsize) == -1)
err_exit(errno, "LockFreeQueue sem_init");
if (sem_init(&items, 1, 0) == -1)
@@ -211,6 +212,7 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
+ // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
if (sem_destroy(&slots) == -1) {
err_exit(errno, "LockFreeQueue sem_destroy");
}
@@ -249,10 +251,10 @@
typename Allocator,
template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
- // sigset_t mask_all, pre;
- // sigfillset(&mask_all);
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
- // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
if (psem_trywait(&slots) == -1) {
@@ -271,12 +273,12 @@
if (m_qImpl.push(a_data)) {
psem_post(&items);
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
LABEL_FAILTURE:
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
@@ -285,10 +287,10 @@
template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
- // sigset_t mask_all, pre;
- // sigfillset(&mask_all);
+ sigset_t mask_all, pre;
+ sigfillset(&mask_all);
- // sigprocmask(SIG_BLOCK, &mask_all, &pre);
+ sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
if (psem_trywait(&items) == -1) {
@@ -306,13 +308,13 @@
if (m_qImpl.pop(a_data)) {
psem_post(&slots);
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return 0;
}
LABEL_FAILTURE:
- // sigprocmask(SIG_SETMASK, &pre, NULL);
+ sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 74b9b33..f5d64db 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -40,8 +40,8 @@
bool full();
bool empty();
- int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
- int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
+ int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
+ int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
ELEM_T &operator[](unsigned i);
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 6c3cd27..7bb6eac 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -5,7 +5,7 @@
#include <functional>
#include <set>
-#define MAPSIZE 1024
+#define MAPSIZE 4096
// 鍒涘缓Queue鏁伴噺鐨勪笂闄�
#define QUEUE_COUNT_LIMIT 300
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index f23fa08..e4ef672 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -256,6 +256,7 @@
first = false;
shmid = shmget(SHM_KEY, 0, 0);
}
+
if (shmid == -1)
err_exit(errno, "mm_init shmget");
shmp = shmat(shmid, key_addr, 0);
@@ -338,8 +339,6 @@
else
LoggerFactory::getLogger()->debug("shared memory destroy\n");
- LoggerFactory::getLogger()->debug( "mm_destroy: real destroy.");
-
SemUtil::inc(mutex);
SemUtil::remove(mutex);
return true;
@@ -363,6 +362,7 @@
void mm_free_by_key(int key) {
void *ptr;
+
ptr = hashtable_get(hashtable, key);
if(ptr != NULL) {
mm_free(ptr);
diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h
index 5339d8a..0ca0d08 100644
--- a/src/shm/shm_mm.h
+++ b/src/shm/shm_mm.h
@@ -8,6 +8,8 @@
#define SHM_QUEUE_ST_CLOSED 1
#define SHM_QUEUE_ST_RECYCLED 2
+#define SHM_QUEUE_ST_SET 50
+
struct shm_queue_status_t {
int status;
@@ -49,4 +51,6 @@
int shm_mm_alloc_key();
+typedef std::map<SHMString, int, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, int> > > ProcDataZone;
+
#endif
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 7a45696..1646da5 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -1,6 +1,7 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
+#include "shm_socket.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -12,10 +13,10 @@
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL) {
- for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
cb(subscripter_set, *set_iter);
}
}
@@ -35,7 +36,7 @@
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
@@ -50,7 +51,6 @@
BusServerSocket::BusServerSocket() {
- logger->debug("BusServerSocket Init");
shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
topic_sub_map = NULL;
@@ -80,10 +80,13 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int BusServerSocket::start(){
+ int rv;
+
topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
- _run_proxy_();
- return 0;
+ rv = _run_proxy_();
+
+ return rv;
}
@@ -114,7 +117,7 @@
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
if(topic_sub_map != NULL) {
- for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
if(subscripter_set != NULL) {
subscripter_set->clear();
@@ -126,8 +129,8 @@
shm_mm_free_by_key(SHM_BUS_MAP_KEY);
}
shm_socket_close(shm_socket);
- logger->debug("BusServerSocket destory 3");
- return 0;
+
+ return 0;
}
/*
@@ -135,10 +138,10 @@
*/
void BusServerSocket::_proxy_sub( char *topic, int key) {
SHMKeySet *subscripter_set;
-
+
+ struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
SHMTopicSubMap::iterator map_iter;
SHMKeySet::iterator set_iter;
-//printf("_proxy_sub topic = %s\n", topic);
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
} else {
@@ -147,6 +150,7 @@
topic_sub_map->insert({topic, subscripter_set});
}
subscripter_set->insert(key);
+
}
/*
@@ -173,7 +177,7 @@
SHMTopicSubMap::iterator map_iter;
// SHMKeySet::iterator set_iter;
- for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
+ for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
subscripter_set = map_iter->second;
subscripter_set->erase(key);
}
@@ -182,7 +186,7 @@
/*
* 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
*/
-void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
@@ -198,7 +202,7 @@
if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
subscripter_set = map_iter->second;
- for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
+ for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
send_key = *set_iter;
rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
if(rv == 0) {
@@ -209,7 +213,7 @@
}
// 鍒犻櫎宸插叧闂殑绔�
- for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
+ for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
logger->debug("remove closed subscripter %d \n", send_key);
@@ -218,13 +222,458 @@
subscripter_to_del.clear();
}
+
}
+ProcInfo_query *Qurey_object(const char *object, int *length) {
+ int flag = 0;
+ int val;
+ int len;
+ int total = 0;
+ ProcInfo *Proc_ptr = NULL;
+ ProcInfo Data_stru;
+ ProcInfo_query *dataBuf = NULL;
+ SvrProc *SvrSub_ele;
+ SvrTcs::iterator svr_tcs_iter;
+ SvrProc::iterator svr_proc_iter;
+ ProcZone::iterator proc_iter;
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+ val = *svr_proc_iter;
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+
+ if (dataBuf == NULL) {
+ dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
+ if (dataBuf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ total = sizeof(ProcInfo_query);
+ }
+
+ if (flag == 0) {
+ memset(dataBuf, 0x00, sizeof(ProcInfo_query));
+
+ dataBuf->num = 1;
+ strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
+
+ flag = 1;
+
+ } else {
+ dataBuf->num++;
+ len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
+ dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
+ if (dataBuf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ total += sizeof(ProcInfo);
+ memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
+ }
+
+ memset(&Data_stru, 0x00, sizeof(ProcInfo));
+ Data_stru = proc_iter->second;
+
+ Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
+ strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
+ strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
+ strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
+ strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
+
+ if (length != NULL)
+ *length = total;
+ }
+ }
+ }
+
+ return dataBuf;
+}
+
+void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
+{
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
+ int count = 0;
+ int i = 0;
+ int len = 0;
+ char *data_ptr;
+ ProcInfo Data_stru;
+ ProcZone::iterator proc_iter;
+ TcsZone *TcsSub_ele;
+ ProcDataZone::iterator proc_que_iter;
+ ProcTcsMap::iterator proc_tcs_iter;
+ SvrProc *SvrSub_ele;
+ SvrProc::iterator svr_proc_iter;
+ SvrTcs::iterator svr_tcs_iter;
+ TcsZone::iterator tcssub_iter;
+ ProcPartZone::iterator proc_part_iter;
+
+ struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ memset(&Data_stru, 0x00, sizeof(ProcInfo));
+
+ if (buf != NULL) {
+
+ memcpy(Data_stru.proc_id, buf, strlen(buf) + 1);
+ count = strlen(buf) + 1;
+
+ memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+ }
+
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
+ ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
+ if (flag == PROC_REG) {
+ if ((proc_iter = proc->find(key)) == proc->end()) {
+ proc->insert({key, Data_stru});
+ }
+
+ if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
+ procPart->insert({key, Data_stru.proc_id});
+ }
+
+ if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
+ procQuePart->insert({Data_stru.proc_id, key});
+ }
+
+ } else {
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+ for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
+ SvrSub_ele = svr_tcs_iter->second;
+
+ SvrSub_ele->erase(key);
+ }
+
+ if ((proc_iter = proc->find(key)) != proc->end()) {
+
+ len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
+ strncpy(buf_temp, (proc_iter->second).proc_id, len);
+ proc->erase(proc_iter);
+
+ }
+
+ if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
+
+ procPart->erase(key);
+ }
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ procQuePart->erase(buf_temp);
+ }
+
+ }
+ } else if (flag == PROC_REG_TCS) {
+ ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+ if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
+ TcsSub_ele = proc_tcs_iter->second;
+ } else {
+
+ void *ptr_set = mm_malloc(sizeof(TcsZone));
+ TcsSub_ele = new(ptr_set) TcsZone;
+ proc->insert({key, TcsSub_ele});
+ }
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+ while(data_ptr) {
+ TcsSub_ele->insert(data_ptr);
+ if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+ } else {
+
+ void *ptr_set = mm_malloc(sizeof(SvrProc));
+ SvrSub_ele = new(ptr_set) SvrProc;
+ SvrData->insert({data_ptr, SvrSub_ele});
+ }
+ SvrSub_ele->insert(key);
+ data_ptr = strtok(NULL, STR_MAGIC);
+ }
+
+ } else if (flag == PROC_QUE_TCS) {
+
+ struct _temp_store {
+ void *ptr;
+ int total;
+ } *temp_store = NULL;
+
+ int num = 0;
+ int sum = 0;
+
+ ProcInfo_query *ret = NULL;
+ ProcInfo_query *ret_store = NULL;
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
+ while(data_ptr) {
+ ret = Qurey_object(data_ptr, &len);
+ if (ret != NULL) {
+
+ if (temp_store == NULL) {
+ temp_store = (_temp_store *)malloc(sizeof(_temp_store));
+ if (temp_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ temp_store->ptr = ret;
+ temp_store->total = len;
+ num = 1;
+
+ } else {
+ num++;
+ temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
+ if (temp_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ (temp_store + num - 1)->ptr = ret;
+ (temp_store + num - 1)->total = len;
+ }
+
+ }
+ data_ptr = strtok(NULL, STR_MAGIC);
+ }
+
+ if (num > 0) {
+ for (count = 0; count < num; count++) {
+
+ if (ret_store == NULL) {
+ ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
+ if (ret_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ sum = (temp_store + count)->total;
+ memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
+
+ } else {
+
+ ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
+ if (ret_store == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
+
+ sum += (temp_store + count)->total;
+
+ }
+
+ free((temp_store + count)->ptr);
+
+ }
+
+ free(temp_store);
+ }
+
+ void *last_buf = malloc(sum + sizeof(int));
+ if (last_buf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ *(int *)last_buf = num;
+ if (num > 0) {
+ memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
+ free(ret_store);
+ }
+
+ shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ free(last_buf);
+ } else if (flag == PROC_QUE_STCS) {
+ SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+
+ strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+ if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
+ SvrSub_ele = svr_tcs_iter->second;
+
+ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
+ count = *svr_proc_iter;
+
+ break;
+ }
+ } else {
+ count = 0;
+ }
+
+ memset(buf_temp, 0x00, sizeof(buf_temp));
+ sprintf(buf_temp, "%d", count);
+ shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ } else {
+
+ int val;
+ int temp = 0;
+ int pos = 0;
+ int size = 0;
+ ProcInfo_sum *Data_sum = NULL;
+ SHMKeySet *subs_proc;
+ SHMKeySet::iterator subs_proc_iter;
+ SHMTopicSubMap::iterator subs_iter;
+
+ ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
+
+ memset(&Data_stru, 0x00, sizeof(Data_stru));
+
+ if (count == 0) {
+ Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
+ if (Data_sum == NULL) {
+
+ logger->error("in proxy_reg: Out of memory!\n");
+
+ exit(1);
+ }
+
+ count++;
+
+ memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
+
+ } else {
+
+ count++;
+ len = sizeof(ProcInfo_sum) * count;
+ Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
+ if (Data_sum == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+
+ exit(1);
+ }
+
+ memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
+ }
+
+ Data_stru = proc_iter->second;
+
+ memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
+ memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
+ memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
+ memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
+
+ (Data_sum + count - 1)->stat = 1;
+ (Data_sum + count - 1)->list_num = 3;
+
+ val = proc_iter->first;
+ if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
+ TcsSub_ele = proc_tcs_iter->second;
+
+ temp = 0;
+ pos = 0;
+ len = sizeof(Data_sum->reg_info) - 1;
+ for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
+
+ if (temp == 0) {
+ strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
+ pos += strlen((Data_sum + count - 1)->reg_info);
+ len -= strlen((Data_sum + count - 1)->reg_info);
+
+ temp++;
+ } else {
+
+ if (len > 0) {
+ strcat((Data_sum + count - 1)->reg_info, ",");
+
+ pos += 1;
+ len -= 1;
+ }
+
+ if (len > 0) {
+ size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
+ strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
+
+ pos += size;
+ len -= size;
+ }
+ }
+ }
+
+ pos = 0;
+ temp = 0;
+ len = sizeof(Data_sum->local_info) - 1;
+ for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
+ subs_proc = subs_iter->second;
+
+ if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
+
+ if ((temp == 0)) {
+
+ strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
+ pos += strlen((Data_sum + count - 1)->local_info);
+ len -= strlen((Data_sum + count - 1)->local_info);
+
+ temp++;
+ } else {
+
+ if (len > 0) {
+ strcat((Data_sum + count - 1)->local_info, ",");
+
+ pos += 1;
+ len -= 1;
+ }
+
+ if (len > 0) {
+ size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
+ strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
+
+ pos += size;
+ len -= size;
+ }
+ }
+
+ }
+ }
+
+ }
+ }
+
+ temp = count * sizeof(ProcInfo_sum);
+ void *last_buf = malloc(temp + sizeof(int));
+ if (last_buf == NULL) {
+ logger->error("in proxy_reg: Out of memory!\n");
+ exit(1);
+ }
+
+ *(int *)last_buf = count;
+ if (count > 0) {
+ memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
+ free(Data_sum);
+ }
+
+ shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
+
+ }
+}
// 杩愯浠g悊
-void * BusServerSocket::_run_proxy_() {
+int BusServerSocket::_run_proxy_() {
int size;
int key;
+ int flag;
char * action, *topic, *topics, *buf, *content;
size_t head_len;
char resp_buf[128];
@@ -233,25 +682,21 @@
int rv;
char send_buf[512] = { 0x00 };
- const char *topic_delim = ",";
-
- while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
+ const char *topic_delim = ",";
+ while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
head = ShmModSocket::decode_bus_head(buf);
topics = buf + BUS_HEAD_SIZE;
action = head.action;
-
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
topic = strtok(topics, topic_delim);
while(topic) {
-
_proxy_sub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
}
else if(strcmp(action, "desub") == 0) {
-
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
_proxy_desub_all(key);
@@ -259,7 +704,6 @@
topic = strtok(topics, topic_delim);
while(topic) {
-
_proxy_desub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
@@ -267,10 +711,45 @@
}
else if(strcmp(action, "pub") == 0) {
- content = topics + head.topic_size;
- _proxy_pub(topics, content, head.content_size, key);
+ topics[head.topic_size - 1] = '\0';
+ content = topics + head.topic_size;
- }
+ _proxy_pub(topics, topics, head.topic_size + head.content_size, key);
+
+ }
+ else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
+ || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
+ || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+ content = topics + head.topic_size;
+ if (strcmp(action, "reg") == 0) {
+
+ flag = PROC_REG;
+
+ } else if (strcmp(action, "unreg") == 0) {
+
+ flag = PROC_UNREG;
+
+ } else if (strcmp(action, "tcsreg") == 0) {
+
+ flag = PROC_REG_TCS;
+
+ } else if (strcmp(action, "tcsque") == 0) {
+
+ flag = PROC_QUE_TCS;
+
+ } else if (strcmp(action, "stcsque") == 0) {
+
+ flag = PROC_QUE_STCS;
+
+ } else {
+
+ flag = PROC_QUE_ATCS;
+
+ }
+
+ _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
+
+ }
else if (strncmp(buf, "request", strlen("request")) == 0) {
sprintf(send_buf, "%4d", key);
strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
@@ -281,7 +760,6 @@
}
}
else if(strcmp(action, "stop") == 0) {
- logger->info( "Stopping Bus...");
free(buf);
break;
} else {
@@ -291,5 +769,5 @@
}
- return NULL;
+ return rv;
}
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 956271b..3052c8b 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -18,7 +18,6 @@
typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
-
class BusServerSocket {
private:
shm_socket_t *shm_socket;
@@ -29,14 +28,16 @@
private:
int destroy();
void _proxy_sub( char *topic, int key);
- void _proxy_pub( char *topic, void *buf, size_t size, int key);
- void *_run_proxy_();
+ void _proxy_pub( char *topic, char *buf, size_t size, int key);
+ int _run_proxy_();
// int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
void _proxy_desub( char *topic, int key);
void _proxy_desub_all(int key);
- static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb);
+ void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
+
+ static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb);
// static bool include_in_keys(int key, int keys[], size_t length);
public:
diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp
index d2f5a8e..6b730a9 100644
--- a/src/socket/bus_server_socket_wrapper.cpp
+++ b/src/socket/bus_server_socket_wrapper.cpp
@@ -7,7 +7,6 @@
* 鍒涘缓
*/
void * bus_server_socket_wrapper_open() {
- logger->debug("===bus_server_socket_wrapper_open\n");
BusServerSocket *sockt = new BusServerSocket;
return (void *)sockt;
}
@@ -19,7 +18,6 @@
BusServerSocket *sockt = (BusServerSocket *)_socket;
delete sockt;
- logger->debug("===bus_server_socket_wrapper_close\n");
}
int bus_server_socket_wrapper_stop(void *_socket) {
@@ -35,7 +33,7 @@
int ret;
BusServerSocket *sockt = (BusServerSocket *)_socket;
- if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) {
+ if( (ret = sockt->bind(SHM_BUS_KEY)) == 0) {
return sockt->start();
} else {
logger->error("start bus failed");
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index abd9477..a94b9c3 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -38,6 +38,129 @@
return shm_socket_force_bind(shm_socket, key);
}
+int ShmModSocket::bind_proc_id(char *buf, int len) {
+ return shm_socket_bind_proc_id(shm_socket, buf, len);
+}
+
+int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+ int ret;
+ struct timespec ts;
+
+ bus_head_t head = {};
+
+ if (flag == PROC_REG) {
+
+ memcpy(head.action, "reg", sizeof(head.action));
+
+ } else if (flag == PROC_UNREG) {
+
+ memcpy(head.action, "unreg", sizeof(head.action));
+
+ } else if (flag == PROC_REG_TCS) {
+
+ memcpy(head.action, "tcsreg", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_TCS) {
+
+ memcpy(head.action, "tcsque", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_STCS) {
+
+ memcpy(head.action, "stcsque", sizeof(head.action));
+
+ } else if (flag == PROC_QUE_ATCS) {
+
+ memcpy(head.action, "atcsque", sizeof(head.action));
+
+ } else {
+
+ return -1;
+
+ }
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ head.topic_size = 0;
+
+ if (pData != NULL) {
+
+ head.content_size = sizeof(ProcInfo);
+
+ } else {
+
+ head.content_size = 0;
+
+ }
+ } else {
+
+ head.topic_size = len;
+
+ head.content_size = 0;
+
+ }
+
+ void *buf_temp;
+ int buf_size;
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+
+ buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
+
+ } else {
+
+ buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
+
+ }
+
+ if (timeout_ms > 0) {
+
+ ts.tv_sec = timeout_ms /1000;
+
+ ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
+
+ }
+
+ } else if (timeout_ms == 0) {
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
+
+ }
+
+ } else {
+
+ if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+ ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
+
+ } else {
+
+ ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
+
+ }
+
+ }
+
+ free(buf_temp);
+
+ return ret;
+
+}
+
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
@@ -60,7 +183,8 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
if(rv == 0) {
logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
@@ -77,7 +201,7 @@
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
@@ -183,8 +307,8 @@
memcpy(head.action, "pub", sizeof(head.action));
head.topic_size = topic_size = strlen(topic) + 1;
head.content_size = content_size;
-
- void *buf;
+
+ void *buf;
int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf);
if(size > 0) {
ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -216,6 +340,7 @@
char *buf;
int max_buf_size;
void *buf_ptr;
+ int count = 0;
if((buf = (char *) malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
exit(1);
@@ -223,7 +348,7 @@
max_buf_size = MAXBUF;
}
- buf_size = BUS_HEAD_SIZE + content_size + topic_size ;
+ buf_size = BUS_HEAD_SIZE + content_size + topic_size;
if(max_buf_size < buf_size) {
if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -238,8 +363,19 @@
memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
if(topic_size != 0 )
memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
- if(content_size != 0)
- memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
+ (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
+ memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ } else {
+ if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
+ strlen("unreg")) == 0)) && (content_buf != NULL)) {
+ proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
+
+ request_head.content_size = count;
+ buf_size -= (content_size - count);
+
+ }
+ }
*retbuf = buf;
free(buf_ptr);
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 9890aef..da02fab 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -5,6 +5,7 @@
#include "shm_allocator.h"
#include "shm_mm.h"
#include "hashtable.h"
+#include "proc_def.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
@@ -60,6 +61,9 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int force_bind(int key);
+
+ int bind_proc_id(char *buf, int len);
+ int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
@@ -75,9 +79,7 @@
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-
int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0);
-
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
@@ -128,7 +130,14 @@
*/
int get_key() ;
+ int get_procid(char *buf, int len);
+
};
+
+typedef std::map<int, ProcInfo, std::less<int>, SHM_STL_Allocator<std::pair<int, ProcInfo> > > ProcZone;
+typedef std::set<SHMString, std::less<SHMString>, SHM_STL_Allocator<SHMString> > TcsZone;
+typedef std::map<int, TcsZone *, std::less<int>, SHM_STL_Allocator<std::pair<const int, TcsZone *> > > ProcTcsMap;
+
#endif
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 918aef6..84bf77e 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -1,4 +1,5 @@
#include "shm_socket.h"
+#include "socket_def.h"
#include "hashtable.h"
#include "logger_factory.h"
#include <map>
@@ -108,8 +109,6 @@
int rv, i;
hashtable_t *hashtable = mm_get_hashtable();
- logger->debug("shm_socket_close\n");
-
// if(sockt->key != 0) {
// auto it = shmQueueStMap->find(sockt->key);
@@ -118,8 +117,6 @@
// it->second.closeTime = time(NULL);
// }
// }
-
-
if(sockt->queue != NULL) {
sockt->queue->close();
@@ -169,8 +166,20 @@
return 0;
}
+int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) {
+ strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len);
+
+ return 0;
+}
+
int shm_socket_get_key(shm_socket_t *sockt){
return sockt->key;
+}
+
+int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) {
+ strncpy(buf, sockt->proc_id, len);
+
+ return 0;
}
// 鐭繛鎺ユ柟寮忓彂閫�
@@ -297,8 +306,6 @@
{
int rv;
- logger->debug("%lu destroy threadlocal socket\n", pthread_self());
-
if(tmp_socket == NULL)
return;
@@ -411,8 +418,6 @@
int rv = 0, tryn = 16;
- static int Counter_suc = 0;
- static int Counter_fail = 0;
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
@@ -430,12 +435,13 @@
{
/* If first call from this thread, allocate buffer for thread, and save its location */
tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
-
- rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
- if ( rv != 0) {
- logger->error(rv, "shm_sendandrecv : pthread_setspecific");
- exit(1);
- }
+
+ }
+
+ rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
+ if ( rv != 0) {
+ logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+ exit(1);
}
sendpak.key = tmp_socket->key;
@@ -473,6 +479,7 @@
} else {
// 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
+ exit(0);
continue;
}
}
@@ -481,7 +488,6 @@
return EBUS_RECVFROM_WRONG_END;
LABLE_SUC:
- sockt->key = tmp_socket->key;
if(recv_buf != NULL) {
void *_buf = malloc(recvpak.size);
memcpy(_buf, recvpak.buf, recvpak.size);
@@ -520,8 +526,8 @@
// 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
if(send_key != recv_key) {
- // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
- // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+
+ logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
continue;
}
@@ -640,7 +646,6 @@
if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
-
if (sockt->key == 0) {
sockt->key = hashtable_alloc_key(hashtable);
}
@@ -664,7 +669,6 @@
LABEL_POP:
-
rv = sockt->queue->pop(recvpak, timeout, flag);
if(rv != 0) {
if(rv == ETIMEDOUT) {
@@ -680,4 +684,23 @@
*_recvpak = recvpak;
return 0;
}
-
+
+void proc_copy(char *dst, void *src, int *counter) {
+ int count = 0;
+ ProcInfo *ptr = static_cast<ProcInfo *>(src);
+
+ memcpy(dst, ptr->proc_id, strlen(ptr->proc_id) + 1);
+ count = strlen(ptr->proc_id) + 1;
+ memcpy(dst + count, ptr->name, strlen(ptr->name) + 1);
+ count += strlen(ptr->name) + 1;
+ memcpy(dst + count, ptr->public_info, strlen(ptr->public_info) + 1);
+ count += strlen(ptr->public_info) + 1;
+ memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
+ count += strlen(ptr->private_info) + 1;
+
+ *counter = count;
+}
+
+
+
+
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 97d9f2c..8e874d1 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -4,6 +4,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
+#include "proc_def.h"
#include "lock_free_queue.h"
#include <functional>
@@ -18,7 +19,8 @@
#define BUS_ACTION_STOP 1
typedef struct shm_packet_t {
- int key;
+ int key;
+
size_t size;
void * buf;
char uuid[64];
@@ -31,8 +33,8 @@
typedef struct shm_socket_t {
shm_socket_type_t socket_type;
- // 鏈湴key
int key;
+ char proc_id[MAX_STR_LEN];
bool force_bind;
pthread_mutex_t mutex;
@@ -59,7 +61,8 @@
int shm_socket_bind(shm_socket_t * socket, int key) ;
int shm_socket_force_bind(shm_socket_t * socket, int key) ;
-
+
+int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len);
/**
* @flags : BUS_NOWAIT_FLAG
*/
@@ -70,6 +73,9 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
const struct timespec * timeout = NULL, int flags = 0);
+typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SvrProc;
+typedef std::map<SHMString, SvrProc *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SvrProc *> > > SvrTcs;
+typedef std::map<int, SHMString, std::less<int>, SHM_STL_Allocator<std::pair<int, const SHMString> > > ProcPartZone;
/**
* @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
* @recvbuf 鏀跺埌鐨勬暟鎹�
@@ -83,7 +89,6 @@
const struct timespec *timeout = NULL, int flag = 0, void * user_data = NULL);
-
-
+void proc_copy(char *dst, void *src, int *count);
#endif
\ No newline at end of file
diff --git a/src/svsem.cpp b/src/svsem.cpp
index 26bde2c..00e6dbc 100644
--- a/src/svsem.cpp
+++ b/src/svsem.cpp
@@ -11,7 +11,6 @@
union semun arg;
struct sembuf sop;
-
arg.val = 0; /* So initialize it to 0 */
if (semctl(semid, 0, SETVAL, arg) == -1)
err_exit(errno, "semctl 1");
diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt
index 18bcf4c..a096136 100644
--- a/test_socket/CMakeLists.txt
+++ b/test_socket/CMakeLists.txt
@@ -6,7 +6,19 @@
${EXTRA_INCLUDES}
)
-
+add_executable(bus_test_inter bus_test_inter.cpp)
+target_link_libraries(bus_test_inter PRIVATE shm_queue ${EXTRA_LIBS} )
+target_include_directories(bus_test_inter PRIVATE
+ "${PROJECT_BINARY_DIR}"
+ ${EXTRA_INCLUDES}
+ )
+
+add_executable(bus_test_server_mode bus_test_server_mode.cpp)
+target_link_libraries(bus_test_server_mode PRIVATE shm_queue ${EXTRA_LIBS} )
+target_include_directories(bus_test_server_mode PRIVATE
+ "${PROJECT_BINARY_DIR}"
+ ${EXTRA_INCLUDES}
+ )
add_custom_command(
OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
diff --git a/test_socket/bus_test_inter.cpp b/test_socket/bus_test_inter.cpp
new file mode 100644
index 0000000..8abcb46
--- /dev/null
+++ b/test_socket/bus_test_inter.cpp
@@ -0,0 +1,502 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+#include "logger_factory.h"
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include "bus_error.h"
+
+#include "bh_api.h"
+#include "proc_def.h"
+
+#define TOTAL_REG_UNREG 2
+
+#define MAGIC_STR "INTER_"
+#define STR_LEN 30
+
+pthread_t tids;
+void *res;
+
+static ProcInfo proc_desc;
+
+char *genStr(int length, char *buf)
+{
+ int flag, i;
+ char *str;
+
+ if (length < (strlen(buf) + 10)) {
+ length = strlen(buf) + 10;
+ }
+
+ srand((unsigned) time(NULL));
+ if ((str = (char *)malloc(length + 1)) == NULL) {
+ printf("out of memory!\n");
+
+ exit(0);
+ }
+
+ memset(str, 0x00, length + 1);
+ memcpy(str, MAGIC_STR, strlen(MAGIC_STR));
+ strcat(str, buf);
+ for (i = strlen(str); i < length; i++) {
+ flag = rand() % 3;
+ switch (flag) {
+ case 0:
+ str[i] = 'A' + rand() % 26;
+ break;
+
+ case 1:
+ str[i] = 'a' + rand() % 26;
+ break;
+
+ default:
+ str[i] = '0' + rand() % 10;
+ break;
+ }
+ }
+
+ str[length] = '\0';
+
+ return str;
+
+}
+
+void *client_recv(void *skptr) {
+
+ pthread_detach(pthread_self());
+
+ void *recvbuf = NULL;
+ int recv_len;
+ void *proc_id = NULL;
+ int id_len;
+ int rv;
+ void *errBuf = NULL;
+ int len;
+ char proc_data[200] = { 0x00 };
+ char topics[200] = { 0x00 };
+
+ struct timespec timeout = {2, 0};
+
+ while (true) {
+
+ rv = BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1);
+ if(rv == true) {
+
+ memset(topics, 0x00, sizeof(topics));
+ memset(proc_data, 0x00, sizeof(proc_data));
+
+ memcpy(proc_data, proc_id, (sizeof(proc_data) - 1) > id_len ? id_len : (sizeof(proc_data) - 1));
+
+ memcpy(topics, recvbuf, (sizeof(topics) - 1) > recv_len ? recv_len : (sizeof(topics) - 1));
+
+ printf("Get the sub topics data(%s) from proc id(%s)\n", (char *)topics, (char *)proc_id);
+
+ BHFree(recvbuf, len);
+ BHFree(proc_id, id_len);
+
+ } else {
+ BHGetLastError(&errBuf, &len);
+ printf("the thread recv fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, len);
+ }
+
+ }
+
+}
+
+void parseQueryTopicsBuf(void *buf, int len)
+{
+ if (buf == NULL)
+ return;
+
+ int total_topics = *(int *)buf;
+ int i, j;
+ int buf_pos = 0;
+ void *ptr_temp = NULL;
+ ProcInfo_query *ptr = NULL;
+ ProcInfo *Proc_ptr = NULL;
+
+ buf_pos = sizeof(ProcInfo_query);
+ ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+ ptr_temp = (void *)ptr;
+ for (i = 0; i < total_topics; i++) {
+ printf("topic %s:\n", ptr->name);
+ for (j = 0; j < ptr->num; j++) {
+ printf("the %dst process info:\n", j + 1);
+ Proc_ptr = &(ptr->procData) + j;
+ printf("proc_id: %s\n", Proc_ptr->proc_id);
+ printf("name: %s\n", Proc_ptr->name);
+ printf("public_info: %s\n", Proc_ptr->public_info);
+ printf("private_info: %s\n", Proc_ptr->private_info);
+
+ }
+
+ if (ptr->num > 1) {
+ buf_pos += sizeof(ProcInfo) * (ptr->num - 1);
+ }
+
+ ptr = (ProcInfo_query *)((char *)ptr_temp + buf_pos);
+ }
+
+}
+
+void parseQueryProcBuf(void *buf, int len)
+{
+ if (buf == NULL)
+ return;
+
+ int total = *(int *)buf;
+ int i;
+ ProcInfo_sum *Proc_ptr = NULL;
+
+ Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+ for (i = 0; i < total; i++) {
+ printf("proc_id: %s\n", (Proc_ptr + i)->procData.proc_id);
+ printf("name: %s\n", (Proc_ptr + i)->procData.name);
+ printf("public_info: %s\n", (Proc_ptr + i)->procData.public_info);
+ printf("private_info: %s\n", (Proc_ptr + i)->procData.private_info);
+
+ printf("service: %s\n", (Proc_ptr + i)->reg_info);
+ printf("sub: %s\n", (Proc_ptr + i)->local_info);
+ }
+}
+
+int main(int argc, char *argv[]) {
+ int ret;
+ char *ptr = NULL;
+ char buf[] = "Process";
+ void *buf_temp = NULL;
+ void *errBuf = NULL;
+ void *proc_id = NULL;
+ int size;
+ int id_len;
+ int i;
+ char data_buf[200] = { 0x00 };
+ const int timeout_ms = 3000;
+
+ memset(&proc_desc, 0x00, sizeof(ProcInfo));
+ ptr = genStr(STR_LEN, buf);
+ strncpy(proc_desc.proc_id, ptr, strlen(ptr) + 1);
+ //strncpy(proc_desc.proc_id, "Hello", strlen("Hello") + 1);
+ free(ptr);
+
+ sleep(2); //make rand change
+ ptr = genStr(STR_LEN, buf);
+ strncpy(proc_desc.name, ptr, strlen(ptr) + 1);
+ //strncpy(proc_desc.name, "World", strlen("World") + 1);
+ free(ptr);
+
+ sleep(2);
+ ptr = genStr(STR_LEN, buf);
+ //strncpy(proc_desc.public_info, ptr, strlen(ptr) + 1);
+ strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
+ free(ptr);
+
+ sleep(2);
+ ptr = genStr(STR_LEN, buf);
+ //strncpy(proc_desc.private_info, ptr, strlen(ptr) + 1);
+ //strncpy(proc_desc.private_info, "Bye", strlen("Bye") + 1);
+ free(ptr);
+
+ printf("before the registered, process info:\n");
+ printf("proc_id: %s\n", proc_desc.proc_id);
+ printf("name: %s\n", proc_desc.name);
+ printf("public_info: %s\n", proc_desc.public_info);
+ printf("private_info: %s\n", proc_desc.private_info);
+
+ for (i = 0; i < TOTAL_REG_UNREG; i++) {
+ ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered fail with error: %s\n", (char *)errBuf);
+ BHFree(errBuf, size);
+
+ printf("the second way to get the error log: %s\n", buf_temp);
+
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process unregistered OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process unregistered fail with error: %s\n", (char *)errBuf);
+ BHFree(errBuf, size);
+
+ printf("the second way to get the error log: %s\n", buf_temp);
+
+ };
+ BHFree(buf_temp, size);
+ }
+
+ //const char *topics_reg_buf1[] = {"topics demo1"};
+ const char *topics_reg_buf1 = "topics demo1";
+ const char *topics_query_buf1 = "topics demo1";
+ const char *topics_query_buf2 = "Hello World,So,Great,Good"; //No space between each other
+ //const char *topics_reg_buf2[] = {"Hello World", "So", "Great", "Good"};
+ const char *topics_reg_buf2 = "Hello World,So,Great,Good";
+
+ //const char *topics_sub_buf1[] = {"news"};
+ //const char *topics_sub_buf2[] = {"sports", "balls", "topics demo1"};
+ const char *topics_sub_buf1 = "news";
+ const char *topics_sub_buf2 = "sports,balls,topics demo1";
+
+ const char *topics_pub_topic1 = "news";
+ const char *topics_pub_topic1_data = "boob news";
+
+ const char *topics_pub_topic2 = "balls";
+ const char *topics_pub_topic2_data = "Great volleyballs and basketballs";
+
+ ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered2 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process query topics OKay\n");
+
+ parseQueryTopicsBuf(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process query3 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process query topics OKay\n");
+
+ parseQueryTopicsBuf(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process query4 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ pthread_create(&tids, NULL, client_recv, NULL);
+
+ ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process subscribe topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process sub1 fail with error: %s(%s)\n", (char *)errBuf, buf_temp);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("tthe process subscribe topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process sub2 fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered10 fail with errorL %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
+ const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
+ void *msgID = NULL;
+ int msg_id_len;
+
+ ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len);
+ if (ret == true) {
+ printf("the process BHAsyncRequest topics OKay\n");
+
+ BHFree(msgID, msg_id_len);
+
+ } else {
+
+ BHGetLastError(&errBuf, &size);
+ printf("the process BHAsyncRequest1 topics fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0);
+ if (ret == true) {
+ printf("the process BHAsyncRequest topics OKay\n");
+
+ } else {
+
+ BHGetLastError(&errBuf, &size);
+ printf("the process BHAsyncRequest2 topics fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len,
+ &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+
+ memset(data_buf, 0x00, sizeof(data_buf));
+ strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
+ printf("the process BHRequest topics OKay\n");
+
+ printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
+
+
+ } else {
+
+ BHGetLastError(&errBuf, &size);
+ printf("the process BHRequest topics fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+ ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len,
+ &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+
+ memset(data_buf, 0x00, sizeof(data_buf));
+ strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1));
+ printf("the process BHRequest topics OKay\n");
+
+ printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id);
+
+ } else {
+
+ BHGetLastError(&errBuf, &size);
+ printf("the process BHRequest2 topics fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+#if !defined(PRO_DE_SERIALIZE)
+ ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms);
+ if (ret == true) {
+ printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data);
+
+ } else {
+ printf("the process published1 fail\n");
+ };
+
+ ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms);
+ if (ret == true) {
+ printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data);
+
+ } else {
+ printf("the process published2 fail\n");
+ };
+#endif
+
+ memset(data_buf, 0x00, sizeof(data_buf));
+ strcpy(data_buf, "query the process");
+ ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process query all the process data OKay\n");
+
+ parseQueryProcBuf(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process query proc fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+#if 1
+ while(1) {
+ sleep(1);
+ }
+#else
+ /*if the process will exit finally, we must call BHUnregister to release the resources*/
+ ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process unregistered OKay\n");
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process unregistered fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ BHFree(buf_temp, size);
+
+#endif
+
+ return 0;
+
+}
+
+
diff --git a/test_socket/bus_test_server_mode.cpp b/test_socket/bus_test_server_mode.cpp
new file mode 100644
index 0000000..71f13c9
--- /dev/null
+++ b/test_socket/bus_test_server_mode.cpp
@@ -0,0 +1,122 @@
+#include "bus_server_socket.h"
+#include "shm_mod_socket.h"
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include "mm.h"
+#include "logger_factory.h"
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include "bus_error.h"
+
+#include "bh_api.h"
+#include "proc_def.h"
+
+static ProcInfo proc_desc;
+
+int main(int argc, char *argv[]) {
+ int ret;
+ void *buf_temp = NULL;
+ void *errBuf = NULL;
+ void *proc_id = NULL;
+ void *src = NULL;
+ void *data_buf = NULL;
+ char topics_buf[200] = { 0x00 };
+ int size;
+ int id_len;
+ int count = 0;
+ const int timeout_ms = 3000;
+
+ memset(&proc_desc, 0x00, sizeof(ProcInfo));
+ strncpy(proc_desc.proc_id, "Hello", strlen("Hello"));
+ strncpy(proc_desc.name, "World", strlen("World"));
+ strncpy(proc_desc.public_info, "Good", strlen("Good") + 1);
+
+ const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
+ const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";
+
+ ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered OKay\n");
+
+ BHFree(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+
+ printf("the process registered fail with error: %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ ret = BHRegisterTopics(topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ BHFree(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ ret = BHRegisterTopics(topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &buf_temp, &size, timeout_ms);
+ if (ret == true) {
+ printf("the process registered topics OKay\n");
+
+ BHFree(buf_temp, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("the process registered1 fail with errorL %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ while(true) {
+ ret = BHReadRequest(&proc_id, &id_len, &buf_temp, &size, &src, -1);
+ if (ret == true) {
+
+ strncpy(topics_buf, (char *)buf_temp, (sizeof(topics_buf) - 1) > size ? size : (sizeof(topics_buf) - 1));
+ printf("Get data(%s)", topics_buf);
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ strncpy(topics_buf, (char *)proc_id, (sizeof(topics_buf) - 1) > id_len ? id_len : (sizeof(topics_buf) - 1));
+ printf("proc id(%s)", topics_buf);
+
+ BHFree(buf_temp, size);
+ BHFree(proc_id, id_len);
+
+ memset(topics_buf, 0x00, sizeof(topics_buf));
+ sprintf(topics_buf, "Get data count: %d", ++count);
+ ret = BHSendReply(src, topics_buf, strlen(topics_buf));
+ if (ret == true) {
+
+ printf("the process send reply OKay\n");
+
+ } else {
+
+ BHGetLastError(&errBuf, &size);
+ printf("the process send reply fail with errorL %s\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+
+ BHFree(src, size);
+
+ } else {
+ BHGetLastError(&errBuf, &size);
+ printf("BHReadRequest fail with error(%s)\n", (char *)errBuf);
+
+ BHFree(errBuf, size);
+ };
+ }
+
+ return 0;
+
+}
+
+
--
Gitblit v1.8.0