re-structure the communication work flow.
| | |
| | | |
| | | option(BUILD_SHARED_LIBS "Build using shared libraries" ON) |
| | | |
| | | add_compile_options(-fPIC) |
| | | |
| | | option(BUILD_DOC "Build doc" OFF) |
| | | |
| | | |
| | |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) |
| | | # add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) |
| | | include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto) |
| | | #add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) |
| | | endif() |
| | | |
| | | add_definitions("-DPROTOBUF_USS_DLLS") |
| | | |
| | |
| | | |
| | | BUILD_TYPE="Debug" |
| | | BUILD_DOC="OFF" |
| | | BUILD_SHARED_LIBS="OFF" |
| | | BUILD_SHARED_LIBS="ON" |
| | | |
| | | function usage() { |
| | | echo "build.sh [release | debug | doc]" |
| | |
| | | # to the source code |
| | | configure_file(bus_config.h.in bus_config.h) |
| | | |
| | | #set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON) |
| | | add_compile_options(-fPIC) |
| | | #target_compile_options(shm_queue PRIVATE -fPIC) |
| | | |
| | | list(APPEND _SOURCES_ |
| | | ./logger_factory.cpp |
| | |
| | | ./bus_error.cpp |
| | | ./futex_sem.cpp |
| | | ./svsem.cpp |
| | | ./bh_api.cpp |
| | | ./net/net_conn_pool.cpp |
| | | ./net/net_mod_server_socket_wrapper.cpp |
| | | ./net/net_mod_socket_wrapper.cpp |
| | | ./net/net_mod_socket.cpp |
| | | ./net/net_mod_socket_io.cpp |
| | | ./net/net_mod_server_socket.cpp |
| | | ./proto/bhome_msg_api.pb.cc |
| | | ./proto/bhome_msg.pb.cc |
| | | ./proto/error_msg.pb.cc |
| | | ./shm/shm_mm_wrapper.cpp |
| | | ./shm/mm.cpp |
| | | ./shm/hashtable.cpp |
| | | ./shm/shm_mm.cpp |
| | | ./bh_api.cc |
| | | ./proto/bhome_msg.pb.cc |
| | | ./proto/bhome_msg_api.pb.cc |
| | | ./proto/error_msg.pb.cc |
| | | |
| | | ) |
| | | |
| | | if (BUILD_SHARED_LIBS) |
| | | add_library(shm_queue SHARED ${_SOURCES_}) |
| | | target_compile_options(shm_queue PRIVATE -fPIC) |
| | | set_property(TARGET shm_queue PROPERTY POSITION_INDEPENDENT_CODE ON) |
| | | else() |
| | | add_library(shm_queue STATIC ${_SOURCES_}) |
| | | endif() |
| | | |
| | | # STATIC SHARED |
| | | # add_library(shm_queue ${_SOURCES_}) |
| | | #add_library(shm_queue ${_SOURCES_}) |
| | | |
| | | target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} ) |
| | | |
| | |
| | | ${PROJECT_BINARY_DIR}/src |
| | | ${CMAKE_CURRENT_SOURCE_DIR} |
| | | ${CMAKE_CURRENT_SOURCE_DIR}/shm |
| | | ${CMAKE_CURRENT_SOURCE_DIR}/proto |
| | | ${CMAKE_CURRENT_SOURCE_DIR}/queue |
| | | ${CMAKE_CURRENT_SOURCE_DIR}/socket |
| | | ${CMAKE_CURRENT_SOURCE_DIR}/net |
| | | ) |
| | | |
| | | add_executable(bus_proxy_start bus_proxy_start.cpp) |
| | | target_link_libraries(bus_proxy_start PRIVATE shm_queue ${EXTRA_LIBS} ) |
| | | |
| | | target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) |
| | | |
| | |
| | | ./bus_def.h |
| | | ./logger_factory.h |
| | | ./sole.h |
| | | ./proc_def.h |
| | | ./queue/linked_lock_free_queue.h |
| | | ./queue/array_lock_free_queue.h |
| | | ./queue/shm_queue.h |
| | |
| | | ./shm/shm_mm_wrapper.h |
| | | ./shm/shm_allocator.h |
| | | ./shm/shm_mm.h |
| | | ./bh_api.h |
| | | |
| | | |
| | | DESTINATION include) |
| New file |
| | |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "bus_server_socket_wrapper.h" |
| | | #include "shm_mm_wrapper.h" |
| | | #include "proc_def.h" |
| | | #include "usg_common.h" |
| | | #include "bh_api.h" |
| | | #include <pthread.h> |
| | | #include <getopt.h> |
| | | #include "bhome_msg_api.pb.h" |
| | | #include "bhome_msg.pb.h" |
| | | #include "error_msg.pb.h" |
| | | #include "proto/bhome_msg.pb.h" |
| | | #include "proto/bhome_msg_api.pb.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | static int gRun_stat = 0; |
| | | static void *gNetmod_socket = NULL; |
| | | |
| | | static pthread_mutex_t mutex; |
| | | |
| | | static char errString[100] = { 0x00 }; |
| | | |
| | | int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int key; |
| | | int count = 0; |
| | | void *buf = NULL; |
| | | int min = 0; |
| | | ProcInfo pData; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _ProcInfo_proto |
| | | { |
| | | const char *proc_id; |
| | | const char *name; |
| | | const char *public_info; |
| | | const char *private_info; |
| | | }_input; |
| | | |
| | | ::bhome_msg::ProcInfo input; |
| | | if(!input.ParseFromArray(proc_info, proc_info_len)) { |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input.proc_id = input.proc_id().c_str(); |
| | | _input.name = input.name().c_str(); |
| | | _input.public_info = input.public_info().c_str(); |
| | | _input.private_info = input.private_info().c_str(); |
| | | |
| | | #else |
| | | if ((proc_info == NULL) || (proc_info_len == 0)) { |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x90, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | #endif |
| | | |
| | | memset(&pData, 0x00, sizeof(ProcInfo)); |
| | | if (gRun_stat == 0) { |
| | | pthread_mutex_init(&mutex, NULL); |
| | | |
| | | } else { |
| | | logger->error("the process has already registered!\n"); |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | |
| | | gRun_stat = 1; |
| | | shm_mm_wrapper_init(SHM_RES_SIZE); |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | if (_input.proc_id != NULL) { |
| | | count = strlen(_input.proc_id) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.proc_id, _input.proc_id, min); |
| | | } |
| | | |
| | | if (_input.name != NULL) { |
| | | count = strlen(_input.name) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.name, _input.name, min); |
| | | } |
| | | |
| | | if (_input.public_info != NULL) { |
| | | count = strlen(_input.public_info) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.public_info, _input.public_info, min); |
| | | } |
| | | |
| | | if (_input.private_info != NULL) { |
| | | count = strlen(_input.private_info) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.private_info, _input.private_info, min); |
| | | } |
| | | #else |
| | | if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) { |
| | | count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min); |
| | | } |
| | | |
| | | if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) { |
| | | count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.name, ((ProcInfo *)proc_info)->name, min); |
| | | } |
| | | |
| | | if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) { |
| | | count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min); |
| | | } |
| | | |
| | | if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) { |
| | | count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1; |
| | | min = count > MAX_STR_LEN ? MAX_STR_LEN : count; |
| | | strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min); |
| | | } |
| | | #endif |
| | | |
| | | gNetmod_socket = net_mod_socket_open(); |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | key = hashtable_alloc_key(hashtable); |
| | | net_mod_socket_bind(gNetmod_socket, key); |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len = mcr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply, *reply_len); |
| | | #else |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int min; |
| | | void *buf = NULL; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _ProcInfo_proto |
| | | { |
| | | const char *proc_id; |
| | | const char *name; |
| | | const char *public_info; |
| | | const char *private_info; |
| | | }_input; |
| | | |
| | | ::bhome_msg::ProcInfo input; |
| | | |
| | | if(!input.ParseFromArray(proc_info, proc_info_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input.proc_id = input.proc_id().c_str(); |
| | | _input.name = input.name().c_str(); |
| | | _input.public_info = input.public_info().c_str(); |
| | | _input.private_info = input.private_info().c_str(); |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG); |
| | | if (rv == 0) { |
| | | |
| | | net_mod_socket_close(gNetmod_socket); |
| | | |
| | | gNetmod_socket = NULL; |
| | | |
| | | gRun_stat = 0; |
| | | |
| | | } |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len = mcr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply, *reply_len); |
| | | #else |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int min, i; |
| | | void *buf = NULL; |
| | | int total = 0; |
| | | int count = 0; |
| | | char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _MsgTopicList |
| | | { |
| | | int amount; |
| | | const char *topics[MAX_STR_LEN]; |
| | | }_input; |
| | | |
| | | ::bhome_msg::MsgTopicList input; |
| | | if(!input.ParseFromArray(topics, topics_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input.amount = input.topic_list_size(); |
| | | if (_input.amount > MAX_STR_LEN) { |
| | | _input.amount = MAX_STR_LEN; |
| | | } |
| | | |
| | | for(int i = 0; i < _input.amount; i++) { |
| | | _input.topics[i] = input.topic_list(i).c_str(); |
| | | } |
| | | #else |
| | | if ((topics == NULL) || (topics_len == 0)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | total = sizeof(topics_buf) / sizeof(char); |
| | | for (i = 0; i < _input.amount; i++) { |
| | | min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i])); |
| | | if (min > 0) { |
| | | strncpy(topics_buf + count, _input.topics[i], min); |
| | | count += min; |
| | | |
| | | if (total >= strlen(_input.topics[i])) { |
| | | total -= strlen(_input.topics[i]); |
| | | } |
| | | |
| | | if ((_input.amount > 1) && (i < (_input.amount - 1))) { |
| | | strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC)); |
| | | total -= 1; |
| | | count++; |
| | | } |
| | | } else { |
| | | topics_buf[strlen(topics_buf) - 1] = '\0'; |
| | | } |
| | | } |
| | | |
| | | logger->debug("the parsed compound register topics: %s!\n", topics_buf); |
| | | #else |
| | | memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len); |
| | | #endif |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len = mcr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply, *reply_len); |
| | | #else |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int min; |
| | | void *buf = NULL; |
| | | int size; |
| | | char topics_buf[MAX_STR_LEN] = { 0x00 }; |
| | | ProcInfo_query *ptr = NULL; |
| | | ProcInfo *Proc_ptr = NULL; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _BHAddress |
| | | { |
| | | unsigned long long mq_id; |
| | | long long abs_addr; |
| | | const char *ip; |
| | | int port; |
| | | }_input0; |
| | | |
| | | const char *_input1; |
| | | |
| | | ::bhome_msg::BHAddress input0; |
| | | ::bhome_msg::MsgQueryTopic input1; |
| | | if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) { |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input0.mq_id = input0.mq_id(); |
| | | _input0.abs_addr = input0.abs_addr(); |
| | | _input0.ip = input0.ip().c_str(); |
| | | _input0.port = input0.port(); |
| | | _input1 = input1.topic().c_str(); |
| | | |
| | | #else |
| | | if ((topic == NULL) || (topic_len == 0)) { |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1)); |
| | | strncpy(topics_buf, _input1, min); |
| | | #else |
| | | min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len); |
| | | buf = const_cast<void *>(topic); |
| | | strncpy(topics_buf, (const char *)buf, min); |
| | | #endif |
| | | rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | |
| | | struct _MsgQueryTopicReply |
| | | { |
| | | std::string proc_id; |
| | | |
| | | unsigned long long mq_id; |
| | | long long abs_addr; |
| | | std::string ip; |
| | | int port; |
| | | }mtr_list[128]; |
| | | int mtr_list_num = 0; |
| | | |
| | | if (rv == 0) { |
| | | |
| | | ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); |
| | | mtr_list_num = ptr->num; |
| | | |
| | | if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { |
| | | mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); |
| | | } |
| | | |
| | | for(int i = 0; i < mtr_list_num; i++) { |
| | | mtr_list[i].proc_id = ptr->procData.proc_id; |
| | | mtr_list[i].mq_id = 0x00; |
| | | mtr_list[i].abs_addr = 0x00; |
| | | mtr_list[i].ip = "192.168.1.1"; |
| | | mtr_list[i].port = 5000; |
| | | } |
| | | } |
| | | |
| | | ::bhome_msg::MsgQueryTopicReply mtr; |
| | | mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mtr.mutable_errmsg()->set_errstring(errString); |
| | | for(int i = 0; i < mtr_list_num; i++) |
| | | { |
| | | ::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address(); |
| | | mtrb->set_proc_id(mtr_list[i].proc_id); |
| | | mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id); |
| | | mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr); |
| | | mtrb->mutable_addr()->set_ip(mtr_list[i].ip); |
| | | mtrb->mutable_addr()->set_port(mtr_list[i].port); |
| | | } |
| | | |
| | | *reply_len = mtr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mtr.SerializePartialToArray(*reply, *reply_len); |
| | | #else |
| | | if (rv == 0) { |
| | | *reply = buf; |
| | | *reply_len = size; |
| | | |
| | | } else { |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | } |
| | | |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | void *buf = NULL; |
| | | int size; |
| | | int min; |
| | | ProcInfo_sum *Proc_ptr = NULL; |
| | | char data_buf[MAX_STR_LEN] = { 0x00 }; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _BHAddress |
| | | { |
| | | unsigned long long mq_id; |
| | | long long abs_addr; |
| | | const char *ip; |
| | | int port; |
| | | }_input0; |
| | | |
| | | const char *_input1; |
| | | |
| | | ::bhome_msg::BHAddress input0; |
| | | ::bhome_msg::MsgQueryProc input1; |
| | | if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input0.mq_id = input0.mq_id(); |
| | | _input0.abs_addr = input0.abs_addr(); |
| | | _input0.ip = input0.ip().c_str(); |
| | | _input0.port = input0.port(); |
| | | _input1 = input1.proc_id().c_str(); |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | |
| | | if (query != NULL) { |
| | | strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1)); |
| | | } |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _MsgQueryProcReply |
| | | { |
| | | std::string proc_id; |
| | | std::string name; |
| | | std::string public_info; |
| | | std::string private_info; |
| | | |
| | | bool online; |
| | | |
| | | std::string topic_list[128]; |
| | | int topic_list_num; |
| | | |
| | | } mpr_list[128]; |
| | | int mpr_list_num = 0; |
| | | |
| | | if (rv == 0) { |
| | | |
| | | mpr_list_num = *(int *)buf; |
| | | |
| | | if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) { |
| | | mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]); |
| | | } |
| | | |
| | | Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); |
| | | for(int i = 0; i < mpr_list_num; i++) { |
| | | mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; |
| | | mpr_list[i].name = (Proc_ptr + i)->procData.name; |
| | | mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; |
| | | mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; |
| | | mpr_list[i].online = (Proc_ptr + i)->stat; |
| | | mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; |
| | | |
| | | for(int j = 0; j < mpr_list[i].topic_list_num; j++) |
| | | { |
| | | if (j == 0) { |
| | | mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; |
| | | } else if (j == 1) { |
| | | mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; |
| | | } else if (j == 2) { |
| | | mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; |
| | | } |
| | | } |
| | | } |
| | | |
| | | ::bhome_msg::MsgQueryProcReply mpr; |
| | | mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mpr.mutable_errmsg()->set_errstring(errString); |
| | | |
| | | for(int i = 0; i < mpr_list_num; i++) |
| | | { |
| | | ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list(); |
| | | mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id); |
| | | mpri->mutable_proc()->set_name(mpr_list[i].name); |
| | | mpri->mutable_proc()->set_public_info(mpr_list[i].public_info); |
| | | mpri->mutable_proc()->set_private_info(mpr_list[i].private_info); |
| | | mpri->set_online(mpr_list[i].online); |
| | | for(int j = 0; j < mpr_list[i].topic_list_num; j++) |
| | | { |
| | | mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]); |
| | | } |
| | | } |
| | | |
| | | *reply_len = mpr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mpr.SerializePartialToArray(*reply,*reply_len); |
| | | } |
| | | #else |
| | | if (rv == 0) { |
| | | *reply = buf; |
| | | *reply_len = size; |
| | | } else { |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | } |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | |
| | | } |
| | | |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int sec, nsec; |
| | | int total = 0; |
| | | int count = 0; |
| | | int min, i; |
| | | void *buf = NULL; |
| | | char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _MsgTopicList |
| | | { |
| | | int amount; |
| | | const char *topics[MAX_STR_LEN]; |
| | | }_input; |
| | | |
| | | ::bhome_msg::MsgTopicList input; |
| | | if(!input.ParseFromArray(topics, topics_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | _input.amount = input.topic_list_size(); |
| | | |
| | | if (_input.amount > MAX_STR_LEN) { |
| | | _input.amount = MAX_STR_LEN; |
| | | } |
| | | |
| | | for(int i = 0; i < _input.amount; i++) |
| | | _input.topics[i] = input.topic_list(i).c_str(); |
| | | |
| | | #else |
| | | if ((topics == NULL) || (topics_len == 0)) { |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | goto exit_entry; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | total = sizeof(topics_buf) / sizeof(char); |
| | | for (i = 0; i < _input.amount; i++) { |
| | | min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i])); |
| | | if (min > 0) { |
| | | strncpy(topics_buf + count, _input.topics[i], min); |
| | | count += min; |
| | | |
| | | if (total >= strlen(_input.topics[i])) { |
| | | total -= strlen(_input.topics[i]); |
| | | } |
| | | |
| | | if ((_input.amount > 1) && (i < (_input.amount - 1))) { |
| | | strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC)); |
| | | total -= 1; |
| | | count++; |
| | | } |
| | | } else { |
| | | topics_buf[strlen(topics_buf) - 1] = '\0'; |
| | | } |
| | | } |
| | | logger->debug("the parsed compound sub topics: %s!\n", topics_buf); |
| | | #else |
| | | memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len); |
| | | #endif |
| | | |
| | | if (timeout_ms > 0) { |
| | | |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); |
| | | |
| | | } |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | exit_entry: |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len=mcr.ByteSizeLong(); |
| | | *reply=malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply,*reply_len); |
| | | #else |
| | | min = strlen(errString) + 1; |
| | | buf = malloc(min) ; |
| | | memcpy(buf, errString, strlen(errString)); |
| | | *((char *)buf + min - 1) = '\0'; |
| | | |
| | | *reply = buf; |
| | | *reply_len = min; |
| | | #endif |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | |
| | | } |
| | | |
| | | int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | int BHHeartbeatEasy(const int timeout_ms) |
| | | { |
| | | |
| | | return true; |
| | | } |
| | | |
| | | int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _ProcInfo_proto |
| | | { |
| | | const char *proc_id; |
| | | const char *name; |
| | | const char *public_info; |
| | | const char *private_info; |
| | | }_input; |
| | | |
| | | ::bhome_msg::ProcInfo input; |
| | | if(!input.ParseFromArray(proc_info,proc_info_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | _input.proc_id = input.proc_id().c_str(); |
| | | _input.name = input.name().c_str(); |
| | | _input.public_info = input.public_info().c_str(); |
| | | _input.private_info = input.private_info().c_str(); |
| | | |
| | | rv = 0; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len=mcr.ByteSizeLong(); |
| | | *reply=malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply,*reply_len); |
| | | #endif |
| | | |
| | | return true; |
| | | } |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | int BHPublish(const char *msgpub, const char msgpub_len, const int timeout_ms) |
| | | #else |
| | | int BHPublish(const char *topic, const char *content, const int timeout_ms) |
| | | #endif |
| | | { |
| | | int rv; |
| | | int min; |
| | | void *buf = NULL; |
| | | net_node_t node_arr; |
| | | int node_arr_len = 0; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _MsgPublish |
| | | { |
| | | const char *topic; |
| | | const char *data; |
| | | }_input; |
| | | |
| | | ::bhome_msg::MsgPublish input; |
| | | if(!input.ParseFromArray(msgpub, msgpub_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | _input.topic = input.topic().c_str(); |
| | | _input.data = input.data().c_str(); |
| | | #else |
| | | if ((topic == NULL) || (content == NULL)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | if (timeout_ms > 0) { |
| | | rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms); |
| | | } else if (timeout_ms == 0) { |
| | | rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data)); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data)); |
| | | } |
| | | #else |
| | | if (timeout_ms > 0) { |
| | | rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content)); |
| | | |
| | | } else { |
| | | rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content)); |
| | | } |
| | | #endif |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | if (rv > 0) |
| | | return true; |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | int len; |
| | | void *buf; |
| | | int key; |
| | | int size; |
| | | int sec, nsec; |
| | | char topics_buf[MAX_STR_LEN] = { 0x00 }; |
| | | char data_buf[MAX_STR_LEN * 3] = { 0x00 }; |
| | | |
| | | struct _ReadSubReply |
| | | { |
| | | std::string proc_id; |
| | | std::string topic; |
| | | std::string data; |
| | | } rsr; |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | if (timeout_ms > 0) { |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | |
| | | rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); |
| | | } |
| | | |
| | | if (rv == 0) { |
| | | |
| | | len = strlen((char *)buf); |
| | | if (len > size) { |
| | | len = size; |
| | | } |
| | | strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len); |
| | | |
| | | if (len < size) { |
| | | len = strlen(topics_buf) + 1; |
| | | |
| | | strncpy(data_buf, (char *)buf + len, size - len); |
| | | } |
| | | |
| | | free(buf); |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | rsr.topic = topics_buf; |
| | | rsr.data = data_buf; |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | sprintf(topics_buf, "%d", key); |
| | | |
| | | rsr.proc_id = topics_buf; |
| | | *proc_id_len = rsr.proc_id.size(); |
| | | *proc_id = malloc(*proc_id_len); |
| | | memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len); |
| | | |
| | | ::bhome_msg::MsgPublish Mp; |
| | | Mp.set_topic(rsr.topic); |
| | | Mp.set_data(rsr.data.data()); |
| | | *msgpub_len = Mp.ByteSizeLong(); |
| | | *msgpub = malloc(*msgpub_len); |
| | | Mp.SerializePartialToArray(*msgpub, *msgpub_len); |
| | | #else |
| | | void *ptr; |
| | | if (len < size) { |
| | | ptr = malloc(size - len); |
| | | len = size - len; |
| | | memcpy(ptr, data_buf, len); |
| | | } else { |
| | | ptr = malloc(len); |
| | | memcpy(ptr, topics_buf, len); |
| | | } |
| | | *msgpub = ptr; |
| | | *msgpub_len = len; |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | sprintf(topics_buf, "%d", key); |
| | | |
| | | *proc_id_len = strlen(topics_buf); |
| | | *proc_id = malloc(*proc_id_len); |
| | | memcpy(*proc_id, topics_buf, *proc_id_len); |
| | | |
| | | #endif |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len) |
| | | { |
| | | int rv; |
| | | void *buf; |
| | | int size; |
| | | int val; |
| | | int len; |
| | | int min; |
| | | int sec, nsec; |
| | | std::string MsgID; |
| | | int timeout_ms = 3000; |
| | | char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _BHAddress |
| | | { |
| | | unsigned long long mq_id; |
| | | long long abs_addr; |
| | | const char *ip; |
| | | int port; |
| | | }_input0; |
| | | |
| | | struct MsgRequestTopic |
| | | { |
| | | const char *topic; |
| | | const char *data; |
| | | }_input1; |
| | | |
| | | ::bhome_msg::BHAddress input0; |
| | | ::bhome_msg::MsgRequestTopic input1; |
| | | if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | _input0.mq_id = input0.mq_id(); |
| | | _input0.abs_addr = input0.abs_addr(); |
| | | _input0.ip = input0.ip().c_str(); |
| | | _input0.port = input0.port(); |
| | | _input1.topic = input1.topic().c_str(); |
| | | _input1.data = input1.data().c_str(); |
| | | |
| | | #else |
| | | if ((request == NULL) || (request_len == 0)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); |
| | | #else |
| | | strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1)); |
| | | #endif |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | | if (rv == 0) { |
| | | |
| | | val = atoi((char *)buf); |
| | | |
| | | free(buf); |
| | | |
| | | if (val > 0) { |
| | | |
| | | len = strlen(topics_buf); |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); |
| | | strncpy(topics_buf + len + 1, _input1.data, min); |
| | | len += (min + 1); |
| | | #endif |
| | | if (timeout_ms > 0) { |
| | | |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | |
| | | rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); |
| | | } |
| | | } else { |
| | | |
| | | rv = EBUS_RES_UNSUPPORT; |
| | | |
| | | } |
| | | } |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | if((msg_id == NULL) || (msg_id_len == NULL)) { |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | if (rv == 0) { |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | sprintf(topics_buf, "%d", val); |
| | | MsgID = topics_buf; |
| | | |
| | | *msg_id_len = MsgID.size(); |
| | | *msg_id = malloc(*msg_id_len); |
| | | memcpy(*msg_id, MsgID.data(), *msg_id_len); |
| | | |
| | | return true; |
| | | } |
| | | |
| | | return false; |
| | | |
| | | } |
| | | |
| | | int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len, |
| | | void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | void *buf; |
| | | int size; |
| | | int val; |
| | | int min, len; |
| | | net_node_t node; |
| | | int node_size; |
| | | int recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | net_mod_err_t *errarr; |
| | | int errarr_size = 0; |
| | | int sec, nsec; |
| | | char topics_buf[MAX_STR_LEN] = { 0x00 }; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | struct _BHAddress |
| | | { |
| | | unsigned long long mq_id; |
| | | long long abs_addr; |
| | | const char *ip; |
| | | int port; |
| | | }_input0; |
| | | |
| | | struct _MsgRequestTopic |
| | | { |
| | | const char *topic; |
| | | const char *data; |
| | | }_input1; |
| | | |
| | | ::bhome_msg::BHAddress input0; |
| | | ::bhome_msg::MsgRequestTopic input1; |
| | | if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | _input0.mq_id = input0.mq_id(); |
| | | _input0.abs_addr = input0.abs_addr(); |
| | | _input0.ip = input0.ip().c_str(); |
| | | _input0.port = input0.port(); |
| | | _input1.topic = input1.topic().c_str(); |
| | | _input1.data = input1.data().c_str(); |
| | | |
| | | #else |
| | | if ((request == NULL) || (request_len == 0)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); |
| | | #else |
| | | strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1)); |
| | | #endif |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | | if (rv == 0) { |
| | | val = atoi((char *)buf); |
| | | |
| | | free(buf); |
| | | |
| | | if (val > 0) { |
| | | memset(&node, 0x00, sizeof(node)); |
| | | |
| | | len = strlen(topics_buf); |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); |
| | | strncpy(topics_buf + len + 1, _input1.data, min); |
| | | len += (min + 1); |
| | | #endif |
| | | |
| | | node.key = val; |
| | | rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size); |
| | | if (rv > 0) { |
| | | if (recv_arr_size > 0) { |
| | | |
| | | node.key = recv_arr[0].key; |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | size = recv_arr[0].content_length; |
| | | buf = (char *)malloc(size); |
| | | strncpy((char *)buf, (char *)recv_arr[0].content, size); |
| | | #if !defined(PRO_DE_SERIALIZE) |
| | | *reply = buf; |
| | | *reply_len = size; |
| | | #endif |
| | | } |
| | | |
| | | net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | |
| | | if(errarr_size > 0) { |
| | | free(errarr); |
| | | } |
| | | |
| | | rv = 0; |
| | | |
| | | } else { |
| | | rv = EBUS_TIMEOUT; |
| | | } |
| | | |
| | | } else { |
| | | rv = EBUS_RES_UNSUPPORT; |
| | | } |
| | | } |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | struct _RequestReply |
| | | { |
| | | std::string proc_id; |
| | | std::string data; |
| | | }rr; |
| | | |
| | | if (rv == 0) { |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | sprintf(topics_buf, "%d", node.key); |
| | | |
| | | rr.proc_id = topics_buf; |
| | | *proc_id_len = rr.proc_id.size(); |
| | | *proc_id = malloc(*proc_id_len); |
| | | memcpy(*proc_id, rr.proc_id.data(), *proc_id_len); |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | memcpy(topics_buf, buf, size); |
| | | rr.data = topics_buf; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgRequestTopicReply mrt; |
| | | mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mrt.mutable_errmsg()->set_errstring(errString); |
| | | mrt.set_data(rr.data.data()); |
| | | *reply_len = mrt.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mrt.SerializePartialToArray(*reply, *reply_len); |
| | | #endif |
| | | } |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms) |
| | | { |
| | | int rv; |
| | | void *buf; |
| | | int key; |
| | | int size; |
| | | int sec, nsec; |
| | | char topics_buf[MAX_STR_LEN] = { 0x00 }; |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | if (timeout_ms > 0) { |
| | | |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | |
| | | rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); |
| | | } |
| | | |
| | | if (rv == 0) { |
| | | struct _ReadRequestReply |
| | | { |
| | | std::string proc_id; |
| | | std::string topic; |
| | | std::string data; |
| | | void *src; |
| | | } rrr; |
| | | |
| | | sprintf(topics_buf, "%d", key); |
| | | rrr.proc_id = topics_buf; |
| | | |
| | | *proc_id_len = rrr.proc_id.size(); |
| | | *proc_id = malloc(*proc_id_len); |
| | | memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len); |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size); |
| | | rrr.topic = topics_buf; |
| | | rrr.data = topics_buf; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgRequestTopic mrt; |
| | | mrt.set_topic(rrr.topic); |
| | | mrt.set_data(rrr.data.data()); |
| | | *request_len = mrt.ByteSizeLong(); |
| | | *request = malloc(*request_len); |
| | | mrt.SerializePartialToArray(*request,*request_len); |
| | | #else |
| | | *request = buf; |
| | | *request_len = size; |
| | | #endif |
| | | |
| | | buf = malloc(sizeof(int)); |
| | | *(int *)buf = key; |
| | | *src = buf; |
| | | } |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHSendReply(void *src, const void *reply, const int reply_len) |
| | | { |
| | | int rv; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | ::bhome_msg::MsgRequestTopicReply input; |
| | | if (!input.ParseFromArray(reply, reply_len)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | const char *_input; |
| | | _input = input.data().data(); |
| | | |
| | | #else |
| | | if ((src == NULL) || (reply == NULL) || (reply_len == 0)) { |
| | | |
| | | rv = EBUS_INVALID_PARA; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | #endif |
| | | |
| | | if (gRun_stat == 0) { |
| | | logger->error("the process has not been registered yet!\n"); |
| | | |
| | | rv = EBUS_RES_NO; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | |
| | | rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | |
| | | pthread_mutex_unlock(&mutex); |
| | | } else { |
| | | |
| | | rv = EBUS_RES_BUSY; |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | | } |
| | | |
| | | if (rv == 0) |
| | | return true; |
| | | |
| | | return false; |
| | | } |
| | | |
| | | int BHCleanup() { |
| | | |
| | | return true; |
| | | } |
| | | |
| | | void BHFree(void *buf, int size) { |
| | | free(buf); |
| | | } |
| | | |
| | | int BHGetLastError(void **msg, int *msg_len) |
| | | { |
| | | void *buf = NULL; |
| | | |
| | | buf = malloc(strlen(errString) + 1); |
| | | |
| | | memset(buf, 0x00, strlen(errString) + 1); |
| | | memcpy(buf, errString, strlen(errString)); |
| | | |
| | | if ((msg != NULL) && (msg_len != NULL)) { |
| | | *msg = buf; |
| | | *msg_len = strlen(errString); |
| | | |
| | | return true; |
| | | } |
| | | |
| | | return false; |
| | | |
| | | } |
| | |
| | | #ifndef BH_API |
| | | #define BH_API |
| | | #ifndef _BH_API_WRAPPER_ |
| | | #define _BH_API_WRAPPER_ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | #define PRO_DE_SERIALIZE 1 |
| | | |
| | | int BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | |
| | | int BHRegisterTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHQueryTopicAddress(const void *remote, const int remote_len, |
| | | const void *topic, const int topic_len, |
| | | void **reply, int *reply_len, |
| | | int BHQueryTopicAddress(const void *remote, |
| | | const int remote_len, |
| | | const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHQueryProcs(const void *remote, |
| | |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHSubscribeNetTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | | int BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms); |
| | | #else |
| | | int BHPublish(const char *topic, const char *content, const int timeout_ms); |
| | | #endif |
| | | |
| | | int BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | |
| | | |
| | | void BHFree(void *buf, int size); |
| | | |
| | | int BHGetLastError(void **msg, int *msg_len); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | #endif |
| | | #endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */ |
| | | |
| | | |
| | | |
| | |
| | | #define BUS_TIMEOUT_FLAG 1 |
| | | #define BUS_NOWAIT_FLAG 1 << 1 |
| | | |
| | | #define SHM_RES_SIZE 512 |
| | | |
| | | #define SHM_BUS_PROC_MAP_KEY 10 |
| | | #define SHM_BUS_PROC_TCS_MAP_KEY 11 |
| | | #define SHM_BUS_TCS_MAP_KEY 20 |
| | | #define SHM_BUS_PROC_PART_MAP_KEY 30 |
| | | |
| | | #endif |
| | |
| | | "Send to self error", |
| | | "Receive from wrong end", |
| | | "Service stoped", |
| | | "Exceed resource limit" |
| | | "Exceed resource limit", |
| | | "Service not supported", |
| | | "Resource busy", |
| | | "Resource not provide", |
| | | "Invalid parameters" |
| | | |
| | | }; |
| | | |
| | |
| | | char *buf; |
| | | /* Make first caller allocate key for thread-specific data */ |
| | | |
| | | if (err == 0) { |
| | | err = EBUS_BASE; |
| | | } |
| | | |
| | | s = pthread_once(&once, createKey); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_once"); |
| | |
| | | #define EBUS_RECVFROM_WRONG_END 506 |
| | | #define EBUS_STOPED 507 |
| | | #define EBUS_EXCEED_LIMIT 508 |
| | | #define EBUS_RES_UNSUPPORT 509 |
| | | #define EBUS_RES_BUSY 510 |
| | | #define EBUS_RES_NO 511 |
| | | #define EBUS_INVALID_PARA 512 |
| | | |
| | | extern int bus_errno; |
| | | |
| New file |
| | |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "bus_server_socket_wrapper.h" |
| | | |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include <signal.h> |
| | | #include <limits.h> |
| | | #include <stdio.h> |
| | | #include <errno.h> |
| | | #include <getopt.h> |
| | | #include <stdlib.h> |
| | | |
| | | #define SVR_PORT 5000 |
| | | |
| | | #define TOTAL_THREADS 2 |
| | | |
| | | static void *gBusServer_socket = NULL; |
| | | static void *gServer_socket = NULL; |
| | | |
| | | static int gShm_size = -1; |
| | | static int gPort = -1; |
| | | |
| | | static int gBusServer_act = 0; |
| | | static int gBusServer_stat = 0; |
| | | |
| | | pthread_t tids[2]; |
| | | void *res[2]; |
| | | |
| | | void *bus_start(void *skptr) { |
| | | |
| | | gBusServer_act = 1; |
| | | gBusServer_socket = bus_server_socket_wrapper_open(); |
| | | if (bus_server_socket_wrapper_start_bus(gBusServer_socket) != 0) { |
| | | printf("start bus failed\n"); |
| | | gBusServer_stat = -1; |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | void *svr_start(void *skptr) { |
| | | int port = *(int *)skptr; |
| | | |
| | | gServer_socket = net_mod_server_socket_open(port); |
| | | if(net_mod_server_socket_start(gServer_socket) != 0) { |
| | | printf("start net mod server failed\n"); |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) |
| | | { |
| | | char *endptr; |
| | | char i; |
| | | int val; |
| | | |
| | | sigset_t mask_all, pre; |
| | | sigfillset(&mask_all); |
| | | |
| | | sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | if (argc >= 4) { |
| | | fprintf(stderr, "Usage: %s [shm size] [server port]\n", argv[0]); |
| | | exit(0); |
| | | }; |
| | | |
| | | if (argc >= 2) { |
| | | argc -= 2; |
| | | for (i = 0; i <= argc; i++) { |
| | | errno = 0; |
| | | val = strtol(argv[i + 1], &endptr, 10); |
| | | if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) |
| | | || (errno != 0 && val == 0)) { |
| | | fprintf(stderr, "invalid parameter: %s\n", argv[i + 1]); |
| | | exit(0); |
| | | } |
| | | |
| | | if (endptr == argv[i + 1]) { |
| | | fprintf(stderr, "invalid parameter %s: No digits were found\n", argv[i + 1]); |
| | | exit(0); |
| | | } |
| | | |
| | | if (i == 0) { |
| | | gShm_size = val; |
| | | } else { |
| | | gPort = val; |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (gShm_size == -1) { |
| | | gShm_size = SHM_RES_SIZE; |
| | | } |
| | | shm_mm_wrapper_init(SHM_RES_SIZE); |
| | | |
| | | pthread_create(&tids[0], NULL, bus_start, NULL); |
| | | |
| | | if (gPort == -1) { |
| | | gPort = SVR_PORT; |
| | | } |
| | | |
| | | while(gBusServer_act == 0) { |
| | | sleep(1); |
| | | } |
| | | |
| | | if (gBusServer_stat >= 0) { |
| | | pthread_create(&tids[1], NULL, svr_start, (void *)&gPort); |
| | | } |
| | | |
| | | for (i = 0; i< TOTAL_THREADS; i++) { |
| | | if(pthread_join(tids[i], &res[i]) != 0) { |
| | | perror("bus_proxy pthread_join"); |
| | | } |
| | | } |
| | | |
| | | bus_server_socket_wrapper_close(gBusServer_socket); |
| | | net_mod_socket_close(gServer_socket); |
| | | shm_mm_wrapper_destroy(); |
| | | |
| | | return 0; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | if( ret != 0) { |
| | | logger->error("转发失败 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret)); |
| | | logger->error("fail: NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key, bus_strerror(ret)); |
| | | // 转发失败 |
| | | response_head.code = ret; |
| | | response_head.content_length = 0; |
| | |
| | | FD_CLR(connfd, &pool.read_set); |
| | | pool.clientfd[i] = -1; |
| | | logger->debug("===server close client %d\n", connfd); |
| | | // printf("===server close client %d\n", connfd); |
| | | } |
| | | |
| | | } |
| | |
| | | |
| | | |
| | | NetModSocket::~NetModSocket() { |
| | | |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | return shmModSocket.force_bind(key); |
| | | } |
| | | |
| | | int NetModSocket::bind_proc_id(char *buf, int len) { |
| | | return shmModSocket.bind_proc_id(buf, len); |
| | | } |
| | | |
| | | int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) { |
| | | |
| | | return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); |
| | | } |
| | | |
| | | // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | // return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); |
| | |
| | | |
| | | NetConnPool *mpool = (NetConnPool *)_pool; |
| | | delete mpool; |
| | | logger->debug("destory connPool"); |
| | | |
| | | } |
| | | |
| | | /* One-time key creation function */ |
| | |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec); |
| | | } |
| | | |
| | | |
| | | // int pub(char *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, |
| | | int content_size, int msec) { |
| | | int i, connfd; |
| | |
| | | net_mod_err_t err_msg; |
| | | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if ((node_arr == NULL) || (arrlen == 0)) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0) { |
| | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | int NetModSocket::recvandsend(recvandsend_callback_fn callback, |
| | | const struct timespec *timeout , int flag, void * user_data ) { |
| | |
| | | |
| | | head.mod = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(uint32_t); |
| | | memcpy(head.host, tmp_ptr, sizeof(head.host)); |
| | | |
| | | |
| | | tmp_ptr += sizeof(head.host); |
| | | head.port = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(uint32_t); |
| | | head.key = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(uint32_t); |
| | | head.content_length = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(uint32_t); |
| | | head.topic_length = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(uint32_t); |
| | | head.timeout = ntohl(GET_INT32(tmp_ptr)); |
| | | |
| | | return head; |
| | |
| | | #include "usg_common.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "socket_io.h" |
| | | #include "proc_def.h" |
| | | #include <poll.h> |
| | | #include "socket_def.h" |
| | | #include "net_conn_pool.h" |
| | |
| | | int key; |
| | | }; |
| | | |
| | | #define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 6 * sizeof(uint32_t)) |
| | | #define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t) |
| | | |
| | | |
| | | // 请求头 |
| | |
| | | */ |
| | | int force_bind( int key); |
| | | |
| | | |
| | | |
| | | int bind_proc_id(char *buf, int len); |
| | | int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec); |
| | | int recvfrom_nowait( void **buf, int *size, int *key); |
| | | |
| | | /** |
| | | * 本地发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | |
| | | // return sockt->bind(key); |
| | | } |
| | | |
| | | int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) |
| | | { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | |
| | | return sockt->reg(pData, len, buf, size, timeout_ms, flag); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | |
| | | */ |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | // return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_nowait(buf, size, key); |
| | | } |
| | | |
| | |
| | | int rv; |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | |
| | | logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket)); |
| | | rv = sockt->recvfrom(buf, size, key); |
| | | logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv); |
| | | return rv; |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | // return sockt->recvfrom(buf, size, key); |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | } |
| | | |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1); |
| | | } |
| | | |
| | | int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->bind_proc_id(proc_id, len); |
| | | } |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | | * @timeout 等待时间,单位是千分之一秒 |
| | |
| | | #define __NET_MOD_SOCKET_H__ |
| | | |
| | | #include "net_mod_socket.h" |
| | | #include "proc_def.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | |
| | | */ |
| | | int net_mod_socket_force_bind(void * _socket, int key); |
| | | |
| | | int net_mod_socket_reg(void *_socket, void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | |
| | | /** |
| | | * @brief 发送信息,发送完成才返回 |
| | | * |
| New file |
| | |
| | | #ifndef __PROC_DEF_ |
| | | #define __PROC_DEF_ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | #define MAX_STR_LEN 128 //keep the same with serializer in proc check |
| | | #define MIN_STR_LEN 10 |
| | | #define MAX_PROC_NUM 128 |
| | | #define MAX_TOPICS_NUN 60 |
| | | |
| | | #define PROC_REG 1 |
| | | #define PROC_UNREG 2 |
| | | #define PROC_REG_TCS 3 |
| | | #define PROC_QUE_TCS 4 |
| | | #define PROC_QUE_STCS 5 |
| | | #define PROC_QUE_ATCS 6 |
| | | |
| | | #define STR_MAGIC "," |
| | | |
| | | typedef struct _ProcInfo { |
| | | #if 0 |
| | | char ServerID[MAX_STR_LEN]; // 机器ID |
| | | char BoardID[MAX_STR_LEN]; // 板卡ID |
| | | char ServerIP[MAX_STR_LEN]; // 机器IP |
| | | char ProcID[MAX_STR_LEN]; // 进程唯一标识 |
| | | char ProcName[MAX_STR_LEN]; // 进程名称 |
| | | char ProcLabel[MAX_STR_LEN]; // 进程的描述信息,用于区分同一进程名称下多个进程 |
| | | #else |
| | | char proc_id[MAX_STR_LEN]; |
| | | char name[MAX_STR_LEN]; |
| | | char public_info[MAX_STR_LEN]; |
| | | char private_info[MAX_STR_LEN]; |
| | | #endif |
| | | } ProcInfo; |
| | | |
| | | typedef struct _ProcInfo_sum { |
| | | |
| | | ProcInfo procData; |
| | | |
| | | int stat; |
| | | char reg_info[MAX_STR_LEN]; |
| | | char local_info[MAX_STR_LEN]; |
| | | char net_info[MAX_STR_LEN]; |
| | | |
| | | int list_num; |
| | | |
| | | } ProcInfo_sum; |
| | | |
| | | typedef struct _ProcInfo_query { |
| | | |
| | | char name[MAX_STR_LEN]; |
| | | |
| | | int num; |
| | | |
| | | ProcInfo procData; |
| | | |
| | | } ProcInfo_query; |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | |
| | | #endif //end of file |
| | | |
| | | |
| | | |
| | | |
| | |
| | | #include "bus_def.h" |
| | | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 320 |
| | | |
| | | |
| | | #define LOCK_FREE_Q_ST_OPENED 0 |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { |
| | | //std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | if (sem_init(&slots, 1, qsize) == -1) |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | if (sem_init(&items, 1, 0) == -1) |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { |
| | | // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); |
| | | if (sem_destroy(&slots) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | sigset_t mask_all, pre; |
| | | sigfillset(&mask_all); |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | if (psem_trywait(&slots) == -1) { |
| | |
| | | |
| | | if (m_qImpl.push(a_data)) { |
| | | psem_post(&items); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | | LABEL_FAILTURE: |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | sigset_t mask_all, pre; |
| | | sigfillset(&mask_all); |
| | | |
| | | // sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | sigprocmask(SIG_BLOCK, &mask_all, &pre); |
| | | |
| | | if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) { |
| | | if (psem_trywait(&items) == -1) { |
| | |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | LABEL_FAILTURE: |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | return errno; |
| | | } |
| | | |
| | |
| | | bool full(); |
| | | bool empty(); |
| | | |
| | | int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); |
| | | int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0); |
| | | int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | ELEM_T &operator[](unsigned i); |
| | | |
| | |
| | | #include <functional> |
| | | #include <set> |
| | | |
| | | #define MAPSIZE 1024 |
| | | #define MAPSIZE 4096 |
| | | |
| | | // 创建Queue数量的上限 |
| | | #define QUEUE_COUNT_LIMIT 300 |
| | |
| | | first = false; |
| | | shmid = shmget(SHM_KEY, 0, 0); |
| | | } |
| | | |
| | | if (shmid == -1) |
| | | err_exit(errno, "mm_init shmget"); |
| | | shmp = shmat(shmid, key_addr, 0); |
| | |
| | | else |
| | | LoggerFactory::getLogger()->debug("shared memory destroy\n"); |
| | | |
| | | LoggerFactory::getLogger()->debug( "mm_destroy: real destroy."); |
| | | |
| | | SemUtil::inc(mutex); |
| | | SemUtil::remove(mutex); |
| | | return true; |
| | |
| | | |
| | | void mm_free_by_key(int key) { |
| | | void *ptr; |
| | | |
| | | ptr = hashtable_get(hashtable, key); |
| | | if(ptr != NULL) { |
| | | mm_free(ptr); |
| | |
| | | #define SHM_QUEUE_ST_CLOSED 1 |
| | | #define SHM_QUEUE_ST_RECYCLED 2 |
| | | |
| | | #define SHM_QUEUE_ST_SET 50 |
| | | |
| | | struct shm_queue_status_t { |
| | | |
| | | int status; |
| | |
| | | |
| | | int shm_mm_alloc_key(); |
| | | |
| | | typedef std::map<SHMString, int, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, int> > > ProcDataZone; |
| | | |
| | | #endif |
| | |
| | | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "shm_socket.h" |
| | | #include "bus_error.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { |
| | | cb(subscripter_set, *set_iter); |
| | | } |
| | | } |
| | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | |
| | | |
| | | |
| | | BusServerSocket::BusServerSocket() { |
| | | logger->debug("BusServerSocket Init"); |
| | | shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | topic_sub_map = NULL; |
| | | |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int BusServerSocket::start(){ |
| | | int rv; |
| | | |
| | | topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | |
| | | _run_proxy_(); |
| | | return 0; |
| | | rv = _run_proxy_(); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | |
| | |
| | | SHMKeySet *subscripter_set; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | subscripter_set->clear(); |
| | |
| | | shm_mm_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_socket_close(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | | return 0; |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | /* |
| | |
| | | */ |
| | | void BusServerSocket::_proxy_sub( char *topic, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | |
| | | struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | SHMKeySet::iterator set_iter; |
| | | //printf("_proxy_sub topic = %s\n", topic); |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | subscripter_set = map_iter->second; |
| | | } else { |
| | |
| | | topic_sub_map->insert({topic, subscripter_set}); |
| | | } |
| | | subscripter_set->insert(key); |
| | | |
| | | } |
| | | |
| | | /* |
| | |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | | // SHMKeySet::iterator set_iter; |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) { |
| | | subscripter_set = map_iter->second; |
| | | subscripter_set->erase(key); |
| | | } |
| | |
| | | /* |
| | | * 处理发布,代理转发 |
| | | */ |
| | | void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) { |
| | | void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) { |
| | | SHMKeySet *subscripter_set; |
| | | |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { |
| | | |
| | | subscripter_set = map_iter->second; |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) { |
| | | send_key = *set_iter; |
| | | rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); |
| | | if(rv == 0) { |
| | |
| | | } |
| | | |
| | | // 删除已关闭的端 |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { |
| | | for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) { |
| | | if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | logger->debug("remove closed subscripter %d \n", send_key); |
| | |
| | | subscripter_to_del.clear(); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | ProcInfo_query *Qurey_object(const char *object, int *length) { |
| | | int flag = 0; |
| | | int val; |
| | | int len; |
| | | int total = 0; |
| | | ProcInfo *Proc_ptr = NULL; |
| | | ProcInfo Data_stru; |
| | | ProcInfo_query *dataBuf = NULL; |
| | | SvrProc *SvrSub_ele; |
| | | SvrTcs::iterator svr_tcs_iter; |
| | | SvrProc::iterator svr_proc_iter; |
| | | ProcZone::iterator proc_iter; |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { |
| | | val = *svr_proc_iter; |
| | | |
| | | if ((proc_iter = proc->find(val)) != proc->end()) { |
| | | |
| | | if (dataBuf == NULL) { |
| | | dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query)); |
| | | if (dataBuf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | total = sizeof(ProcInfo_query); |
| | | } |
| | | |
| | | if (flag == 0) { |
| | | memset(dataBuf, 0x00, sizeof(ProcInfo_query)); |
| | | |
| | | dataBuf->num = 1; |
| | | strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1); |
| | | |
| | | flag = 1; |
| | | |
| | | } else { |
| | | dataBuf->num++; |
| | | len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1); |
| | | dataBuf = (ProcInfo_query *)realloc(dataBuf, len); |
| | | if (dataBuf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | total += sizeof(ProcInfo); |
| | | memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo)); |
| | | } |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(ProcInfo)); |
| | | Data_stru = proc_iter->second; |
| | | |
| | | Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1; |
| | | strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1); |
| | | strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1); |
| | | strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1); |
| | | strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1); |
| | | |
| | | if (length != NULL) |
| | | *length = total; |
| | | } |
| | | } |
| | | } |
| | | |
| | | return dataBuf; |
| | | } |
| | | |
| | | void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) |
| | | { |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | int count = 0; |
| | | int i = 0; |
| | | int len = 0; |
| | | char *data_ptr; |
| | | ProcInfo Data_stru; |
| | | ProcZone::iterator proc_iter; |
| | | TcsZone *TcsSub_ele; |
| | | ProcDataZone::iterator proc_que_iter; |
| | | ProcTcsMap::iterator proc_tcs_iter; |
| | | SvrProc *SvrSub_ele; |
| | | SvrProc::iterator svr_proc_iter; |
| | | SvrTcs::iterator svr_tcs_iter; |
| | | TcsZone::iterator tcssub_iter; |
| | | ProcPartZone::iterator proc_part_iter; |
| | | |
| | | struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0}; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(ProcInfo)); |
| | | |
| | | if (buf != NULL) { |
| | | |
| | | memcpy(Data_stru.proc_id, buf, strlen(buf) + 1); |
| | | count = strlen(buf) + 1; |
| | | |
| | | memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | } |
| | | |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); |
| | | ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY); |
| | | if (flag == PROC_REG) { |
| | | if ((proc_iter = proc->find(key)) == proc->end()) { |
| | | proc->insert({key, Data_stru}); |
| | | } |
| | | |
| | | if ((proc_part_iter = procPart->find(key)) == procPart->end()) { |
| | | procPart->insert({key, Data_stru.proc_id}); |
| | | } |
| | | |
| | | if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) { |
| | | procQuePart->insert({Data_stru.proc_id, key}); |
| | | } |
| | | |
| | | } else { |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | |
| | | for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | |
| | | SvrSub_ele->erase(key); |
| | | } |
| | | |
| | | if ((proc_iter = proc->find(key)) != proc->end()) { |
| | | |
| | | len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); |
| | | strncpy(buf_temp, (proc_iter->second).proc_id, len); |
| | | proc->erase(proc_iter); |
| | | |
| | | } |
| | | |
| | | if ((proc_part_iter = procPart->find(key)) != procPart->end()) { |
| | | |
| | | procPart->erase(key); |
| | | } |
| | | |
| | | if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { |
| | | |
| | | procQuePart->erase(buf_temp); |
| | | } |
| | | |
| | | } |
| | | } else if (flag == PROC_REG_TCS) { |
| | | ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | |
| | | if ((proc_tcs_iter = proc->find(key)) != proc->end()) { |
| | | TcsSub_ele = proc_tcs_iter->second; |
| | | } else { |
| | | |
| | | void *ptr_set = mm_malloc(sizeof(TcsZone)); |
| | | TcsSub_ele = new(ptr_set) TcsZone; |
| | | proc->insert({key, TcsSub_ele}); |
| | | } |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); |
| | | while(data_ptr) { |
| | | TcsSub_ele->insert(data_ptr); |
| | | if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | } else { |
| | | |
| | | void *ptr_set = mm_malloc(sizeof(SvrProc)); |
| | | SvrSub_ele = new(ptr_set) SvrProc; |
| | | SvrData->insert({data_ptr, SvrSub_ele}); |
| | | } |
| | | SvrSub_ele->insert(key); |
| | | data_ptr = strtok(NULL, STR_MAGIC); |
| | | } |
| | | |
| | | } else if (flag == PROC_QUE_TCS) { |
| | | |
| | | struct _temp_store { |
| | | void *ptr; |
| | | int total; |
| | | } *temp_store = NULL; |
| | | |
| | | int num = 0; |
| | | int sum = 0; |
| | | |
| | | ProcInfo_query *ret = NULL; |
| | | ProcInfo_query *ret_store = NULL; |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC); |
| | | while(data_ptr) { |
| | | ret = Qurey_object(data_ptr, &len); |
| | | if (ret != NULL) { |
| | | |
| | | if (temp_store == NULL) { |
| | | temp_store = (_temp_store *)malloc(sizeof(_temp_store)); |
| | | if (temp_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | temp_store->ptr = ret; |
| | | temp_store->total = len; |
| | | num = 1; |
| | | |
| | | } else { |
| | | num++; |
| | | temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num); |
| | | if (temp_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | (temp_store + num - 1)->ptr = ret; |
| | | (temp_store + num - 1)->total = len; |
| | | } |
| | | |
| | | } |
| | | data_ptr = strtok(NULL, STR_MAGIC); |
| | | } |
| | | |
| | | if (num > 0) { |
| | | for (count = 0; count < num; count++) { |
| | | |
| | | if (ret_store == NULL) { |
| | | ret_store = (ProcInfo_query *)malloc((temp_store + count)->total); |
| | | if (ret_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | sum = (temp_store + count)->total; |
| | | memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total); |
| | | |
| | | } else { |
| | | |
| | | ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total); |
| | | if (ret_store == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total); |
| | | |
| | | sum += (temp_store + count)->total; |
| | | |
| | | } |
| | | |
| | | free((temp_store + count)->ptr); |
| | | |
| | | } |
| | | |
| | | free(temp_store); |
| | | } |
| | | |
| | | void *last_buf = malloc(sum + sizeof(int)); |
| | | if (last_buf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | *(int *)last_buf = num; |
| | | if (num > 0) { |
| | | memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum); |
| | | free(ret_store); |
| | | } |
| | | |
| | | shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | free(last_buf); |
| | | } else if (flag == PROC_QUE_STCS) { |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { |
| | | SvrSub_ele = svr_tcs_iter->second; |
| | | |
| | | for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { |
| | | count = *svr_proc_iter; |
| | | |
| | | break; |
| | | } |
| | | } else { |
| | | count = 0; |
| | | } |
| | | |
| | | memset(buf_temp, 0x00, sizeof(buf_temp)); |
| | | sprintf(buf_temp, "%d", count); |
| | | shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } else { |
| | | |
| | | int val; |
| | | int temp = 0; |
| | | int pos = 0; |
| | | int size = 0; |
| | | ProcInfo_sum *Data_sum = NULL; |
| | | SHMKeySet *subs_proc; |
| | | SHMKeySet::iterator subs_proc_iter; |
| | | SHMTopicSubMap::iterator subs_iter; |
| | | |
| | | ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) { |
| | | |
| | | memset(&Data_stru, 0x00, sizeof(Data_stru)); |
| | | |
| | | if (count == 0) { |
| | | Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum)); |
| | | if (Data_sum == NULL) { |
| | | |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | |
| | | exit(1); |
| | | } |
| | | |
| | | count++; |
| | | |
| | | memset(Data_sum, 0x00, sizeof(ProcInfo_sum)); |
| | | |
| | | } else { |
| | | |
| | | count++; |
| | | len = sizeof(ProcInfo_sum) * count; |
| | | Data_sum = (ProcInfo_sum *)realloc(Data_sum, len); |
| | | if (Data_sum == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | |
| | | exit(1); |
| | | } |
| | | |
| | | memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum)); |
| | | } |
| | | |
| | | Data_stru = proc_iter->second; |
| | | |
| | | memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id)); |
| | | memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name)); |
| | | memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info)); |
| | | memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info)); |
| | | |
| | | (Data_sum + count - 1)->stat = 1; |
| | | (Data_sum + count - 1)->list_num = 3; |
| | | |
| | | val = proc_iter->first; |
| | | if ((proc_tcs_iter = procData->find(val)) != procData->end()) { |
| | | TcsSub_ele = proc_tcs_iter->second; |
| | | |
| | | temp = 0; |
| | | pos = 0; |
| | | len = sizeof(Data_sum->reg_info) - 1; |
| | | for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) { |
| | | |
| | | if (temp == 0) { |
| | | strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str())); |
| | | pos += strlen((Data_sum + count - 1)->reg_info); |
| | | len -= strlen((Data_sum + count - 1)->reg_info); |
| | | |
| | | temp++; |
| | | } else { |
| | | |
| | | if (len > 0) { |
| | | strcat((Data_sum + count - 1)->reg_info, ","); |
| | | |
| | | pos += 1; |
| | | len -= 1; |
| | | } |
| | | |
| | | if (len > 0) { |
| | | size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()); |
| | | strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size); |
| | | |
| | | pos += size; |
| | | len -= size; |
| | | } |
| | | } |
| | | } |
| | | |
| | | pos = 0; |
| | | temp = 0; |
| | | len = sizeof(Data_sum->local_info) - 1; |
| | | for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) { |
| | | subs_proc = subs_iter->second; |
| | | |
| | | if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) { |
| | | |
| | | if ((temp == 0)) { |
| | | |
| | | strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str())); |
| | | pos += strlen((Data_sum + count - 1)->local_info); |
| | | len -= strlen((Data_sum + count - 1)->local_info); |
| | | |
| | | temp++; |
| | | } else { |
| | | |
| | | if (len > 0) { |
| | | strcat((Data_sum + count - 1)->local_info, ","); |
| | | |
| | | pos += 1; |
| | | len -= 1; |
| | | } |
| | | |
| | | if (len > 0) { |
| | | size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()); |
| | | strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size); |
| | | |
| | | pos += size; |
| | | len -= size; |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | temp = count * sizeof(ProcInfo_sum); |
| | | void *last_buf = malloc(temp + sizeof(int)); |
| | | if (last_buf == NULL) { |
| | | logger->error("in proxy_reg: Out of memory!\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | *(int *)last_buf = count; |
| | | if (count > 0) { |
| | | memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp); |
| | | free(Data_sum); |
| | | } |
| | | |
| | | shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | } |
| | | |
| | | // 运行代理 |
| | | void * BusServerSocket::_run_proxy_() { |
| | | int BusServerSocket::_run_proxy_() { |
| | | int size; |
| | | int key; |
| | | int flag; |
| | | char * action, *topic, *topics, *buf, *content; |
| | | size_t head_len; |
| | | char resp_buf[128]; |
| | |
| | | int rv; |
| | | char send_buf[512] = { 0x00 }; |
| | | |
| | | const char *topic_delim = ","; |
| | | |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | const char *topic_delim = ","; |
| | | while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | | action = head.action; |
| | | |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | | } |
| | | else if(strcmp(action, "desub") == 0) { |
| | | |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | _proxy_desub_all(key); |
| | |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | |
| | | |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | content = topics + head.topic_size; |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | topics[head.topic_size - 1] = '\0'; |
| | | content = topics + head.topic_size; |
| | | |
| | | } |
| | | _proxy_pub(topics, topics, head.topic_size + head.content_size, key); |
| | | |
| | | } |
| | | else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \ |
| | | || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \ |
| | | || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) { |
| | | content = topics + head.topic_size; |
| | | if (strcmp(action, "reg") == 0) { |
| | | |
| | | flag = PROC_REG; |
| | | |
| | | } else if (strcmp(action, "unreg") == 0) { |
| | | |
| | | flag = PROC_UNREG; |
| | | |
| | | } else if (strcmp(action, "tcsreg") == 0) { |
| | | |
| | | flag = PROC_REG_TCS; |
| | | |
| | | } else if (strcmp(action, "tcsque") == 0) { |
| | | |
| | | flag = PROC_QUE_TCS; |
| | | |
| | | } else if (strcmp(action, "stcsque") == 0) { |
| | | |
| | | flag = PROC_QUE_STCS; |
| | | |
| | | } else { |
| | | |
| | | flag = PROC_QUE_ATCS; |
| | | |
| | | } |
| | | |
| | | _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); |
| | | |
| | | } |
| | | else if (strncmp(buf, "request", strlen("request")) == 0) { |
| | | sprintf(send_buf, "%4d", key); |
| | | strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); |
| | |
| | | } |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | free(buf); |
| | | break; |
| | | } else { |
| | |
| | | } |
| | | |
| | | |
| | | return NULL; |
| | | return rv; |
| | | } |
| | |
| | | typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; |
| | | typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; |
| | | |
| | | |
| | | class BusServerSocket { |
| | | private: |
| | | shm_socket_t *shm_socket; |
| | |
| | | private: |
| | | int destroy(); |
| | | void _proxy_sub( char *topic, int key); |
| | | void _proxy_pub( char *topic, void *buf, size_t size, int key); |
| | | void *_run_proxy_(); |
| | | void _proxy_pub( char *topic, char *buf, size_t size, int key); |
| | | int _run_proxy_(); |
| | | // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); |
| | | |
| | | void _proxy_desub( char *topic, int key); |
| | | void _proxy_desub_all(int key); |
| | | |
| | | static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); |
| | | void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag); |
| | | |
| | | static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); |
| | | // static bool include_in_keys(int key, int keys[], size_t length); |
| | | |
| | | public: |
| | |
| | | * 创建 |
| | | */ |
| | | void * bus_server_socket_wrapper_open() { |
| | | logger->debug("===bus_server_socket_wrapper_open\n"); |
| | | BusServerSocket *sockt = new BusServerSocket; |
| | | return (void *)sockt; |
| | | } |
| | |
| | | |
| | | BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | delete sockt; |
| | | logger->debug("===bus_server_socket_wrapper_close\n"); |
| | | } |
| | | |
| | | int bus_server_socket_wrapper_stop(void *_socket) { |
| | |
| | | int ret; |
| | | BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | |
| | | if( (ret = sockt->force_bind(SHM_BUS_KEY)) == 0) { |
| | | if( (ret = sockt->bind(SHM_BUS_KEY)) == 0) { |
| | | return sockt->start(); |
| | | } else { |
| | | logger->error("start bus failed"); |
| | |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | |
| | | int ShmModSocket::bind_proc_id(char *buf, int len) { |
| | | return shm_socket_bind_proc_id(shm_socket, buf, len); |
| | | } |
| | | |
| | | int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) |
| | | { |
| | | int ret; |
| | | struct timespec ts; |
| | | |
| | | bus_head_t head = {}; |
| | | |
| | | if (flag == PROC_REG) { |
| | | |
| | | memcpy(head.action, "reg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_UNREG) { |
| | | |
| | | memcpy(head.action, "unreg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_REG_TCS) { |
| | | |
| | | memcpy(head.action, "tcsreg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_TCS) { |
| | | |
| | | memcpy(head.action, "tcsque", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_STCS) { |
| | | |
| | | memcpy(head.action, "stcsque", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_ATCS) { |
| | | |
| | | memcpy(head.action, "atcsque", sizeof(head.action)); |
| | | |
| | | } else { |
| | | |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | head.topic_size = 0; |
| | | |
| | | if (pData != NULL) { |
| | | |
| | | head.content_size = sizeof(ProcInfo); |
| | | |
| | | } else { |
| | | |
| | | head.content_size = 0; |
| | | |
| | | } |
| | | } else { |
| | | |
| | | head.topic_size = len; |
| | | |
| | | head.content_size = 0; |
| | | |
| | | } |
| | | |
| | | void *buf_temp; |
| | | int buf_size; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp); |
| | | |
| | | } else { |
| | | |
| | | buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp); |
| | | |
| | | } |
| | | |
| | | if (timeout_ms > 0) { |
| | | |
| | | ts.tv_sec = timeout_ms /1000; |
| | | |
| | | ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG); |
| | | |
| | | } |
| | | |
| | | } else { |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | free(buf_temp); |
| | | |
| | | return ret; |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); |
| | | |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, |
| | | int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, |
| | | void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ |
| | | int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); |
| | | |
| | |
| | | memcpy(head.action, "pub", sizeof(head.action)); |
| | | head.topic_size = topic_size = strlen(topic) + 1; |
| | | head.content_size = content_size; |
| | | |
| | | void *buf; |
| | | |
| | | void *buf; |
| | | int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); |
| | | if(size > 0) { |
| | | ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | |
| | | char *buf; |
| | | int max_buf_size; |
| | | void *buf_ptr; |
| | | int count = 0; |
| | | if((buf = (char *) malloc(MAXBUF)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | | exit(1); |
| | |
| | | max_buf_size = MAXBUF; |
| | | } |
| | | |
| | | buf_size = BUS_HEAD_SIZE + content_size + topic_size ; |
| | | buf_size = BUS_HEAD_SIZE + content_size + topic_size; |
| | | if(max_buf_size < buf_size) { |
| | | |
| | | if((buf = (char *) realloc(buf, buf_size)) == NULL) { |
| | |
| | | memcpy(buf, buf_ptr, BUS_HEAD_SIZE); |
| | | if(topic_size != 0 ) |
| | | memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); |
| | | if(content_size != 0) |
| | | memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | | if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \ |
| | | (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) { |
| | | memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | | } else { |
| | | if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \ |
| | | strlen("unreg")) == 0)) && (content_buf != NULL)) { |
| | | proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count); |
| | | |
| | | request_head.content_size = count; |
| | | buf_size -= (content_size - count); |
| | | |
| | | } |
| | | } |
| | | |
| | | *retbuf = buf; |
| | | free(buf_ptr); |
| | |
| | | #include "shm_allocator.h" |
| | | #include "shm_mm.h" |
| | | #include "hashtable.h" |
| | | #include "proc_def.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | #include "key_def.h" |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int force_bind(int key); |
| | | |
| | | int bind_proc_id(char *buf, int len); |
| | | int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | |
| | | int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | |
| | | */ |
| | | int get_key() ; |
| | | |
| | | int get_procid(char *buf, int len); |
| | | |
| | | |
| | | }; |
| | | |
| | | |
| | | typedef std::map<int, ProcInfo, std::less<int>, SHM_STL_Allocator<std::pair<int, ProcInfo> > > ProcZone; |
| | | typedef std::set<SHMString, std::less<SHMString>, SHM_STL_Allocator<SHMString> > TcsZone; |
| | | typedef std::map<int, TcsZone *, std::less<int>, SHM_STL_Allocator<std::pair<const int, TcsZone *> > > ProcTcsMap; |
| | | |
| | | #endif |
| | |
| | | #include "shm_socket.h" |
| | | #include "socket_def.h" |
| | | #include "hashtable.h" |
| | | #include "logger_factory.h" |
| | | #include <map> |
| | |
| | | |
| | | int rv, i; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | logger->debug("shm_socket_close\n"); |
| | | |
| | | |
| | | // if(sockt->key != 0) { |
| | | // auto it = shmQueueStMap->find(sockt->key); |
| | |
| | | // it->second.closeTime = time(NULL); |
| | | // } |
| | | // } |
| | | |
| | | |
| | | |
| | | if(sockt->queue != NULL) { |
| | | sockt->queue->close(); |
| | |
| | | return 0; |
| | | } |
| | | |
| | | int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) { |
| | | strncpy(sockt->proc_id, buf, len > MAX_STR_LEN ? MAX_STR_LEN : len); |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | int shm_socket_get_key(shm_socket_t *sockt){ |
| | | return sockt->key; |
| | | } |
| | | |
| | | int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) { |
| | | strncpy(buf, sockt->proc_id, len); |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | // 短连接方式发送 |
| | |
| | | { |
| | | int rv; |
| | | |
| | | logger->debug("%lu destroy threadlocal socket\n", pthread_self()); |
| | | |
| | | if(tmp_socket == NULL) |
| | | return; |
| | | |
| | |
| | | |
| | | |
| | | int rv = 0, tryn = 16; |
| | | static int Counter_suc = 0; |
| | | static int Counter_fail = 0; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<int, shm_packet_t>::iterator recvbufIter; |
| | |
| | | { |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | |
| | | rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | | |
| | | } |
| | | |
| | | rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | | |
| | | sendpak.key = tmp_socket->key; |
| | |
| | | } else { |
| | | // 答非所问,放到缓存里 |
| | | tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); |
| | | exit(0); |
| | | continue; |
| | | } |
| | | } |
| | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | |
| | | LABLE_SUC: |
| | | sockt->key = tmp_socket->key; |
| | | if(recv_buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | |
| | | logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | |
| | | continue; |
| | | } |
| | |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) { |
| | | if(rv == ETIMEDOUT) { |
| | |
| | | *_recvpak = recvpak; |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | void proc_copy(char *dst, void *src, int *counter) { |
| | | int count = 0; |
| | | ProcInfo *ptr = static_cast<ProcInfo *>(src); |
| | | |
| | | memcpy(dst, ptr->proc_id, strlen(ptr->proc_id) + 1); |
| | | count = strlen(ptr->proc_id) + 1; |
| | | memcpy(dst + count, ptr->name, strlen(ptr->name) + 1); |
| | | count += strlen(ptr->name) + 1; |
| | | memcpy(dst + count, ptr->public_info, strlen(ptr->public_info) + 1); |
| | | count += strlen(ptr->public_info) + 1; |
| | | memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1); |
| | | count += strlen(ptr->private_info) + 1; |
| | | |
| | | *counter = count; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | #include "proc_def.h" |
| | | #include "lock_free_queue.h" |
| | | #include <functional> |
| | | |
| | |
| | | #define BUS_ACTION_STOP 1 |
| | | |
| | | typedef struct shm_packet_t { |
| | | int key; |
| | | int key; |
| | | |
| | | size_t size; |
| | | void * buf; |
| | | char uuid[64]; |
| | |
| | | |
| | | typedef struct shm_socket_t { |
| | | shm_socket_type_t socket_type; |
| | | // 本地key |
| | | int key; |
| | | char proc_id[MAX_STR_LEN]; |
| | | bool force_bind; |
| | | pthread_mutex_t mutex; |
| | | |
| | |
| | | int shm_socket_bind(shm_socket_t * socket, int key) ; |
| | | |
| | | int shm_socket_force_bind(shm_socket_t * socket, int key) ; |
| | | |
| | | |
| | | int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len); |
| | | /** |
| | | * @flags : BUS_NOWAIT_FLAG |
| | | */ |
| | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | const struct timespec * timeout = NULL, int flags = 0); |
| | | |
| | | typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SvrProc; |
| | | typedef std::map<SHMString, SvrProc *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SvrProc *> > > SvrTcs; |
| | | typedef std::map<int, SHMString, std::less<int>, SHM_STL_Allocator<std::pair<int, const SHMString> > > ProcPartZone; |
| | | /** |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) |
| | | * @recvbuf 收到的数据 |
| | |
| | | const struct timespec *timeout = NULL, int flag = 0, void * user_data = NULL); |
| | | |
| | | |
| | | |
| | | |
| | | void proc_copy(char *dst, void *src, int *count); |
| | | |
| | | #endif |
| | |
| | | union semun arg; |
| | | struct sembuf sop; |
| | | |
| | | |
| | | arg.val = 0; /* So initialize it to 0 */ |
| | | if (semctl(semid, 0, SETVAL, arg) == -1) |
| | | err_exit(errno, "semctl 1"); |
| | |
| | | ${EXTRA_INCLUDES} |
| | | ) |
| | | |
| | | |
| | | add_executable(bus_test_inter bus_test_inter.cpp) |
| | | target_link_libraries(bus_test_inter PRIVATE shm_queue ${EXTRA_LIBS} ) |
| | | target_include_directories(bus_test_inter PRIVATE |
| | | "${PROJECT_BINARY_DIR}" |
| | | ${EXTRA_INCLUDES} |
| | | ) |
| | | |
| | | add_executable(bus_test_server_mode bus_test_server_mode.cpp) |
| | | target_link_libraries(bus_test_server_mode PRIVATE shm_queue ${EXTRA_LIBS} ) |
| | | target_include_directories(bus_test_server_mode PRIVATE |
| | | "${PROJECT_BINARY_DIR}" |
| | | ${EXTRA_INCLUDES} |
| | | ) |
| | | |
| | | add_custom_command( |
| | | OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh |
| New file |
| | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include "mm.h" |
| | | #include "logger_factory.h" |
| | | #include <sys/time.h> |
| | | #include <sys/types.h> |
| | | #include <sys/wait.h> |
| | | #include <unistd.h> |
| | | #include "bus_error.h" |
| | | |
| | | #include "bh_api.h" |
| | | #include "proc_def.h" |
| | | |
| | | #define TOTAL_REG_UNREG 2 |
| | | |
| | | #define MAGIC_STR "INTER_" |
| | | #define STR_LEN 30 |
| | | |
| | | pthread_t tids; |
| | | void *res; |
| | | |
| | | static ProcInfo proc_desc; |
| | | |
| | | char *genStr(int length, char *buf) |
| | | { |
| | | int flag, i; |
| | | char *str; |
| | | |
| | | if (length < (strlen(buf) + 10)) { |
| | | length = strlen(buf) + 10; |
| | | } |
| | | |
| | | srand((unsigned) time(NULL)); |
| | | if ((str = (char *)malloc(length + 1)) == NULL) { |
| | | printf("out of memory!\n"); |
| | | |
| | | exit(0); |
| | | } |
| | | |
| | | memset(str, 0x00, length + 1); |
| | | memcpy(str, MAGIC_STR, strlen(MAGIC_STR)); |
| | | strcat(str, buf); |
| | | for (i = strlen(str); i < length; i++) { |
| | | flag = rand() % 3; |
| | | switch (flag) { |
| | | case 0: |
| | | str[i] = 'A' + rand() % 26; |
| | | break; |
| | | |
| | | case 1: |
| | | str[i] = 'a' + rand() % 26; |
| | | break; |
| | | |
| | | default: |
| | | str[i] = '0' + rand() % 10; |
| | | break; |
| | | } |
| | | } |
| | | |
| | | str[length] = '\0'; |
| | | |
| | | return str; |
| | | |
| | | } |
| | | |
| | | void *client_recv(void *skptr) { |
| | | |
| | | pthread_detach(pthread_self()); |
| | | |
| | | void *recvbuf = NULL; |
| | | int recv_len; |
| | | void *proc_id = NULL; |
| | | int id_len; |
| | | int rv; |
| | | void *errBuf = NULL; |
| | | int len; |
| | | char proc_data[200] = { 0x00 }; |
| | | char topics[200] = { 0x00 }; |
| | | |
| | | struct timespec timeout = {2, 0}; |
| | | |
| | | while (true) { |
| | | |
| | | rv = BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1); |
| | | if(rv == true) { |
| | | |
| | | memset(topics, 0x00, sizeof(topics)); |
| | | memset(proc_data, 0x00, sizeof(proc_data)); |
| | | |
| | | memcpy(proc_data, proc_id, (sizeof(proc_data) - 1) > id_len ? id_len : (sizeof(proc_data) - 1)); |
| | | |
| | | memcpy(topics, recvbuf, (sizeof(topics) - 1) > recv_len ? recv_len : (sizeof(topics) - 1)); |
| | | |
| | | printf("Get the sub topics data(%s) from proc id(%s)\n", (char *)topics, (char *)proc_id); |
| | | |
| | | BHFree(recvbuf, len); |
| | | BHFree(proc_id, id_len); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &len); |
| | | printf("the thread recv fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, len); |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | void parseQueryTopicsBuf(void *buf, int len) |
| | | { |
| | | if (buf == NULL) |
| | | return; |
| | | |
| | | int total_topics = *(int *)buf; |
| | | int i, j; |
| | | int buf_pos = 0; |
| | | void *ptr_temp = NULL; |
| | | ProcInfo_query *ptr = NULL; |
| | | ProcInfo *Proc_ptr = NULL; |
| | | |
| | | buf_pos = sizeof(ProcInfo_query); |
| | | ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); |
| | | ptr_temp = (void *)ptr; |
| | | for (i = 0; i < total_topics; i++) { |
| | | printf("topic %s:\n", ptr->name); |
| | | for (j = 0; j < ptr->num; j++) { |
| | | printf("the %dst process info:\n", j + 1); |
| | | Proc_ptr = &(ptr->procData) + j; |
| | | printf("proc_id: %s\n", Proc_ptr->proc_id); |
| | | printf("name: %s\n", Proc_ptr->name); |
| | | printf("public_info: %s\n", Proc_ptr->public_info); |
| | | printf("private_info: %s\n", Proc_ptr->private_info); |
| | | |
| | | } |
| | | |
| | | if (ptr->num > 1) { |
| | | buf_pos += sizeof(ProcInfo) * (ptr->num - 1); |
| | | } |
| | | |
| | | ptr = (ProcInfo_query *)((char *)ptr_temp + buf_pos); |
| | | } |
| | | |
| | | } |
| | | |
| | | void parseQueryProcBuf(void *buf, int len) |
| | | { |
| | | if (buf == NULL) |
| | | return; |
| | | |
| | | int total = *(int *)buf; |
| | | int i; |
| | | ProcInfo_sum *Proc_ptr = NULL; |
| | | |
| | | Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); |
| | | for (i = 0; i < total; i++) { |
| | | printf("proc_id: %s\n", (Proc_ptr + i)->procData.proc_id); |
| | | printf("name: %s\n", (Proc_ptr + i)->procData.name); |
| | | printf("public_info: %s\n", (Proc_ptr + i)->procData.public_info); |
| | | printf("private_info: %s\n", (Proc_ptr + i)->procData.private_info); |
| | | |
| | | printf("service: %s\n", (Proc_ptr + i)->reg_info); |
| | | printf("sub: %s\n", (Proc_ptr + i)->local_info); |
| | | } |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | int ret; |
| | | char *ptr = NULL; |
| | | char buf[] = "Process"; |
| | | void *buf_temp = NULL; |
| | | void *errBuf = NULL; |
| | | void *proc_id = NULL; |
| | | int size; |
| | | int id_len; |
| | | int i; |
| | | char data_buf[200] = { 0x00 }; |
| | | const int timeout_ms = 3000; |
| | | |
| | | memset(&proc_desc, 0x00, sizeof(ProcInfo)); |
| | | ptr = genStr(STR_LEN, buf); |
| | | strncpy(proc_desc.proc_id, ptr, strlen(ptr) + 1); |
| | | //strncpy(proc_desc.proc_id, "Hello", strlen("Hello") + 1); |
| | | free(ptr); |
| | | |
| | | sleep(2); //make rand change |
| | | ptr = genStr(STR_LEN, buf); |
| | | strncpy(proc_desc.name, ptr, strlen(ptr) + 1); |
| | | //strncpy(proc_desc.name, "World", strlen("World") + 1); |
| | | free(ptr); |
| | | |
| | | sleep(2); |
| | | ptr = genStr(STR_LEN, buf); |
| | | //strncpy(proc_desc.public_info, ptr, strlen(ptr) + 1); |
| | | strncpy(proc_desc.public_info, "Good", strlen("Good") + 1); |
| | | free(ptr); |
| | | |
| | | sleep(2); |
| | | ptr = genStr(STR_LEN, buf); |
| | | //strncpy(proc_desc.private_info, ptr, strlen(ptr) + 1); |
| | | //strncpy(proc_desc.private_info, "Bye", strlen("Bye") + 1); |
| | | free(ptr); |
| | | |
| | | printf("before the registered, process info:\n"); |
| | | printf("proc_id: %s\n", proc_desc.proc_id); |
| | | printf("name: %s\n", proc_desc.name); |
| | | printf("public_info: %s\n", proc_desc.public_info); |
| | | printf("private_info: %s\n", proc_desc.private_info); |
| | | |
| | | for (i = 0; i < TOTAL_REG_UNREG; i++) { |
| | | ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered fail with error: %s\n", (char *)errBuf); |
| | | BHFree(errBuf, size); |
| | | |
| | | printf("the second way to get the error log: %s\n", buf_temp); |
| | | |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process unregistered OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process unregistered fail with error: %s\n", (char *)errBuf); |
| | | BHFree(errBuf, size); |
| | | |
| | | printf("the second way to get the error log: %s\n", buf_temp); |
| | | |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | } |
| | | |
| | | //const char *topics_reg_buf1[] = {"topics demo1"}; |
| | | const char *topics_reg_buf1 = "topics demo1"; |
| | | const char *topics_query_buf1 = "topics demo1"; |
| | | const char *topics_query_buf2 = "Hello World,So,Great,Good"; //No space between each other |
| | | //const char *topics_reg_buf2[] = {"Hello World", "So", "Great", "Good"}; |
| | | const char *topics_reg_buf2 = "Hello World,So,Great,Good"; |
| | | |
| | | //const char *topics_sub_buf1[] = {"news"}; |
| | | //const char *topics_sub_buf2[] = {"sports", "balls", "topics demo1"}; |
| | | const char *topics_sub_buf1 = "news"; |
| | | const char *topics_sub_buf2 = "sports,balls,topics demo1"; |
| | | |
| | | const char *topics_pub_topic1 = "news"; |
| | | const char *topics_pub_topic1_data = "boob news"; |
| | | |
| | | const char *topics_pub_topic2 = "balls"; |
| | | const char *topics_pub_topic2_data = "Great volleyballs and basketballs"; |
| | | |
| | | ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered fail with error: %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered2 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process query topics OKay\n"); |
| | | |
| | | parseQueryTopicsBuf(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process query3 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process query topics OKay\n"); |
| | | |
| | | parseQueryTopicsBuf(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process query4 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | pthread_create(&tids, NULL, client_recv, NULL); |
| | | |
| | | ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered1 fail with errorL %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process subscribe topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process sub1 fail with error: %s(%s)\n", (char *)errBuf, buf_temp); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("tthe process subscribe topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process sub2 fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered10 fail with errorL %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1"; |
| | | const char *topics_server_specific_reg_buf2 = "Server Specific Hello World"; |
| | | void *msgID = NULL; |
| | | int msg_id_len; |
| | | |
| | | ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len); |
| | | if (ret == true) { |
| | | printf("the process BHAsyncRequest topics OKay\n"); |
| | | |
| | | BHFree(msgID, msg_id_len); |
| | | |
| | | } else { |
| | | |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process BHAsyncRequest1 topics fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0); |
| | | if (ret == true) { |
| | | printf("the process BHAsyncRequest topics OKay\n"); |
| | | |
| | | } else { |
| | | |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process BHAsyncRequest2 topics fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len, |
| | | &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | |
| | | memset(data_buf, 0x00, sizeof(data_buf)); |
| | | strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1)); |
| | | printf("the process BHRequest topics OKay\n"); |
| | | |
| | | printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id); |
| | | |
| | | |
| | | } else { |
| | | |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process BHRequest topics fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len, |
| | | &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | |
| | | memset(data_buf, 0x00, sizeof(data_buf)); |
| | | strncpy(data_buf, (char *)buf_temp, (sizeof(data_buf) - 1) > size ? size : (sizeof(data_buf) - 1)); |
| | | printf("the process BHRequest topics OKay\n"); |
| | | |
| | | printf("the response data(%s) from procid(%s)\n", data_buf, (char *)proc_id); |
| | | |
| | | } else { |
| | | |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process BHRequest2 topics fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | #if !defined(PRO_DE_SERIALIZE) |
| | | ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data); |
| | | |
| | | } else { |
| | | printf("the process published1 fail\n"); |
| | | }; |
| | | |
| | | ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data); |
| | | |
| | | } else { |
| | | printf("the process published2 fail\n"); |
| | | }; |
| | | #endif |
| | | |
| | | memset(data_buf, 0x00, sizeof(data_buf)); |
| | | strcpy(data_buf, "query the process"); |
| | | ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process query all the process data OKay\n"); |
| | | |
| | | parseQueryProcBuf(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process query proc fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | #if 1 |
| | | while(1) { |
| | | sleep(1); |
| | | } |
| | | #else |
| | | /*if the process will exit finally, we must call BHUnregister to release the resources*/ |
| | | ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process unregistered OKay\n"); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process unregistered fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | BHFree(buf_temp, size); |
| | | |
| | | #endif |
| | | |
| | | return 0; |
| | | |
| | | } |
| | | |
| | | |
| New file |
| | |
| | | #include "bus_server_socket.h" |
| | | #include "shm_mod_socket.h" |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include "mm.h" |
| | | #include "logger_factory.h" |
| | | #include <sys/time.h> |
| | | #include <sys/types.h> |
| | | #include <sys/wait.h> |
| | | #include <unistd.h> |
| | | #include "bus_error.h" |
| | | |
| | | #include "bh_api.h" |
| | | #include "proc_def.h" |
| | | |
| | | static ProcInfo proc_desc; |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | int ret; |
| | | void *buf_temp = NULL; |
| | | void *errBuf = NULL; |
| | | void *proc_id = NULL; |
| | | void *src = NULL; |
| | | void *data_buf = NULL; |
| | | char topics_buf[200] = { 0x00 }; |
| | | int size; |
| | | int id_len; |
| | | int count = 0; |
| | | const int timeout_ms = 3000; |
| | | |
| | | memset(&proc_desc, 0x00, sizeof(ProcInfo)); |
| | | strncpy(proc_desc.proc_id, "Hello", strlen("Hello")); |
| | | strncpy(proc_desc.name, "World", strlen("World")); |
| | | strncpy(proc_desc.public_info, "Good", strlen("Good") + 1); |
| | | |
| | | const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1"; |
| | | const char *topics_server_specific_reg_buf2 = "Server Specific Hello World"; |
| | | |
| | | ret = BHRegister(&proc_desc, (const int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered OKay\n"); |
| | | |
| | | BHFree(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | |
| | | printf("the process registered fail with error: %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | ret = BHRegisterTopics(topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | BHFree(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered1 fail with errorL %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | ret = BHRegisterTopics(topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &buf_temp, &size, timeout_ms); |
| | | if (ret == true) { |
| | | printf("the process registered topics OKay\n"); |
| | | |
| | | BHFree(buf_temp, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process registered1 fail with errorL %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | while(true) { |
| | | ret = BHReadRequest(&proc_id, &id_len, &buf_temp, &size, &src, -1); |
| | | if (ret == true) { |
| | | |
| | | strncpy(topics_buf, (char *)buf_temp, (sizeof(topics_buf) - 1) > size ? size : (sizeof(topics_buf) - 1)); |
| | | printf("Get data(%s)", topics_buf); |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | strncpy(topics_buf, (char *)proc_id, (sizeof(topics_buf) - 1) > id_len ? id_len : (sizeof(topics_buf) - 1)); |
| | | printf("proc id(%s)", topics_buf); |
| | | |
| | | BHFree(buf_temp, size); |
| | | BHFree(proc_id, id_len); |
| | | |
| | | memset(topics_buf, 0x00, sizeof(topics_buf)); |
| | | sprintf(topics_buf, "Get data count: %d", ++count); |
| | | ret = BHSendReply(src, topics_buf, strlen(topics_buf)); |
| | | if (ret == true) { |
| | | |
| | | printf("the process send reply OKay\n"); |
| | | |
| | | } else { |
| | | |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("the process send reply fail with errorL %s\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | |
| | | BHFree(src, size); |
| | | |
| | | } else { |
| | | BHGetLastError(&errBuf, &size); |
| | | printf("BHReadRequest fail with error(%s)\n", (char *)errBuf); |
| | | |
| | | BHFree(errBuf, size); |
| | | }; |
| | | } |
| | | |
| | | return 0; |
| | | |
| | | } |
| | | |
| | | |