From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期一, 30 八月 2021 17:52:23 +0800 Subject: [PATCH] re-structure the communication work flow. --- src/shm/hashtable.h | 2 CMakeLists.txt | 8 src/bh_api.h | 28 src/socket/bus_server_socket.h | 9 src/proc_def.h | 70 + src/socket/shm_mod_socket.cpp | 150 ++ src/bus_proxy_start.cpp | 127 ++ src/queue/lock_free_queue.h | 24 src/socket/bus_server_socket_wrapper.cpp | 4 src/shm/mm.cpp | 4 src/socket/shm_socket.h | 15 src/svsem.cpp | 1 build.sh | 2 src/bus_error.cpp | 10 src/net/net_mod_server_socket.cpp | 3 src/net/net_mod_socket.cpp | 29 src/socket/shm_socket.cpp | 63 src/bus_def.h | 7 test_socket/bus_test_inter.cpp | 502 ++++++++ test_socket/CMakeLists.txt | 14 src/socket/shm_mod_socket.h | 13 src/queue/shm_queue.h | 4 src/socket/bus_server_socket.cpp | 534 ++++++++ test_socket/bus_test_server_mode.cpp | 122 ++ src/shm/shm_mm.h | 4 src/net/net_mod_socket_wrapper.cpp | 27 src/net/net_mod_socket_wrapper.h | 3 src/net/net_mod_socket.h | 8 src/CMakeLists.txt | 20 src/bus_error.h | 4 src/bh_api.cpp | 1619 +++++++++++++++++++++++++++ 31 files changed, 3,300 insertions(+), 130 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9af847a..049e0b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,8 @@ option(BUILD_SHARED_LIBS "Build using shared libraries" ON) +add_compile_options(-fPIC) + option(BUILD_DOC "Build doc" OFF) @@ -30,5 +32,9 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) -# add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) + include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto) + #add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) endif() + +add_definitions("-DPROTOBUF_USS_DLLS") + diff --git a/build.sh b/build.sh index 455436e..ea2dcde 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ BUILD_TYPE="Debug" BUILD_DOC="OFF" -BUILD_SHARED_LIBS="OFF" +BUILD_SHARED_LIBS="ON" function usage() { echo "build.sh [release | debug | doc]" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 61fbc17..eb3f4c9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,9 @@ # to the source code configure_file(bus_config.h.in bus_config.h) +#set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON) +add_compile_options(-fPIC) +#target_compile_options(shm_queue PRIVATE -fPIC) list(APPEND _SOURCES_ ./logger_factory.cpp @@ -16,31 +19,33 @@ ./bus_error.cpp ./futex_sem.cpp ./svsem.cpp +./bh_api.cpp ./net/net_conn_pool.cpp ./net/net_mod_server_socket_wrapper.cpp ./net/net_mod_socket_wrapper.cpp ./net/net_mod_socket.cpp ./net/net_mod_socket_io.cpp ./net/net_mod_server_socket.cpp +./proto/bhome_msg_api.pb.cc +./proto/bhome_msg.pb.cc +./proto/error_msg.pb.cc ./shm/shm_mm_wrapper.cpp ./shm/mm.cpp ./shm/hashtable.cpp ./shm/shm_mm.cpp -./bh_api.cc -./proto/bhome_msg.pb.cc -./proto/bhome_msg_api.pb.cc -./proto/error_msg.pb.cc ) if (BUILD_SHARED_LIBS) add_library(shm_queue SHARED ${_SOURCES_}) + target_compile_options(shm_queue PRIVATE -fPIC) + set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON) else() add_library(shm_queue STATIC ${_SOURCES_}) endif() # STATIC SHARED -# add_library(shm_queue ${_SOURCES_}) +#add_library(shm_queue ${_SOURCES_}) target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} ) @@ -48,11 +53,14 @@ ${PROJECT_BINARY_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/shm + ${CMAKE_CURRENT_SOURCE_DIR}/proto ${CMAKE_CURRENT_SOURCE_DIR}/queue ${CMAKE_CURRENT_SOURCE_DIR}/socket ${CMAKE_CURRENT_SOURCE_DIR}/net ) +add_executable(bus_proxy_start bus_proxy_start.cpp) +target_link_libraries(bus_proxy_start PRIVATE shm_queue ${EXTRA_LIBS} ) target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) @@ -74,6 +82,7 @@ ./bus_def.h ./logger_factory.h ./sole.h +./proc_def.h ./queue/linked_lock_free_queue.h ./queue/array_lock_free_queue.h ./queue/shm_queue.h @@ -91,7 +100,6 @@ ./shm/shm_mm_wrapper.h ./shm/shm_allocator.h ./shm/shm_mm.h -./bh_api.h DESTINATION include) diff --git a/src/bh_api.cpp b/src/bh_api.cpp new file mode 100644 index 0000000..9875cfa --- /dev/null +++ b/src/bh_api.cpp @@ -0,0 +1,1619 @@ +#include "net_mod_socket_wrapper.h" +#include "net_mod_server_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" +#include "shm_mm_wrapper.h" +#include "proc_def.h" +#include "usg_common.h" +#include "bh_api.h" +#include <pthread.h> +#include <getopt.h> +#include "bhome_msg_api.pb.h" +#include "bhome_msg.pb.h" +#include "error_msg.pb.h" +#include "proto/bhome_msg.pb.h" +#include "proto/bhome_msg_api.pb.h" + +static Logger *logger = LoggerFactory::getLogger(); + +static int gRun_stat = 0; +static void *gNetmod_socket = NULL; + +static pthread_mutex_t mutex; + +static char errString[100] = { 0x00 }; + +int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + int key; + int count = 0; + void *buf = NULL; + int min = 0; + ProcInfo pData; + +#if defined(PRO_DE_SERIALIZE) + struct _ProcInfo_proto + { + const char *proc_id; + const char *name; + const char *public_info; + const char *private_info; + }_input; + + ::bhome_msg::ProcInfo input; + if(!input.ParseFromArray(proc_info, proc_info_len)) { + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input.proc_id = input.proc_id().c_str(); + _input.name = input.name().c_str(); + _input.public_info = input.public_info().c_str(); + _input.private_info = input.private_info().c_str(); + +#else + if ((proc_info == NULL) || (proc_info_len == 0)) { + rv = EBUS_INVALID_PARA; + + memset(errString, 0x90, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } +#endif + + memset(&pData, 0x00, sizeof(ProcInfo)); + if (gRun_stat == 0) { + pthread_mutex_init(&mutex, NULL); + + } else { + logger->error("the process has already registered!\n"); + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + + gRun_stat = 1; + shm_mm_wrapper_init(SHM_RES_SIZE); + +#if defined(PRO_DE_SERIALIZE) + if (_input.proc_id != NULL) { + count = strlen(_input.proc_id) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.proc_id, _input.proc_id, min); + } + + if (_input.name != NULL) { + count = strlen(_input.name) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.name, _input.name, min); + } + + if (_input.public_info != NULL) { + count = strlen(_input.public_info) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.public_info, _input.public_info, min); + } + + if (_input.private_info != NULL) { + count = strlen(_input.private_info) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.private_info, _input.private_info, min); + } +#else + if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) { + count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min); + } + + if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) { + count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.name, ((ProcInfo *)proc_info)->name, min); + } + + if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) { + count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min); + } + + if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) { + count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1; + min = count > MAX_STR_LEN ? MAX_STR_LEN : count; + strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min); + } +#endif + + gNetmod_socket = net_mod_socket_open(); + hashtable_t *hashtable = mm_get_hashtable(); + key = hashtable_alloc_key(hashtable); + net_mod_socket_bind(gNetmod_socket, key); + + rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgCommonReply mcr; + mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mcr.mutable_errmsg()->set_errstring(errString); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); + mcr.SerializePartialToArray(*reply, *reply_len); +#else + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; + +#endif + + if (rv == 0) + return true; + + return false; +} + +int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + int min; + void *buf = NULL; + +#if defined(PRO_DE_SERIALIZE) + struct _ProcInfo_proto + { + const char *proc_id; + const char *name; + const char *public_info; + const char *private_info; + }_input; + + ::bhome_msg::ProcInfo input; + + if(!input.ParseFromArray(proc_info, proc_info_len)) { + + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input.proc_id = input.proc_id().c_str(); + _input.name = input.name().c_str(); + _input.public_info = input.public_info().c_str(); + _input.private_info = input.private_info().c_str(); +#endif + + if (gRun_stat == 0) { + + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG); + if (rv == 0) { + + net_mod_socket_close(gNetmod_socket); + + gNetmod_socket = NULL; + + gRun_stat = 0; + + } + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgCommonReply mcr; + mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mcr.mutable_errmsg()->set_errstring(errString); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); + mcr.SerializePartialToArray(*reply, *reply_len); +#else + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; +#endif + + if (rv == 0) + return true; + + return false; +} + +int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + int min, i; + void *buf = NULL; + int total = 0; + int count = 0; + char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; + +#if defined(PRO_DE_SERIALIZE) + struct _MsgTopicList + { + int amount; + const char *topics[MAX_STR_LEN]; + }_input; + + ::bhome_msg::MsgTopicList input; + if(!input.ParseFromArray(topics, topics_len)) { + + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input.amount = input.topic_list_size(); + if (_input.amount > MAX_STR_LEN) { + _input.amount = MAX_STR_LEN; + } + + for(int i = 0; i < _input.amount; i++) { + _input.topics[i] = input.topic_list(i).c_str(); + } +#else + if ((topics == NULL) || (topics_len == 0)) { + + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { +#if defined(PRO_DE_SERIALIZE) + total = sizeof(topics_buf) / sizeof(char); + for (i = 0; i < _input.amount; i++) { + min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i])); + if (min > 0) { + strncpy(topics_buf + count, _input.topics[i], min); + count += min; + + if (total >= strlen(_input.topics[i])) { + total -= strlen(_input.topics[i]); + } + + if ((_input.amount > 1) && (i < (_input.amount - 1))) { + strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC)); + total -= 1; + count++; + } + } else { + topics_buf[strlen(topics_buf) - 1] = '\0'; + } + } + + logger->debug("the parsed compound register topics: %s!\n", topics_buf); +#else + memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len); +#endif + + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgCommonReply mcr; + mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mcr.mutable_errmsg()->set_errstring(errString); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); + mcr.SerializePartialToArray(*reply, *reply_len); +#else + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; +#endif + + if (rv == 0) + return true; + + return false; +} + +int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + int min; + void *buf = NULL; + int size; + char topics_buf[MAX_STR_LEN] = { 0x00 }; + ProcInfo_query *ptr = NULL; + ProcInfo *Proc_ptr = NULL; + +#if defined(PRO_DE_SERIALIZE) + struct _BHAddress + { + unsigned long long mq_id; + long long abs_addr; + const char *ip; + int port; + }_input0; + + const char *_input1; + + ::bhome_msg::BHAddress input0; + ::bhome_msg::MsgQueryTopic input1; + if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) { + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input0.mq_id = input0.mq_id(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1 = input1.topic().c_str(); + +#else + if ((topic == NULL) || (topic_len == 0)) { + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + +#if defined(PRO_DE_SERIALIZE) + min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1)); + strncpy(topics_buf, _input1, min); +#else + min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len); + buf = const_cast<void *>(topic); + strncpy(topics_buf, (const char *)buf, min); +#endif + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + + struct _MsgQueryTopicReply + { + std::string proc_id; + + unsigned long long mq_id; + long long abs_addr; + std::string ip; + int port; + }mtr_list[128]; + int mtr_list_num = 0; + + if (rv == 0) { + + ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); + mtr_list_num = ptr->num; + + if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { + mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); + } + + for(int i = 0; i < mtr_list_num; i++) { + mtr_list[i].proc_id = ptr->procData.proc_id; + mtr_list[i].mq_id = 0x00; + mtr_list[i].abs_addr = 0x00; + mtr_list[i].ip = "192.168.1.1"; + mtr_list[i].port = 5000; + } + } + + ::bhome_msg::MsgQueryTopicReply mtr; + mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mtr.mutable_errmsg()->set_errstring(errString); + for(int i = 0; i < mtr_list_num; i++) + { + ::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address(); + mtrb->set_proc_id(mtr_list[i].proc_id); + mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id); + mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr); + mtrb->mutable_addr()->set_ip(mtr_list[i].ip); + mtrb->mutable_addr()->set_port(mtr_list[i].port); + } + + *reply_len = mtr.ByteSizeLong(); + *reply = malloc(*reply_len); + mtr.SerializePartialToArray(*reply, *reply_len); +#else + if (rv == 0) { + *reply = buf; + *reply_len = size; + + } else { + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; + } + +#endif + + if (rv == 0) + return true; + + return false; +} + +int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + void *buf = NULL; + int size; + int min; + ProcInfo_sum *Proc_ptr = NULL; + char data_buf[MAX_STR_LEN] = { 0x00 }; + +#if defined(PRO_DE_SERIALIZE) + struct _BHAddress + { + unsigned long long mq_id; + long long abs_addr; + const char *ip; + int port; + }_input0; + + const char *_input1; + + ::bhome_msg::BHAddress input0; + ::bhome_msg::MsgQueryProc input1; + if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input0.mq_id = input0.mq_id(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1 = input1.proc_id().c_str(); +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + + if (query != NULL) { + strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1)); + } + + rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + struct _MsgQueryProcReply + { + std::string proc_id; + std::string name; + std::string public_info; + std::string private_info; + + bool online; + + std::string topic_list[128]; + int topic_list_num; + + } mpr_list[128]; + int mpr_list_num = 0; + + if (rv == 0) { + + mpr_list_num = *(int *)buf; + + if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) { + mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]); + } + + Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); + for(int i = 0; i < mpr_list_num; i++) { + mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; + mpr_list[i].name = (Proc_ptr + i)->procData.name; + mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; + mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; + mpr_list[i].online = (Proc_ptr + i)->stat; + mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; + + for(int j = 0; j < mpr_list[i].topic_list_num; j++) + { + if (j == 0) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; + } else if (j == 1) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; + } else if (j == 2) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; + } + } + } + + ::bhome_msg::MsgQueryProcReply mpr; + mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mpr.mutable_errmsg()->set_errstring(errString); + + for(int i = 0; i < mpr_list_num; i++) + { + ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list(); + mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id); + mpri->mutable_proc()->set_name(mpr_list[i].name); + mpri->mutable_proc()->set_public_info(mpr_list[i].public_info); + mpri->mutable_proc()->set_private_info(mpr_list[i].private_info); + mpri->set_online(mpr_list[i].online); + for(int j = 0; j < mpr_list[i].topic_list_num; j++) + { + mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]); + } + } + + *reply_len = mpr.ByteSizeLong(); + *reply = malloc(*reply_len); + mpr.SerializePartialToArray(*reply,*reply_len); + } +#else + if (rv == 0) { + *reply = buf; + *reply_len = size; + } else { + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; + } +#endif + + if (rv == 0) + return true; + + return false; + +} + +int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + int sec, nsec; + int total = 0; + int count = 0; + int min, i; + void *buf = NULL; + char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; + +#if defined(PRO_DE_SERIALIZE) + struct _MsgTopicList + { + int amount; + const char *topics[MAX_STR_LEN]; + }_input; + + ::bhome_msg::MsgTopicList input; + if(!input.ParseFromArray(topics, topics_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + _input.amount = input.topic_list_size(); + + if (_input.amount > MAX_STR_LEN) { + _input.amount = MAX_STR_LEN; + } + + for(int i = 0; i < _input.amount; i++) + _input.topics[i] = input.topic_list(i).c_str(); + +#else + if ((topics == NULL) || (topics_len == 0)) { + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + goto exit_entry; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { +#if defined(PRO_DE_SERIALIZE) + total = sizeof(topics_buf) / sizeof(char); + for (i = 0; i < _input.amount; i++) { + min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i])); + if (min > 0) { + strncpy(topics_buf + count, _input.topics[i], min); + count += min; + + if (total >= strlen(_input.topics[i])) { + total -= strlen(_input.topics[i]); + } + + if ((_input.amount > 1) && (i < (_input.amount - 1))) { + strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC)); + total -= 1; + count++; + } + } else { + topics_buf[strlen(topics_buf) - 1] = '\0'; + } + } + logger->debug("the parsed compound sub topics: %s!\n", topics_buf); +#else + memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len); +#endif + + if (timeout_ms > 0) { + + sec = timeout_ms / 1000; + nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); + + } else { + + rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); + + } + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + +exit_entry: +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgCommonReply mcr; + mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mcr.mutable_errmsg()->set_errstring(errString); + *reply_len=mcr.ByteSizeLong(); + *reply=malloc(*reply_len); + mcr.SerializePartialToArray(*reply,*reply_len); +#else + min = strlen(errString) + 1; + buf = malloc(min) ; + memcpy(buf, errString, strlen(errString)); + *((char *)buf + min - 1) = '\0'; + + *reply = buf; + *reply_len = min; +#endif + + if (rv == 0) + return true; + + return false; + +} + +int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms); + + return rv; +} + +int BHHeartbeatEasy(const int timeout_ms) +{ + + return true; +} + +int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + +#if defined(PRO_DE_SERIALIZE) + struct _ProcInfo_proto + { + const char *proc_id; + const char *name; + const char *public_info; + const char *private_info; + }_input; + + ::bhome_msg::ProcInfo input; + if(!input.ParseFromArray(proc_info,proc_info_len)) { + + rv = EBUS_INVALID_PARA; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + _input.proc_id = input.proc_id().c_str(); + _input.name = input.name().c_str(); + _input.public_info = input.public_info().c_str(); + _input.private_info = input.private_info().c_str(); + + rv = 0; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + ::bhome_msg::MsgCommonReply mcr; + mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mcr.mutable_errmsg()->set_errstring(errString); + *reply_len=mcr.ByteSizeLong(); + *reply=malloc(*reply_len); + mcr.SerializePartialToArray(*reply,*reply_len); +#endif + + return true; +} + +#if defined(PRO_DE_SERIALIZE) +int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms) +#else +int BHPublish(const char *topic, const char *content, const int timeout_ms) +#endif +{ + int rv; + int min; + void *buf = NULL; + net_node_t node_arr; + int node_arr_len = 0; + +#if defined(PRO_DE_SERIALIZE) + struct _MsgPublish + { + const char *topic; + const char *data; + }_input; + + ::bhome_msg::MsgPublish input; + if(!input.ParseFromArray(msgpub, msgpub_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + _input.topic = input.topic().c_str(); + _input.data = input.data().c_str(); +#else + if ((topic == NULL) || (content == NULL)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { +#if defined(PRO_DE_SERIALIZE) + if (timeout_ms > 0) { + rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms); + } else if (timeout_ms == 0) { + rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data)); + + } else { + + rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data)); + } +#else + if (timeout_ms > 0) { + rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms); + + } else if (timeout_ms == 0) { + rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content)); + + } else { + rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content)); + } +#endif + + pthread_mutex_unlock(&mutex); + + if (rv > 0) + return true; + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + return false; +} + +int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms) +{ + int rv; + int len; + void *buf; + int key; + int size; + int sec, nsec; + char topics_buf[MAX_STR_LEN] = { 0x00 }; + char data_buf[MAX_STR_LEN * 3] = { 0x00 }; + + struct _ReadSubReply + { + std::string proc_id; + std::string topic; + std::string data; + } rsr; + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + if (timeout_ms > 0) { + sec = timeout_ms / 1000; + nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + + rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); + + } else { + + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); + } + + if (rv == 0) { + + len = strlen((char *)buf); + if (len > size) { + len = size; + } + strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len); + + if (len < size) { + len = strlen(topics_buf) + 1; + + strncpy(data_buf, (char *)buf + len, size - len); + } + + free(buf); + +#if defined(PRO_DE_SERIALIZE) + rsr.topic = topics_buf; + rsr.data = data_buf; + + memset(topics_buf, 0x00, sizeof(topics_buf)); + sprintf(topics_buf, "%d", key); + + rsr.proc_id = topics_buf; + *proc_id_len = rsr.proc_id.size(); + *proc_id = malloc(*proc_id_len); + memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len); + + ::bhome_msg::MsgPublish Mp; + Mp.set_topic(rsr.topic); + Mp.set_data(rsr.data.data()); + *msgpub_len = Mp.ByteSizeLong(); + *msgpub = malloc(*msgpub_len); + Mp.SerializePartialToArray(*msgpub, *msgpub_len); +#else + void *ptr; + if (len < size) { + ptr = malloc(size - len); + len = size - len; + memcpy(ptr, data_buf, len); + } else { + ptr = malloc(len); + memcpy(ptr, topics_buf, len); + } + *msgpub = ptr; + *msgpub_len = len; + + memset(topics_buf, 0x00, sizeof(topics_buf)); + sprintf(topics_buf, "%d", key); + + *proc_id_len = strlen(topics_buf); + *proc_id = malloc(*proc_id_len); + memcpy(*proc_id, topics_buf, *proc_id_len); + +#endif + + pthread_mutex_unlock(&mutex); + + } else { + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + if (rv == 0) + return true; + + return false; +} + +int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len) +{ + int rv; + void *buf; + int size; + int val; + int len; + int min; + int sec, nsec; + std::string MsgID; + int timeout_ms = 3000; + char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; + +#if defined(PRO_DE_SERIALIZE) + struct _BHAddress + { + unsigned long long mq_id; + long long abs_addr; + const char *ip; + int port; + }_input0; + + struct MsgRequestTopic + { + const char *topic; + const char *data; + }_input1; + + ::bhome_msg::BHAddress input0; + ::bhome_msg::MsgRequestTopic input1; + if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + _input0.mq_id = input0.mq_id(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1.topic = input1.topic().c_str(); + _input1.data = input1.data().c_str(); + +#else + if ((request == NULL) || (request_len == 0)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { +#if defined(PRO_DE_SERIALIZE) + strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); +#else + strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1)); +#endif + + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); + if (rv == 0) { + + val = atoi((char *)buf); + + free(buf); + + if (val > 0) { + + len = strlen(topics_buf); +#if defined(PRO_DE_SERIALIZE) + min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); + strncpy(topics_buf + len + 1, _input1.data, min); + len += (min + 1); +#endif + if (timeout_ms > 0) { + + sec = timeout_ms / 1000; + nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + + rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); + + } else { + + rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); + } + } else { + + rv = EBUS_RES_UNSUPPORT; + + } + } + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + + if((msg_id == NULL) || (msg_id_len == NULL)) { + if (rv == 0) + return true; + + return false; + } + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + if (rv == 0) { + memset(topics_buf, 0x00, sizeof(topics_buf)); + sprintf(topics_buf, "%d", val); + MsgID = topics_buf; + + *msg_id_len = MsgID.size(); + *msg_id = malloc(*msg_id_len); + memcpy(*msg_id, MsgID.data(), *msg_id_len); + + return true; + } + + return false; + +} + +int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len, + void **reply, int *reply_len, const int timeout_ms) +{ + int rv; + void *buf; + int size; + int val; + int min, len; + net_node_t node; + int node_size; + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; + int sec, nsec; + char topics_buf[MAX_STR_LEN] = { 0x00 }; + +#if defined(PRO_DE_SERIALIZE) + struct _BHAddress + { + unsigned long long mq_id; + long long abs_addr; + const char *ip; + int port; + }_input0; + + struct _MsgRequestTopic + { + const char *topic; + const char *data; + }_input1; + + ::bhome_msg::BHAddress input0; + ::bhome_msg::MsgRequestTopic input1; + if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + _input0.mq_id = input0.mq_id(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1.topic = input1.topic().c_str(); + _input1.data = input1.data().c_str(); + +#else + if ((request == NULL) || (request_len == 0)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { +#if defined(PRO_DE_SERIALIZE) + strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); +#else + strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1)); +#endif + + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); + if (rv == 0) { + val = atoi((char *)buf); + + free(buf); + + if (val > 0) { + memset(&node, 0x00, sizeof(node)); + + len = strlen(topics_buf); +#if defined(PRO_DE_SERIALIZE) + min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); + strncpy(topics_buf + len + 1, _input1.data, min); + len += (min + 1); +#endif + + node.key = val; + rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size); + if (rv > 0) { + if (recv_arr_size > 0) { + + node.key = recv_arr[0].key; + + memset(topics_buf, 0x00, sizeof(topics_buf)); + size = recv_arr[0].content_length; + buf = (char *)malloc(size); + strncpy((char *)buf, (char *)recv_arr[0].content, size); +#if !defined(PRO_DE_SERIALIZE) + *reply = buf; + *reply_len = size; +#endif + } + + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + + if(errarr_size > 0) { + free(errarr); + } + + rv = 0; + + } else { + rv = EBUS_TIMEOUT; + } + + } else { + rv = EBUS_RES_UNSUPPORT; + } + } + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + struct _RequestReply + { + std::string proc_id; + std::string data; + }rr; + + if (rv == 0) { + memset(topics_buf, 0x00, sizeof(topics_buf)); + sprintf(topics_buf, "%d", node.key); + + rr.proc_id = topics_buf; + *proc_id_len = rr.proc_id.size(); + *proc_id = malloc(*proc_id_len); + memcpy(*proc_id, rr.proc_id.data(), *proc_id_len); + + memset(topics_buf, 0x00, sizeof(topics_buf)); + memcpy(topics_buf, buf, size); + rr.data = topics_buf; + +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgRequestTopicReply mrt; + mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); + mrt.mutable_errmsg()->set_errstring(errString); + mrt.set_data(rr.data.data()); + *reply_len = mrt.ByteSizeLong(); + *reply = malloc(*reply_len); + mrt.SerializePartialToArray(*reply, *reply_len); +#endif + } + + pthread_mutex_unlock(&mutex); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + if (rv == 0) + return true; + + return false; +} + +int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms) +{ + int rv; + void *buf; + int key; + int size; + int sec, nsec; + char topics_buf[MAX_STR_LEN] = { 0x00 }; + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + if (timeout_ms > 0) { + + sec = timeout_ms / 1000; + nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + + rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); + + } else { + + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); + } + + if (rv == 0) { + struct _ReadRequestReply + { + std::string proc_id; + std::string topic; + std::string data; + void *src; + } rrr; + + sprintf(topics_buf, "%d", key); + rrr.proc_id = topics_buf; + + *proc_id_len = rrr.proc_id.size(); + *proc_id = malloc(*proc_id_len); + memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len); + + memset(topics_buf, 0x00, sizeof(topics_buf)); + memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size); + rrr.topic = topics_buf; + rrr.data = topics_buf; + +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgRequestTopic mrt; + mrt.set_topic(rrr.topic); + mrt.set_data(rrr.data.data()); + *request_len = mrt.ByteSizeLong(); + *request = malloc(*request_len); + mrt.SerializePartialToArray(*request,*request_len); +#else + *request = buf; + *request_len = size; +#endif + + buf = malloc(sizeof(int)); + *(int *)buf = key; + *src = buf; + } + + pthread_mutex_unlock(&mutex); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + if (rv == 0) + return true; + + return false; +} + +int BHSendReply(void *src, const void *reply, const int reply_len) +{ + int rv; + +#if defined(PRO_DE_SERIALIZE) + ::bhome_msg::MsgRequestTopicReply input; + if (!input.ParseFromArray(reply, reply_len)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + const char *_input; + _input = input.data().data(); + +#else + if ((src == NULL) || (reply == NULL) || (reply_len == 0)) { + + rv = EBUS_INVALID_PARA; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } +#endif + + if (gRun_stat == 0) { + logger->error("the process has not been registered yet!\n"); + + rv = EBUS_RES_NO; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + return false; + } + + rv = pthread_mutex_trylock(&mutex); + if (rv == 0) { + + rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src); + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + pthread_mutex_unlock(&mutex); + } else { + + rv = EBUS_RES_BUSY; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + } + + if (rv == 0) + return true; + + return false; +} + +int BHCleanup() { + + return true; +} + +void BHFree(void *buf, int size) { + free(buf); +} + +int BHGetLastError(void **msg, int *msg_len) +{ + void *buf = NULL; + + buf = malloc(strlen(errString) + 1); + + memset(buf, 0x00, strlen(errString) + 1); + memcpy(buf, errString, strlen(errString)); + + if ((msg != NULL) && (msg_len != NULL)) { + *msg = buf; + *msg_len = strlen(errString); + + return true; + } + + return false; + +} diff --git a/src/bh_api.h b/src/bh_api.h index 75a9c17..40d9ffa 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -1,9 +1,11 @@ -#ifndef BH_API -#define BH_API +#ifndef _BH_API_WRAPPER_ +#define _BH_API_WRAPPER_ #ifdef __cplusplus extern "C" { #endif + +#define PRO_DE_SERIALIZE 1 int BHRegister(const void *proc_info, const int proc_info_len, @@ -17,15 +19,19 @@ int *reply_len, const int timeout_ms); + int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms); -int BHQueryTopicAddress(const void *remote, const int remote_len, - const void *topic, const int topic_len, - void **reply, int *reply_len, +int BHQueryTopicAddress(const void *remote, + const int remote_len, + const void *topics, + const int topics_len, + void **reply, + int *reply_len, const int timeout_ms); int BHQueryProcs(const void *remote, @@ -41,6 +47,7 @@ void **reply, int *reply_len, const int timeout_ms); + int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, @@ -54,9 +61,13 @@ int *reply_len, const int timeout_ms); +#if defined(PRO_DE_SERIALIZE) int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms); +#else +int BHPublish(const char *topic, const char *content, const int timeout_ms); +#endif int BHReadSub(void **proc_id, int *proc_id_len, @@ -96,7 +107,12 @@ void BHFree(void *buf, int size); +int BHGetLastError(void **msg, int *msg_len); + #ifdef __cplusplus } #endif -#endif +#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */ + + + diff --git a/src/bus_def.h b/src/bus_def.h index 78a7eb9..9634883 100644 --- a/src/bus_def.h +++ b/src/bus_def.h @@ -4,4 +4,11 @@ #define BUS_TIMEOUT_FLAG 1 #define BUS_NOWAIT_FLAG 1 << 1 +#define SHM_RES_SIZE 512 + +#define SHM_BUS_PROC_MAP_KEY 10 +#define SHM_BUS_PROC_TCS_MAP_KEY 11 +#define SHM_BUS_TCS_MAP_KEY 20 +#define SHM_BUS_PROC_PART_MAP_KEY 30 + #endif \ No newline at end of file diff --git a/src/bus_error.cpp b/src/bus_error.cpp index 913a771..49244cd 100644 --- a/src/bus_error.cpp +++ b/src/bus_error.cpp @@ -20,7 +20,11 @@ "Send to self error", "Receive from wrong end", "Service stoped", - "Exceed resource limit" + "Exceed resource limit", + "Service not supported", + "Resource busy", + "Resource not provide", + "Invalid parameters" }; @@ -50,6 +54,10 @@ char *buf; /* Make first caller allocate key for thread-specific data */ + if (err == 0) { + err = EBUS_BASE; + } + s = pthread_once(&once, createKey); if (s != 0) err_exit(s, "pthread_once"); diff --git a/src/bus_error.h b/src/bus_error.h index 769aa36..84f2e89 100644 --- a/src/bus_error.h +++ b/src/bus_error.h @@ -13,6 +13,10 @@ #define EBUS_RECVFROM_WRONG_END 506 #define EBUS_STOPED 507 #define EBUS_EXCEED_LIMIT 508 +#define EBUS_RES_UNSUPPORT 509 +#define EBUS_RES_BUSY 510 +#define EBUS_RES_NO 511 +#define EBUS_INVALID_PARA 512 extern int bus_errno; diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp new file mode 100644 index 0000000..a04edad --- /dev/null +++ b/src/bus_proxy_start.cpp @@ -0,0 +1,127 @@ +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include <signal.h> +#include <limits.h> +#include <stdio.h> +#include <errno.h> +#include <getopt.h> +#include <stdlib.h> + +#define SVR_PORT 5000 + +#define TOTAL_THREADS 2 + +static void *gBusServer_socket = NULL; +static void *gServer_socket = NULL; + +static int gShm_size = -1; +static int gPort = -1; + +static int gBusServer_act = 0; +static int gBusServer_stat = 0; + +pthread_t tids[2]; +void *res[2]; + +void *bus_start(void *skptr) { + + gBusServer_act = 1; + gBusServer_socket = bus_server_socket_wrapper_open(); + if (bus_server_socket_wrapper_start_bus(gBusServer_socket) != 0) { + printf("start bus failed\n"); + gBusServer_stat = -1; + } + + return NULL; +} + + + + +void *svr_start(void *skptr) { + int port = *(int *)skptr; + + gServer_socket = net_mod_server_socket_open(port); + if(net_mod_server_socket_start(gServer_socket) != 0) { + printf("start net mod server failed\n"); + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + char *endptr; + char i; + int val; + + sigset_t mask_all, pre; + sigfillset(&mask_all); + + sigprocmask(SIG_BLOCK, &mask_all, &pre); + + if (argc >= 4) { + fprintf(stderr, "Usage: %s [shm size] [server port]\n", argv[0]); + exit(0); + }; + + if (argc >= 2) { + argc -= 2; + for (i = 0; i <= argc; i++) { + errno = 0; + val = strtol(argv[i + 1], &endptr, 10); + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) + || (errno != 0 && val == 0)) { + fprintf(stderr, "invalid parameter: %s\n", argv[i + 1]); + exit(0); + } + + if (endptr == argv[i + 1]) { + fprintf(stderr, "invalid parameter %s: No digits were found\n", argv[i + 1]); + exit(0); + } + + if (i == 0) { + gShm_size = val; + } else { + gPort = val; + } + } + } + + if (gShm_size == -1) { + gShm_size = SHM_RES_SIZE; + } + shm_mm_wrapper_init(SHM_RES_SIZE); + + pthread_create(&tids[0], NULL, bus_start, NULL); + + if (gPort == -1) { + gPort = SVR_PORT; + } + + while(gBusServer_act == 0) { + sleep(1); + } + + if (gBusServer_stat >= 0) { + pthread_create(&tids[1], NULL, svr_start, (void *)&gPort); + } + + for (i = 0; i< TOTAL_THREADS; i++) { + if(pthread_join(tids[i], &res[i]) != 0) { + perror("bus_proxy pthread_join"); + } + } + + bus_server_socket_wrapper_close(gBusServer_socket); + net_mod_socket_close(gServer_socket); + shm_mm_wrapper_destroy(); + + return 0; +} + diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp index 10b0aac..2f733f8 100644 --- a/src/net/net_mod_server_socket.cpp +++ b/src/net/net_mod_server_socket.cpp @@ -181,7 +181,7 @@ } if( ret != 0) { - logger->error("杞彂澶辫触 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret)); + logger->error("fail: NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret)); // 杞彂澶辫触 response_head.code = ret; response_head.content_length = 0; @@ -277,7 +277,6 @@ FD_CLR(connfd, &pool.read_set); pool.clientfd[i] = -1; logger->debug("===server close client %d\n", connfd); - // printf("===server close client %d\n", connfd); } } diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 6a541d6..ab065eb 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -22,7 +22,7 @@ NetModSocket::~NetModSocket() { - + } @@ -46,6 +46,15 @@ return shmModSocket.force_bind(key); } +int NetModSocket::bind_proc_id(char *buf, int len) { + return shmModSocket.bind_proc_id(buf, len); +} + +int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) { + + return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); +} + // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { // return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); @@ -67,7 +76,7 @@ NetConnPool *mpool = (NetConnPool *)_pool; delete mpool; - logger->debug("destory connPool"); + } /* One-time key creation function */ @@ -343,9 +352,6 @@ return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec); } - -// int pub(char *topic, int topic_size, void *content, int content_size, int port); - int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec) { int i, connfd; @@ -363,7 +369,7 @@ net_mod_err_t err_msg; // 鏈湴鍙戦�� - if(node_arr == NULL || arrlen == 0) { + if ((node_arr == NULL) || (arrlen == 0)) { if(msec == 0) { ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); } else if(msec > 0) { @@ -525,7 +531,6 @@ int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); } - int NetModSocket::recvandsend(recvandsend_callback_fn callback, const struct timespec *timeout , int flag, void * user_data ) { @@ -764,23 +769,23 @@ head.mod = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); memcpy(head.host, tmp_ptr, sizeof(head.host)); tmp_ptr += sizeof(head.host); head.port = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.key = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.content_length = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.topic_length = ntohl(GET(tmp_ptr)); - tmp_ptr += 4; + tmp_ptr += sizeof(uint32_t); head.timeout = ntohl(GET_INT32(tmp_ptr)); return head; diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h index 6289fa6..d8e53ae 100644 --- a/src/net/net_mod_socket.h +++ b/src/net/net_mod_socket.h @@ -3,6 +3,7 @@ #include "usg_common.h" #include "shm_mod_socket.h" #include "socket_io.h" +#include "proc_def.h" #include <poll.h> #include "socket_def.h" #include "net_conn_pool.h" @@ -17,7 +18,7 @@ int key; }; -#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 6 * sizeof(uint32_t)) +#define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t) // 璇锋眰澶� @@ -118,8 +119,8 @@ */ int force_bind( int key); - - + int bind_proc_id(char *buf, int len); + int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); /** * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖 @@ -166,7 +167,6 @@ // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec); int recvfrom_nowait( void **buf, int *size, int *key); - /** * 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index abdcbb7..ab4d59d 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -44,6 +44,14 @@ // return sockt->bind(key); } +int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) +{ + NetModSocket *sockt = (NetModSocket *)_socket; + + return sockt->reg(pData, len, buf, size, timeout_ms, flag); + +} + /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� @@ -51,20 +59,17 @@ */ int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) { NetModSocket *sockt = (NetModSocket *)_socket; - logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); return sockt->sendto(buf, size, key); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ NetModSocket *sockt = (NetModSocket *)_socket; - logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); return sockt->sendto_timeout(buf, size, key, sec, nsec); // return sockt->sendto(buf, size, key); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ NetModSocket *sockt = (NetModSocket *)_socket; - logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); return sockt->sendto_nowait(buf, size, key); } @@ -77,22 +82,19 @@ int rv; NetModSocket *sockt = (NetModSocket *)_socket; - logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket)); rv = sockt->recvfrom(buf, size, key); - logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv); return rv; } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ - NetModSocket *sockt = (NetModSocket *)_socket; - // return sockt->recvfrom(buf, size, key); - return sockt->recvfrom_timeout(buf, size, key, sec, nsec); + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->recvfrom_timeout(buf, size, key, sec, nsec); } int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ - NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->recvfrom_nowait(buf, size, key); + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->recvfrom_nowait(buf, size, key); } int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, @@ -101,6 +103,11 @@ return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1); } +int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){ + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->bind_proc_id(proc_id, len); +} + /** * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖 * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉� diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h index b4941c0..b869510 100644 --- a/src/net/net_mod_socket_wrapper.h +++ b/src/net/net_mod_socket_wrapper.h @@ -12,6 +12,7 @@ #define __NET_MOD_SOCKET_H__ #include "net_mod_socket.h" +#include "proc_def.h" #ifdef __cplusplus extern "C" { @@ -55,6 +56,8 @@ */ int net_mod_socket_force_bind(void * _socket, int key); +int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); + /** * @brief 鍙戦�佷俊鎭�,鍙戦�佸畬鎴愭墠杩斿洖 * diff --git a/src/proc_def.h b/src/proc_def.h new file mode 100644 index 0000000..cb4dc0a --- /dev/null +++ b/src/proc_def.h @@ -0,0 +1,70 @@ +#ifndef __PROC_DEF_ +#define __PROC_DEF_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define MAX_STR_LEN 128 //keep the same with serializer in proc check +#define MIN_STR_LEN 10 +#define MAX_PROC_NUM 128 +#define MAX_TOPICS_NUN 60 + +#define PROC_REG 1 +#define PROC_UNREG 2 +#define PROC_REG_TCS 3 +#define PROC_QUE_TCS 4 +#define PROC_QUE_STCS 5 +#define PROC_QUE_ATCS 6 + +#define STR_MAGIC "," + +typedef struct _ProcInfo { +#if 0 + char ServerID[MAX_STR_LEN]; // 鏈哄櫒ID + char BoardID[MAX_STR_LEN]; // 鏉垮崱ID + char ServerIP[MAX_STR_LEN]; // 鏈哄櫒IP + char ProcID[MAX_STR_LEN]; // 杩涚▼鍞竴鏍囪瘑 + char ProcName[MAX_STR_LEN]; // 杩涚▼鍚嶇О + char ProcLabel[MAX_STR_LEN]; // 杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋� +#else + char proc_id[MAX_STR_LEN]; + char name[MAX_STR_LEN]; + char public_info[MAX_STR_LEN]; + char private_info[MAX_STR_LEN]; +#endif +} ProcInfo; + +typedef struct _ProcInfo_sum { + + ProcInfo procData; + + int stat; + char reg_info[MAX_STR_LEN]; + char local_info[MAX_STR_LEN]; + char net_info[MAX_STR_LEN]; + + int list_num; + +} ProcInfo_sum; + +typedef struct _ProcInfo_query { + + char name[MAX_STR_LEN]; + + int num; + + ProcInfo procData; + +} ProcInfo_query; + +#ifdef __cplusplus +} +#endif + + +#endif //end of file + + + + diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index f536208..ada70c6 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -15,7 +15,7 @@ #include "bus_def.h" // default Queue size -#define LOCK_FREE_Q_DEFAULT_SIZE 16 +#define LOCK_FREE_Q_DEFAULT_SIZE 320 #define LOCK_FREE_Q_ST_OPENED 0 @@ -177,6 +177,7 @@ typename Allocator, template<typename T, typename AT> class Q_TYPE> LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { + //std::cout << "LockFreeQueue init reference=" << reference << std::endl; if (sem_init(&slots, 1, qsize) == -1) err_exit(errno, "LockFreeQueue sem_init"); if (sem_init(&items, 1, 0) == -1) @@ -211,6 +212,7 @@ typename Allocator, template<typename T, typename AT> class Q_TYPE> LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { + // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); if (sem_destroy(&slots) == -1) { err_exit(errno, "LockFreeQueue sem_destroy"); } @@ -249,10 +251,10 @@ typename Allocator, template<typename T, typename AT> class Q_TYPE> int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { - // sigset_t mask_all, pre; - // sigfillset(&mask_all); + sigset_t mask_all, pre; + sigfillset(&mask_all); - // sigprocmask(SIG_BLOCK, &mask_all, &pre); + sigprocmask(SIG_BLOCK, &mask_all, &pre); if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { if (psem_trywait(&slots) == -1) { @@ -271,12 +273,12 @@ if (m_qImpl.push(a_data)) { psem_post(&items); - // sigprocmask(SIG_SETMASK, &pre, NULL); + sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } LABEL_FAILTURE: - // sigprocmask(SIG_SETMASK, &pre, NULL); + sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } @@ -285,10 +287,10 @@ template<typename T, typename AT> class Q_TYPE> int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { - // sigset_t mask_all, pre; - // sigfillset(&mask_all); + sigset_t mask_all, pre; + sigfillset(&mask_all); - // sigprocmask(SIG_BLOCK, &mask_all, &pre); + sigprocmask(SIG_BLOCK, &mask_all, &pre); if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { if (psem_trywait(&items) == -1) { @@ -306,13 +308,13 @@ if (m_qImpl.pop(a_data)) { psem_post(&slots); - // sigprocmask(SIG_SETMASK, &pre, NULL); + sigprocmask(SIG_SETMASK, &pre, NULL); return 0; } LABEL_FAILTURE: - // sigprocmask(SIG_SETMASK, &pre, NULL); + sigprocmask(SIG_SETMASK, &pre, NULL); return errno; } diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 74b9b33..f5d64db 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -40,8 +40,8 @@ bool full(); bool empty(); - int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); - int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); + int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); + int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); ELEM_T &operator[](unsigned i); diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h index 6c3cd27..7bb6eac 100755 --- a/src/shm/hashtable.h +++ b/src/shm/hashtable.h @@ -5,7 +5,7 @@ #include <functional> #include <set> -#define MAPSIZE 1024 +#define MAPSIZE 4096 // 鍒涘缓Queue鏁伴噺鐨勪笂闄� #define QUEUE_COUNT_LIMIT 300 diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp index f23fa08..e4ef672 100644 --- a/src/shm/mm.cpp +++ b/src/shm/mm.cpp @@ -256,6 +256,7 @@ first = false; shmid = shmget(SHM_KEY, 0, 0); } + if (shmid == -1) err_exit(errno, "mm_init shmget"); shmp = shmat(shmid, key_addr, 0); @@ -338,8 +339,6 @@ else LoggerFactory::getLogger()->debug("shared memory destroy\n"); - LoggerFactory::getLogger()->debug( "mm_destroy: real destroy."); - SemUtil::inc(mutex); SemUtil::remove(mutex); return true; @@ -363,6 +362,7 @@ void mm_free_by_key(int key) { void *ptr; + ptr = hashtable_get(hashtable, key); if(ptr != NULL) { mm_free(ptr); diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h index 5339d8a..0ca0d08 100644 --- a/src/shm/shm_mm.h +++ b/src/shm/shm_mm.h @@ -8,6 +8,8 @@ #define SHM_QUEUE_ST_CLOSED 1 #define SHM_QUEUE_ST_RECYCLED 2 +#define SHM_QUEUE_ST_SET 50 + struct shm_queue_status_t { int status; @@ -49,4 +51,6 @@ int shm_mm_alloc_key(); +typedef std::map<SHMString, int, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, int> > > ProcDataZone; + #endif diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 7a45696..1646da5 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -1,6 +1,7 @@ #include "bus_server_socket.h" #include "shm_mod_socket.h" +#include "shm_socket.h" #include "bus_error.h" static Logger *logger = LoggerFactory::getLogger(); @@ -12,10 +13,10 @@ SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { cb(subscripter_set, *set_iter); } } @@ -35,7 +36,7 @@ SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { subscripter_set->erase(set_iter); @@ -50,7 +51,6 @@ BusServerSocket::BusServerSocket() { - logger->debug("BusServerSocket Init"); shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); topic_sub_map = NULL; @@ -80,10 +80,13 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int BusServerSocket::start(){ + int rv; + topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); - _run_proxy_(); - return 0; + rv = _run_proxy_(); + + return rv; } @@ -114,7 +117,7 @@ SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; if(subscripter_set != NULL) { subscripter_set->clear(); @@ -126,8 +129,8 @@ shm_mm_free_by_key(SHM_BUS_MAP_KEY); } shm_socket_close(shm_socket); - logger->debug("BusServerSocket destory 3"); - return 0; + + return 0; } /* @@ -135,10 +138,10 @@ */ void BusServerSocket::_proxy_sub( char *topic, int key) { SHMKeySet *subscripter_set; - + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; SHMTopicSubMap::iterator map_iter; SHMKeySet::iterator set_iter; -//printf("_proxy_sub topic = %s\n", topic); if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; } else { @@ -147,6 +150,7 @@ topic_sub_map->insert({topic, subscripter_set}); } subscripter_set->insert(key); + } /* @@ -173,7 +177,7 @@ SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; - for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { subscripter_set = map_iter->second; subscripter_set->erase(key); } @@ -182,7 +186,7 @@ /* * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� */ -void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { +void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -198,7 +202,7 @@ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { send_key = *set_iter; rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); if(rv == 0) { @@ -209,7 +213,7 @@ } // 鍒犻櫎宸插叧闂殑绔� - for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { + for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { subscripter_set->erase(set_iter); logger->debug("remove closed subscripter %d \n", send_key); @@ -218,13 +222,458 @@ subscripter_to_del.clear(); } + } +ProcInfo_query *Qurey_object(const char *object, int *length) { + int flag = 0; + int val; + int len; + int total = 0; + ProcInfo *Proc_ptr = NULL; + ProcInfo Data_stru; + ProcInfo_query *dataBuf = NULL; + SvrProc *SvrSub_ele; + SvrTcs::iterator svr_tcs_iter; + SvrProc::iterator svr_proc_iter; + ProcZone::iterator proc_iter; + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + val = *svr_proc_iter; + + if ((proc_iter = proc->find(val)) != proc->end()) { + + if (dataBuf == NULL) { + dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total = sizeof(ProcInfo_query); + } + + if (flag == 0) { + memset(dataBuf, 0x00, sizeof(ProcInfo_query)); + + dataBuf->num = 1; + strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); + + flag = 1; + + } else { + dataBuf->num++; + len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); + dataBuf = (ProcInfo_query *)realloc(dataBuf, len); + if (dataBuf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + total += sizeof(ProcInfo); + memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); + } + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + Data_stru = proc_iter->second; + + Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; + strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); + strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); + strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); + strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); + + if (length != NULL) + *length = total; + } + } + } + + return dataBuf; +} + +void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) +{ + char buf_temp[MAX_STR_LEN] = { 0x00 }; + int count = 0; + int i = 0; + int len = 0; + char *data_ptr; + ProcInfo Data_stru; + ProcZone::iterator proc_iter; + TcsZone *TcsSub_ele; + ProcDataZone::iterator proc_que_iter; + ProcTcsMap::iterator proc_tcs_iter; + SvrProc *SvrSub_ele; + SvrProc::iterator svr_proc_iter; + SvrTcs::iterator svr_tcs_iter; + TcsZone::iterator tcssub_iter; + ProcPartZone::iterator proc_part_iter; + + struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + memset(&Data_stru, 0x00, sizeof(ProcInfo)); + + if (buf != NULL) { + + memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); + count = strlen(buf) + 1; + + memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + } + + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); + ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); + if (flag == PROC_REG) { + if ((proc_iter = proc->find(key)) == proc->end()) { + proc->insert({key, Data_stru}); + } + + if ((proc_part_iter = procPart->find(key)) == procPart->end()) { + procPart->insert({key, Data_stru.proc_id}); + } + + if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { + procQuePart->insert({Data_stru.proc_id, key}); + } + + } else { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { + SvrSub_ele = svr_tcs_iter->second; + + SvrSub_ele->erase(key); + } + + if ((proc_iter = proc->find(key)) != proc->end()) { + + len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); + strncpy(buf_temp, (proc_iter->second).proc_id, len); + proc->erase(proc_iter); + + } + + if ((proc_part_iter = procPart->find(key)) != procPart->end()) { + + procPart->erase(key); + } + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + procQuePart->erase(buf_temp); + } + + } + } else if (flag == PROC_REG_TCS) { + ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + if ((proc_tcs_iter = proc->find(key)) != proc->end()) { + TcsSub_ele = proc_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(TcsZone)); + TcsSub_ele = new(ptr_set) TcsZone; + proc->insert({key, TcsSub_ele}); + } + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + TcsSub_ele->insert(data_ptr); + if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + } else { + + void *ptr_set = mm_malloc(sizeof(SvrProc)); + SvrSub_ele = new(ptr_set) SvrProc; + SvrData->insert({data_ptr, SvrSub_ele}); + } + SvrSub_ele->insert(key); + data_ptr = strtok(NULL, STR_MAGIC); + } + + } else if (flag == PROC_QUE_TCS) { + + struct _temp_store { + void *ptr; + int total; + } *temp_store = NULL; + + int num = 0; + int sum = 0; + + ProcInfo_query *ret = NULL; + ProcInfo_query *ret_store = NULL; + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); + while(data_ptr) { + ret = Qurey_object(data_ptr, &len); + if (ret != NULL) { + + if (temp_store == NULL) { + temp_store = (_temp_store *)malloc(sizeof(_temp_store)); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + temp_store->ptr = ret; + temp_store->total = len; + num = 1; + + } else { + num++; + temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); + if (temp_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + (temp_store + num - 1)->ptr = ret; + (temp_store + num - 1)->total = len; + } + + } + data_ptr = strtok(NULL, STR_MAGIC); + } + + if (num > 0) { + for (count = 0; count < num; count++) { + + if (ret_store == NULL) { + ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + sum = (temp_store + count)->total; + memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); + + } else { + + ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); + if (ret_store == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); + + sum += (temp_store + count)->total; + + } + + free((temp_store + count)->ptr); + + } + + free(temp_store); + } + + void *last_buf = malloc(sum + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = num; + if (num > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); + free(ret_store); + } + + shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + free(last_buf); + } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + + strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); + if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { + SvrSub_ele = svr_tcs_iter->second; + + for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { + count = *svr_proc_iter; + + break; + } + } else { + count = 0; + } + + memset(buf_temp, 0x00, sizeof(buf_temp)); + sprintf(buf_temp, "%d", count); + shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG); + + } else { + + int val; + int temp = 0; + int pos = 0; + int size = 0; + ProcInfo_sum *Data_sum = NULL; + SHMKeySet *subs_proc; + SHMKeySet::iterator subs_proc_iter; + SHMTopicSubMap::iterator subs_iter; + + ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { + + memset(&Data_stru, 0x00, sizeof(Data_stru)); + + if (count == 0) { + Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); + if (Data_sum == NULL) { + + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + count++; + + memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); + + } else { + + count++; + len = sizeof(ProcInfo_sum) * count; + Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); + if (Data_sum == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + + exit(1); + } + + memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); + } + + Data_stru = proc_iter->second; + + memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); + memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); + memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); + memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); + + (Data_sum + count - 1)->stat = 1; + (Data_sum + count - 1)->list_num = 3; + + val = proc_iter->first; + if ((proc_tcs_iter = procData->find(val)) != procData->end()) { + TcsSub_ele = proc_tcs_iter->second; + + temp = 0; + pos = 0; + len = sizeof(Data_sum->reg_info) - 1; + for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { + + if (temp == 0) { + strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); + pos += strlen((Data_sum + count - 1)->reg_info); + len -= strlen((Data_sum + count - 1)->reg_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->reg_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); + strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); + + pos += size; + len -= size; + } + } + } + + pos = 0; + temp = 0; + len = sizeof(Data_sum->local_info) - 1; + for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { + subs_proc = subs_iter->second; + + if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { + + if ((temp == 0)) { + + strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); + pos += strlen((Data_sum + count - 1)->local_info); + len -= strlen((Data_sum + count - 1)->local_info); + + temp++; + } else { + + if (len > 0) { + strcat((Data_sum + count - 1)->local_info, ","); + + pos += 1; + len -= 1; + } + + if (len > 0) { + size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); + strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); + + pos += size; + len -= size; + } + } + + } + } + + } + } + + temp = count * sizeof(ProcInfo_sum); + void *last_buf = malloc(temp + sizeof(int)); + if (last_buf == NULL) { + logger->error("in proxy_reg: Out of memory!\n"); + exit(1); + } + + *(int *)last_buf = count; + if (count > 0) { + memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); + free(Data_sum); + } + + shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); + + } +} // 杩愯浠g悊 -void * BusServerSocket::_run_proxy_() { +int BusServerSocket::_run_proxy_() { int size; int key; + int flag; char * action, *topic, *topics, *buf, *content; size_t head_len; char resp_buf[128]; @@ -233,25 +682,21 @@ int rv; char send_buf[512] = { 0x00 }; - const char *topic_delim = ","; - - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { + const char *topic_delim = ","; + while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; - if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); while(topic) { - _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } else if(strcmp(action, "desub") == 0) { - if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� _proxy_desub_all(key); @@ -259,7 +704,6 @@ topic = strtok(topics, topic_delim); while(topic) { - _proxy_desub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } @@ -267,10 +711,45 @@ } else if(strcmp(action, "pub") == 0) { - content = topics + head.topic_size; - _proxy_pub(topics, content, head.content_size, key); + topics[head.topic_size - 1] = '\0'; + content = topics + head.topic_size; - } + _proxy_pub(topics, topics, head.topic_size + head.content_size, key); + + } + else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ + || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ + || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { + content = topics + head.topic_size; + if (strcmp(action, "reg") == 0) { + + flag = PROC_REG; + + } else if (strcmp(action, "unreg") == 0) { + + flag = PROC_UNREG; + + } else if (strcmp(action, "tcsreg") == 0) { + + flag = PROC_REG_TCS; + + } else if (strcmp(action, "tcsque") == 0) { + + flag = PROC_QUE_TCS; + + } else if (strcmp(action, "stcsque") == 0) { + + flag = PROC_QUE_STCS; + + } else { + + flag = PROC_QUE_ATCS; + + } + + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); + + } else if (strncmp(buf, "request", strlen("request")) == 0) { sprintf(send_buf, "%4d", key); strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); @@ -281,7 +760,6 @@ } } else if(strcmp(action, "stop") == 0) { - logger->info( "Stopping Bus..."); free(buf); break; } else { @@ -291,5 +769,5 @@ } - return NULL; + return rv; } diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index 956271b..3052c8b 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -18,7 +18,6 @@ typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; - class BusServerSocket { private: shm_socket_t *shm_socket; @@ -29,14 +28,16 @@ private: int destroy(); void _proxy_sub( char *topic, int key); - void _proxy_pub( char *topic, void *buf, size_t size, int key); - void *_run_proxy_(); + void _proxy_pub( char *topic, char *buf, size_t size, int key); + int _run_proxy_(); // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); void _proxy_desub( char *topic, int key); void _proxy_desub_all(int key); - static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); + void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag); + + static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); // static bool include_in_keys(int key, int keys[], size_t length); public: diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp index d2f5a8e..6b730a9 100644 --- a/src/socket/bus_server_socket_wrapper.cpp +++ b/src/socket/bus_server_socket_wrapper.cpp @@ -7,7 +7,6 @@ * 鍒涘缓 */ void * bus_server_socket_wrapper_open() { - logger->debug("===bus_server_socket_wrapper_open\n"); BusServerSocket *sockt = new BusServerSocket; return (void *)sockt; } @@ -19,7 +18,6 @@ BusServerSocket *sockt = (BusServerSocket *)_socket; delete sockt; - logger->debug("===bus_server_socket_wrapper_close\n"); } int bus_server_socket_wrapper_stop(void *_socket) { @@ -35,7 +33,7 @@ int ret; BusServerSocket *sockt = (BusServerSocket *)_socket; - if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) { + if( (ret = sockt->bind(SHM_BUS_KEY)) == 0) { return sockt->start(); } else { logger->error("start bus failed"); diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index abd9477..a94b9c3 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -38,6 +38,129 @@ return shm_socket_force_bind(shm_socket, key); } +int ShmModSocket::bind_proc_id(char *buf, int len) { + return shm_socket_bind_proc_id(shm_socket, buf, len); +} + +int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) +{ + int ret; + struct timespec ts; + + bus_head_t head = {}; + + if (flag == PROC_REG) { + + memcpy(head.action, "reg", sizeof(head.action)); + + } else if (flag == PROC_UNREG) { + + memcpy(head.action, "unreg", sizeof(head.action)); + + } else if (flag == PROC_REG_TCS) { + + memcpy(head.action, "tcsreg", sizeof(head.action)); + + } else if (flag == PROC_QUE_TCS) { + + memcpy(head.action, "tcsque", sizeof(head.action)); + + } else if (flag == PROC_QUE_STCS) { + + memcpy(head.action, "stcsque", sizeof(head.action)); + + } else if (flag == PROC_QUE_ATCS) { + + memcpy(head.action, "atcsque", sizeof(head.action)); + + } else { + + return -1; + + } + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + head.topic_size = 0; + + if (pData != NULL) { + + head.content_size = sizeof(ProcInfo); + + } else { + + head.content_size = 0; + + } + } else { + + head.topic_size = len; + + head.content_size = 0; + + } + + void *buf_temp; + int buf_size; + + if ((flag == PROC_REG) || (flag == PROC_UNREG)) { + + buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp); + + } else { + + buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp); + + } + + if (timeout_ms > 0) { + + ts.tv_sec = timeout_ms /1000; + + ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000; + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG); + + } + + } else if (timeout_ms == 0) { + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG); + + } + + } else { + + if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { + + ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1); + + } else { + + ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1); + + } + + } + + free(buf_temp); + + return ret; + +} + /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� @@ -60,7 +183,8 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { - int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); + + int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); if(rv == 0) { logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); @@ -77,7 +201,7 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, +int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); @@ -183,8 +307,8 @@ memcpy(head.action, "pub", sizeof(head.action)); head.topic_size = topic_size = strlen(topic) + 1; head.content_size = content_size; - - void *buf; + + void *buf; int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); @@ -216,6 +340,7 @@ char *buf; int max_buf_size; void *buf_ptr; + int count = 0; if((buf = (char *) malloc(MAXBUF)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); exit(1); @@ -223,7 +348,7 @@ max_buf_size = MAXBUF; } - buf_size = BUS_HEAD_SIZE + content_size + topic_size ; + buf_size = BUS_HEAD_SIZE + content_size + topic_size; if(max_buf_size < buf_size) { if((buf = (char *) realloc(buf, buf_size)) == NULL) { @@ -238,8 +363,19 @@ memcpy(buf, buf_ptr, BUS_HEAD_SIZE); if(topic_size != 0 ) memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); - if(content_size != 0) - memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); + if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \ + (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) { + memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); + } else { + if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \ + strlen("unreg")) == 0)) && (content_buf != NULL)) { + proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count); + + request_head.content_size = count; + buf_size -= (content_size - count); + + } + } *retbuf = buf; free(buf_ptr); diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index 9890aef..da02fab 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -5,6 +5,7 @@ #include "shm_allocator.h" #include "shm_mm.h" #include "hashtable.h" +#include "proc_def.h" #include "sem_util.h" #include "logger_factory.h" #include "key_def.h" @@ -60,6 +61,9 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int force_bind(int key); + + int bind_proc_id(char *buf, int len); + int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� @@ -75,9 +79,7 @@ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ - int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); - /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� @@ -128,7 +130,14 @@ */ int get_key() ; + int get_procid(char *buf, int len); + }; + +typedef std::map<int, ProcInfo, std::less<int>, SHM_STL_Allocator<std::pair<int, ProcInfo> > > ProcZone; +typedef std::set<SHMString, std::less<SHMString>, SHM_STL_Allocator<SHMString> > TcsZone; +typedef std::map<int, TcsZone *, std::less<int>, SHM_STL_Allocator<std::pair<const int, TcsZone *> > > ProcTcsMap; + #endif diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 918aef6..84bf77e 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -1,4 +1,5 @@ #include "shm_socket.h" +#include "socket_def.h" #include "hashtable.h" #include "logger_factory.h" #include <map> @@ -108,8 +109,6 @@ int rv, i; hashtable_t *hashtable = mm_get_hashtable(); - logger->debug("shm_socket_close\n"); - // if(sockt->key != 0) { // auto it = shmQueueStMap->find(sockt->key); @@ -118,8 +117,6 @@ // it->second.closeTime = time(NULL); // } // } - - if(sockt->queue != NULL) { sockt->queue->close(); @@ -169,8 +166,20 @@ return 0; } +int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) { + strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len); + + return 0; +} + int shm_socket_get_key(shm_socket_t *sockt){ return sockt->key; +} + +int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) { + strncpy(buf, sockt->proc_id, len); + + return 0; } // 鐭繛鎺ユ柟寮忓彂閫� @@ -297,8 +306,6 @@ { int rv; - logger->debug("%lu destroy threadlocal socket\n", pthread_self()); - if(tmp_socket == NULL) return; @@ -411,8 +418,6 @@ int rv = 0, tryn = 16; - static int Counter_suc = 0; - static int Counter_fail = 0; shm_packet_t sendpak; shm_packet_t recvpak; std::map<int, shm_packet_t>::iterator recvbufIter; @@ -430,12 +435,13 @@ { /* If first call from this thread, allocate buffer for thread, and save its location */ tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); - - rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); - if ( rv != 0) { - logger->error(rv, "shm_sendandrecv : pthread_setspecific"); - exit(1); - } + + } + + rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); + if ( rv != 0) { + logger->error(rv, "shm_sendandrecv : pthread_setspecific"); + exit(1); } sendpak.key = tmp_socket->key; @@ -473,6 +479,7 @@ } else { // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲� tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); + exit(0); continue; } } @@ -481,7 +488,6 @@ return EBUS_RECVFROM_WRONG_END; LABLE_SUC: - sockt->key = tmp_socket->key; if(recv_buf != NULL) { void *_buf = malloc(recvpak.size); memcpy(_buf, recvpak.buf, recvpak.size); @@ -520,8 +526,8 @@ // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐� if(send_key != recv_key) { - // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); - // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); + + logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); continue; } @@ -640,7 +646,6 @@ if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); - if (sockt->key == 0) { sockt->key = hashtable_alloc_key(hashtable); } @@ -664,7 +669,6 @@ LABEL_POP: - rv = sockt->queue->pop(recvpak, timeout, flag); if(rv != 0) { if(rv == ETIMEDOUT) { @@ -680,4 +684,23 @@ *_recvpak = recvpak; return 0; } - + +void proc_copy(char *dst, void *src, int *counter) { + int count = 0; + ProcInfo *ptr = static_cast<ProcInfo *>(src); + + memcpy(dst, ptr->proc_id, strlen(ptr->proc_id) + 1); + count = strlen(ptr->proc_id) + 1; + memcpy(dst + count, ptr->name, strlen(ptr->name) + 1); + count += strlen(ptr->name) + 1; + memcpy(dst + count, ptr->public_info, strlen(ptr->public_info) + 1); + count += strlen(ptr->public_info) + 1; + memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1); + count += strlen(ptr->private_info) + 1; + + *counter = count; +} + + + + diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 97d9f2c..8e874d1 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -4,6 +4,7 @@ #include "usg_common.h" #include "usg_typedef.h" #include "shm_queue.h" +#include "proc_def.h" #include "lock_free_queue.h" #include <functional> @@ -18,7 +19,8 @@ #define BUS_ACTION_STOP 1 typedef struct shm_packet_t { - int key; + int key; + size_t size; void * buf; char uuid[64]; @@ -31,8 +33,8 @@ typedef struct shm_socket_t { shm_socket_type_t socket_type; - // 鏈湴key int key; + char proc_id[MAX_STR_LEN]; bool force_bind; pthread_mutex_t mutex; @@ -59,7 +61,8 @@ int shm_socket_bind(shm_socket_t * socket, int key) ; int shm_socket_force_bind(shm_socket_t * socket, int key) ; - + +int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len); /** * @flags : BUS_NOWAIT_FLAG */ @@ -70,6 +73,9 @@ int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec * timeout = NULL, int flags = 0); +typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SvrProc; +typedef std::map<SHMString, SvrProc *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SvrProc *> > > SvrTcs; +typedef std::map<int, SHMString, std::less<int>, SHM_STL_Allocator<std::pair<int, const SHMString> > > ProcPartZone; /** * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) * @recvbuf 鏀跺埌鐨勬暟鎹� @@ -83,7 +89,6 @@ const struct timespec *timeout = NULL, int flag = 0, void * user_data = NULL); - - +void proc_copy(char *dst, void *src, int *count); #endif \ No newline at end of file diff --git a/src/svsem.cpp b/src/svsem.cpp index 26bde2c..00e6dbc 100644 --- a/src/svsem.cpp +++ b/src/svsem.cpp @@ -11,7 +11,6 @@ union semun arg; struct sembuf sop; - arg.val = 0; /* So initialize it to 0 */ if (semctl(semid, 0, SETVAL, arg) == -1) err_exit(errno, "semctl 1"); diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt index 18bcf4c..a096136 100644 --- a/test_socket/CMakeLists.txt +++ b/test_socket/CMakeLists.txt @@ -6,7 +6,19 @@ ${EXTRA_INCLUDES} ) - +add_executable(bus_test_inter bus_test_inter.cpp) +target_link_libraries(bus_test_inter PRIVATE shm_queue ${EXTRA_LIBS} ) +target_include_directories(bus_test_inter PRIVATE + "${PROJECT_BINARY_DIR}" + ${EXTRA_INCLUDES} + ) + +add_executable(bus_test_server_mode bus_test_server_mode.cpp) +target_link_libraries(bus_test_server_mode PRIVATE shm_queue ${EXTRA_LIBS} ) +target_include_directories(bus_test_server_mode PRIVATE + "${PROJECT_BINARY_DIR}" + ${EXTRA_INCLUDES} + ) add_custom_command( OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh diff --git a/test_socket/bus_test_inter.cpp b/test_socket/bus_test_inter.cpp new file mode 100644 index 0000000..8abcb46 --- /dev/null +++ b/test_socket/bus_test_inter.cpp @@ -0,0 +1,502 @@ +#include "bus_server_socket.h" +#include "shm_mod_socket.h" +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include "mm.h" +#include "logger_factory.h" +#include <sys/time.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <unistd.h> +#include "bus_error.h" + +#include "bh_api.h" +#include "proc_def.h" + +#define TOTAL_REG_UNREG 2 + +#define MAGIC_STR "INTER_" +#define STR_LEN 30 + +pthread_t tids; +void *res; + +static ProcInfo proc_desc; + +char *genStr(int length, char *buf) +{ + int flag, i; + char *str; + + if (length < (strlen(buf) + 10)) { + length = strlen(buf) + 10; + } + + srand((unsigned) time(NULL)); + if ((str = (char *)malloc(length + 1)) == NULL) { + printf("out of memory!\n"); + + exit(0); + } + + memset(str, 0x00, length + 1); + memcpy(str, MAGIC_STR, strlen(MAGIC_STR)); + strcat(str, buf); + for (i = strlen(str); i < length; i++) { + flag = rand() % 3; + switch (flag) { + case 0: + str[i] = 'A' + rand() % 26; + break; + + case 1: + str[i] = 'a' + rand() % 26; + break; + + default: + str[i] = '0' + rand() % 10; + break; + } + } + + str[length] = '\0'; + + return str; + +} + +void *client_recv(void *skptr) { + + pthread_detach(pthread_self()); + + void *recvbuf = NULL; + int recv_len; + void *proc_id = NULL; + int id_len; + int rv; + void *errBuf = NULL; + int len; + char proc_data[200] = { 0x00 }; + char topics[200] = { 0x00 }; + + struct timespec timeout = {2, 0}; + + while (true) { + + rv = BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1); + if(rv == true) { + + memset(topics, 0x00, sizeof(topics)); + memset(proc_data, 0x00, sizeof(proc_data)); + + memcpy(proc_data, proc_id, (sizeof(proc_data) - 1) > id_len ? id_len : (sizeof(proc_data) - 1)); + + memcpy(topics, recvbuf, (sizeof(topics) - 1) > recv_len ? recv_len : (sizeof(topics) - 1)); + + printf("Get the sub topics data(%s) from proc id(%s)\n", (char *)topics, (char *)proc_id); + + BHFree(recvbuf, len); + BHFree(proc_id, id_len); + + } else { + BHGetLastError(&errBuf, &len); + printf("the thread recv fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, len); + } + + } + +} + +void parseQueryTopicsBuf(void *buf, int len) +{ + if (buf == NULL) + return; + + int total_topics = *(int *)buf; + int i, j; + int buf_pos = 0; + void *ptr_temp = NULL; + ProcInfo_query *ptr = NULL; + ProcInfo *Proc_ptr = NULL; + + buf_pos = sizeof(ProcInfo_query); + ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); + ptr_temp = (void *)ptr; + for (i = 0; i < total_topics; i++) { + printf("topic %s:\n", ptr->name); + for (j = 0; j < ptr->num; j++) { + printf("the %dst process info:\n", j + 1); + Proc_ptr = &(ptr->procData) + j; + printf("proc_id: %s\n", Proc_ptr->proc_id); + printf("name: %s\n", Proc_ptr->name); + printf("public_info: %s\n", Proc_ptr->public_info); + printf("private_info: %s\n", Proc_ptr->private_info); + + } + + if (ptr->num > 1) { + buf_pos += sizeof(ProcInfo) * (ptr->num - 1); + } + + ptr = (ProcInfo_query *)((char *)ptr_temp + buf_pos); + } + +} + +void parseQueryProcBuf(void *buf, int len) +{ + if (buf == NULL) + return; + + int total = *(int *)buf; + int i; + ProcInfo_sum *Proc_ptr = NULL; + + Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); + for (i = 0; i < total; i++) { + printf("proc_id: %s\n", (Proc_ptr + i)->procData.proc_id); + printf("name: %s\n", (Proc_ptr + i)->procData.name); + printf("public_info: %s\n", (Proc_ptr + i)->procData.public_info); + printf("private_info: %s\n", (Proc_ptr + i)->procData.private_info); + + printf("service: %s\n", (Proc_ptr + i)->reg_info); + printf("sub: %s\n", (Proc_ptr + i)->local_info); + } +} + +int main(int argc, char *argv[]) { + int ret; + char *ptr = NULL; + char buf[] = "Process"; + void *buf_temp = NULL; + void *errBuf = NULL; + void *proc_id = NULL; + int size; + int id_len; + int i; + char data_buf[200] = { 0x00 }; + const int timeout_ms = 3000; + + memset(&proc_desc, 0x00, sizeof(ProcInfo)); + ptr = genStr(STR_LEN, buf); + strncpy(proc_desc.proc_id, ptr, strlen(ptr) + 1); + //strncpy(proc_desc.proc_id, "Hello", strlen("Hello") + 1); + free(ptr); + + sleep(2); //make rand change + ptr = genStr(STR_LEN, buf); + strncpy(proc_desc.name, ptr, strlen(ptr) + 1); + //strncpy(proc_desc.name, "World", strlen("World") + 1); + free(ptr); + + sleep(2); + ptr = genStr(STR_LEN, buf); + //strncpy(proc_desc.public_info, ptr, strlen(ptr) + 1); + strncpy(proc_desc.public_info, "Good", strlen("Good") + 1); + free(ptr); + + sleep(2); + ptr = genStr(STR_LEN, buf); + //strncpy(proc_desc.private_info, ptr, strlen(ptr) + 1); + //strncpy(proc_desc.private_info, "Bye", strlen("Bye") + 1); + free(ptr); + + printf("before the registered, process info:\n"); + printf("proc_id: %s\n", proc_desc.proc_id); + printf("name: %s\n", proc_desc.name); + printf("public_info: %s\n", proc_desc.public_info); + printf("private_info: %s\n", proc_desc.private_info); + + for (i = 0; i < TOTAL_REG_UNREG; i++) { + ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered fail with error: %s\n", (char *)errBuf); + BHFree(errBuf, size); + + printf("the second way to get the error log: %s\n", buf_temp); + + }; + BHFree(buf_temp, size); + + ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process unregistered OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process unregistered fail with error: %s\n", (char *)errBuf); + BHFree(errBuf, size); + + printf("the second way to get the error log: %s\n", buf_temp); + + }; + BHFree(buf_temp, size); + } + + //const char *topics_reg_buf1[] = {"topics demo1"}; + const char *topics_reg_buf1 = "topics demo1"; + const char *topics_query_buf1 = "topics demo1"; + const char *topics_query_buf2 = "Hello World,So,Great,Good"; //No space between each other + //const char *topics_reg_buf2[] = {"Hello World", "So", "Great", "Good"}; + const char *topics_reg_buf2 = "Hello World,So,Great,Good"; + + //const char *topics_sub_buf1[] = {"news"}; + //const char *topics_sub_buf2[] = {"sports", "balls", "topics demo1"}; + const char *topics_sub_buf1 = "news"; + const char *topics_sub_buf2 = "sports,balls,topics demo1"; + + const char *topics_pub_topic1 = "news"; + const char *topics_pub_topic1_data = "boob news"; + + const char *topics_pub_topic2 = "balls"; + const char *topics_pub_topic2_data = "Great volleyballs and basketballs"; + + ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered fail with error: %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered2 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process query topics OKay\n"); + + parseQueryTopicsBuf(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process query3 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process query topics OKay\n"); + + parseQueryTopicsBuf(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process query4 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + pthread_create(&tids, NULL, client_recv, NULL); + + ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process subscribe topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process sub1 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("tthe process subscribe topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process sub2 fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered10 fail with errorL %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1"; + const char *topics_server_specific_reg_buf2 = "Server Specific Hello World"; + void *msgID = NULL; + int msg_id_len; + + ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len); + if (ret == true) { + printf("the process BHAsyncRequest topics OKay\n"); + + BHFree(msgID, msg_id_len); + + } else { + + BHGetLastError(&errBuf, &size); + printf("the process BHAsyncRequest1 topics fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0); + if (ret == true) { + printf("the process BHAsyncRequest topics OKay\n"); + + } else { + + BHGetLastError(&errBuf, &size); + printf("the process BHAsyncRequest2 topics fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len, + &buf_temp, &size, timeout_ms); + if (ret == true) { + + memset(data_buf, 0x00, sizeof(data_buf)); + strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1)); + printf("the process BHRequest topics OKay\n"); + + printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id); + + + } else { + + BHGetLastError(&errBuf, &size); + printf("the process BHRequest topics fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + + ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len, + &buf_temp, &size, timeout_ms); + if (ret == true) { + + memset(data_buf, 0x00, sizeof(data_buf)); + strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1)); + printf("the process BHRequest topics OKay\n"); + + printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id); + + } else { + + BHGetLastError(&errBuf, &size); + printf("the process BHRequest2 topics fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + +#if !defined(PRO_DE_SERIALIZE) + ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms); + if (ret == true) { + printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data); + + } else { + printf("the process published1 fail\n"); + }; + + ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms); + if (ret == true) { + printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data); + + } else { + printf("the process published2 fail\n"); + }; +#endif + + memset(data_buf, 0x00, sizeof(data_buf)); + strcpy(data_buf, "query the process"); + ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process query all the process data OKay\n"); + + parseQueryProcBuf(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process query proc fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + +#if 1 + while(1) { + sleep(1); + } +#else + /*if the process will exit finally, we must call BHUnregister to release the resources*/ + ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process unregistered OKay\n"); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process unregistered fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + BHFree(buf_temp, size); + +#endif + + return 0; + +} + + diff --git a/test_socket/bus_test_server_mode.cpp b/test_socket/bus_test_server_mode.cpp new file mode 100644 index 0000000..71f13c9 --- /dev/null +++ b/test_socket/bus_test_server_mode.cpp @@ -0,0 +1,122 @@ +#include "bus_server_socket.h" +#include "shm_mod_socket.h" +#include "shm_mm_wrapper.h" +#include "usg_common.h" +#include "mm.h" +#include "logger_factory.h" +#include <sys/time.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <unistd.h> +#include "bus_error.h" + +#include "bh_api.h" +#include "proc_def.h" + +static ProcInfo proc_desc; + +int main(int argc, char *argv[]) { + int ret; + void *buf_temp = NULL; + void *errBuf = NULL; + void *proc_id = NULL; + void *src = NULL; + void *data_buf = NULL; + char topics_buf[200] = { 0x00 }; + int size; + int id_len; + int count = 0; + const int timeout_ms = 3000; + + memset(&proc_desc, 0x00, sizeof(ProcInfo)); + strncpy(proc_desc.proc_id, "Hello", strlen("Hello")); + strncpy(proc_desc.name, "World", strlen("World")); + strncpy(proc_desc.public_info, "Good", strlen("Good") + 1); + + const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1"; + const char *topics_server_specific_reg_buf2 = "Server Specific Hello World"; + + ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered OKay\n"); + + BHFree(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + + printf("the process registered fail with error: %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + ret = BHRegisterTopics(topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + BHFree(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered1 fail with errorL %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + ret = BHRegisterTopics(topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &buf_temp, &size, timeout_ms); + if (ret == true) { + printf("the process registered topics OKay\n"); + + BHFree(buf_temp, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("the process registered1 fail with errorL %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + while(true) { + ret = BHReadRequest(&proc_id, &id_len, &buf_temp, &size, &src, -1); + if (ret == true) { + + strncpy(topics_buf, (char *)buf_temp, (sizeof(topics_buf) - 1) > size ? size : (sizeof(topics_buf) - 1)); + printf("Get data(%s)", topics_buf); + + memset(topics_buf, 0x00, sizeof(topics_buf)); + strncpy(topics_buf, (char *)proc_id, (sizeof(topics_buf) - 1) > id_len ? id_len : (sizeof(topics_buf) - 1)); + printf("proc id(%s)", topics_buf); + + BHFree(buf_temp, size); + BHFree(proc_id, id_len); + + memset(topics_buf, 0x00, sizeof(topics_buf)); + sprintf(topics_buf, "Get data count: %d", ++count); + ret = BHSendReply(src, topics_buf, strlen(topics_buf)); + if (ret == true) { + + printf("the process send reply OKay\n"); + + } else { + + BHGetLastError(&errBuf, &size); + printf("the process send reply fail with errorL %s\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + + BHFree(src, size); + + } else { + BHGetLastError(&errBuf, &size); + printf("BHReadRequest fail with error(%s)\n", (char *)errBuf); + + BHFree(errBuf, size); + }; + } + + return 0; + +} + + -- Gitblit v1.8.0