#include "net_mod_socket_wrapper.h"
|
#include "net_mod_server_socket_wrapper.h"
|
#include "bus_server_socket_wrapper.h"
|
#include "shm_mm_wrapper.h"
|
#include "proc_def.h"
|
#include "usg_common.h"
|
#include "bh_api.h"
|
#include <pthread.h>
|
#include <getopt.h>
|
#include "../proto/source/bhome_msg.pb.h"
|
#include "../proto/source/bhome_msg_api.pb.h"
|
|
/* Logger instance obtained once from the logger factory and shared by every API entry point below. */
static Logger *logger = LoggerFactory::getLogger();
|
|
/* Registration flag: BHRegister sets it to 1, BHUnregister resets it to 0 after a successful unregister. */
static int gRun_stat = 0;
|
/* Handle returned by net_mod_socket_open() in BHRegister; reset to NULL by BHUnregister. */
static void *gNetmod_socket = NULL;
|
|
/* Lock acquired with pthread_mutex_trylock() by the API entry points; initialised inside BHRegister. */
static pthread_mutex_t mutex;
|
|
/* Text of the most recent bus_strerror() result recorded by the API entry points. */
static char errString[100] = { 0x00 };
|
|
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
int key;
|
int count = 0;
|
void *buf = NULL;
|
int min = 0;
|
ProcInfo pData;
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _ProcInfo_proto
|
{
|
const char *proc_id;
|
const char *name;
|
const char *public_info;
|
const char *private_info;
|
}_input;
|
|
::bhome_msg::ProcInfo input;
|
if(!input.ParseFromArray(proc_info, proc_info_len)) {
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input.proc_id = input.proc_id().c_str();
|
_input.name = input.name().c_str();
|
_input.public_info = input.public_info().c_str();
|
_input.private_info = input.private_info().c_str();
|
|
#else
|
if ((proc_info == NULL) || (proc_info_len == 0)) {
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x90, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
#endif
|
|
memset(&pData, 0x00, sizeof(ProcInfo));
|
if (gRun_stat == 0) {
|
pthread_mutex_init(&mutex, NULL);
|
|
} else {
|
logger->error("the process has already registered!\n");
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
|
gRun_stat = 1;
|
shm_mm_wrapper_init(SHM_RES_SIZE);
|
|
#if defined(PRO_DE_SERIALIZE)
|
if (_input.proc_id != NULL) {
|
count = strlen(_input.proc_id) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.proc_id, _input.proc_id, min);
|
}
|
|
if (_input.name != NULL) {
|
count = strlen(_input.name) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.name, _input.name, min);
|
}
|
|
if (_input.public_info != NULL) {
|
count = strlen(_input.public_info) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.public_info, _input.public_info, min);
|
}
|
|
if (_input.private_info != NULL) {
|
count = strlen(_input.private_info) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.private_info, _input.private_info, min);
|
}
|
#else
|
if (strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) > 0) {
|
count = strlen((const char *)(((ProcInfo *)proc_info)->proc_id)) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.proc_id, ((ProcInfo *)proc_info)->proc_id, min);
|
}
|
|
if (strlen((const char *)(((ProcInfo *)proc_info)->name)) > 0) {
|
count = strlen((const char *)(((ProcInfo *)proc_info)->name)) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.name, ((ProcInfo *)proc_info)->name, min);
|
}
|
|
if (strlen((const char *)(((ProcInfo *)proc_info)->public_info)) > 0) {
|
count = strlen((const char *)(((ProcInfo *)proc_info)->public_info)) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.public_info, ((ProcInfo *)proc_info)->public_info, min);
|
}
|
|
if (strlen((const char *)(((ProcInfo *)proc_info)->private_info)) > 0) {
|
count = strlen((const char *)(((ProcInfo *)proc_info)->private_info)) + 1;
|
min = count > MAX_STR_LEN ? MAX_STR_LEN : count;
|
strncpy(pData.private_info, ((ProcInfo *)proc_info)->private_info, min);
|
}
|
#endif
|
|
gNetmod_socket = net_mod_socket_open();
|
hashtable_t *hashtable = mm_get_hashtable();
|
key = hashtable_alloc_key(hashtable);
|
net_mod_socket_bind(gNetmod_socket, key);
|
|
rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgCommonReply mcr;
|
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mcr.mutable_errmsg()->set_errstring(errString);
|
*reply_len = mcr.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mcr.SerializePartialToArray(*reply, *reply_len);
|
#else
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
int min;
|
void *buf = NULL;
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _ProcInfo_proto
|
{
|
const char *proc_id;
|
const char *name;
|
const char *public_info;
|
const char *private_info;
|
}_input;
|
|
::bhome_msg::ProcInfo input;
|
|
if(!input.ParseFromArray(proc_info, proc_info_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input.proc_id = input.proc_id().c_str();
|
_input.name = input.name().c_str();
|
_input.public_info = input.public_info().c_str();
|
_input.private_info = input.private_info().c_str();
|
#endif
|
|
if (gRun_stat == 0) {
|
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG);
|
if (rv == 0) {
|
|
net_mod_socket_close(gNetmod_socket);
|
|
gNetmod_socket = NULL;
|
|
gRun_stat = 0;
|
|
}
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgCommonReply mcr;
|
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mcr.mutable_errmsg()->set_errstring(errString);
|
*reply_len = mcr.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mcr.SerializePartialToArray(*reply, *reply_len);
|
#else
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
int min, i;
|
void *buf = NULL;
|
int total = 0;
|
int count = 0;
|
char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _MsgTopicList
|
{
|
int amount;
|
const char *topics[MAX_STR_LEN];
|
}_input;
|
|
::bhome_msg::MsgTopicList input;
|
if(!input.ParseFromArray(topics, topics_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input.amount = input.topic_list_size();
|
if (_input.amount > MAX_STR_LEN) {
|
_input.amount = MAX_STR_LEN;
|
}
|
|
for(int i = 0; i < _input.amount; i++) {
|
_input.topics[i] = input.topic_list(i).c_str();
|
}
|
#else
|
if ((topics == NULL) || (topics_len == 0)) {
|
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
#if defined(PRO_DE_SERIALIZE)
|
total = sizeof(topics_buf) / sizeof(char);
|
for (i = 0; i < _input.amount; i++) {
|
min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
|
if (min > 0) {
|
strncpy(topics_buf + count, _input.topics[i], min);
|
count += min;
|
|
if (total >= strlen(_input.topics[i])) {
|
total -= strlen(_input.topics[i]);
|
}
|
|
if ((_input.amount > 1) && (i < (_input.amount - 1))) {
|
strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
|
total -= 1;
|
count++;
|
}
|
} else {
|
topics_buf[strlen(topics_buf) - 1] = '\0';
|
}
|
}
|
|
logger->debug("the parsed compound register topics: %s!\n", topics_buf);
|
#else
|
memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
|
#endif
|
|
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgCommonReply mcr;
|
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mcr.mutable_errmsg()->set_errstring(errString);
|
*reply_len = mcr.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mcr.SerializePartialToArray(*reply, *reply_len);
|
#else
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
int min;
|
void *buf = NULL;
|
int size;
|
char topics_buf[MAX_STR_LEN] = { 0x00 };
|
ProcInfo_query *ptr = NULL;
|
ProcInfo *Proc_ptr = NULL;
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _BHAddress
|
{
|
unsigned long long mq_id;
|
long long abs_addr;
|
const char *ip;
|
int port;
|
}_input0;
|
|
const char *_input1;
|
|
::bhome_msg::BHAddress input0;
|
::bhome_msg::MsgQueryTopic input1;
|
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len)) {
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input0.mq_id = input0.mq_id();
|
_input0.abs_addr = input0.abs_addr();
|
_input0.ip = input0.ip().c_str();
|
_input0.port = input0.port();
|
_input1 = input1.topic().c_str();
|
|
#else
|
if ((topic == NULL) || (topic_len == 0)) {
|
rv = EBUS_INVALID_PARA;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
|
#if defined(PRO_DE_SERIALIZE)
|
min = (strlen(_input1) > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : strlen(_input1));
|
strncpy(topics_buf, _input1, min);
|
#else
|
min = (topic_len > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : topic_len);
|
buf = const_cast<void *>(topic);
|
strncpy(topics_buf, (const char *)buf, min);
|
#endif
|
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
|
struct _MsgQueryTopicReply
|
{
|
std::string proc_id;
|
|
unsigned long long mq_id;
|
long long abs_addr;
|
std::string ip;
|
int port;
|
}mtr_list[128];
|
int mtr_list_num = 0;
|
|
if (rv == 0) {
|
|
ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
|
mtr_list_num = ptr->num;
|
|
if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
|
mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
|
}
|
|
for(int i = 0; i < mtr_list_num; i++) {
|
mtr_list[i].proc_id = ptr->procData.proc_id;
|
mtr_list[i].mq_id = 0x00;
|
mtr_list[i].abs_addr = 0x00;
|
mtr_list[i].ip = "192.168.1.1";
|
mtr_list[i].port = 5000;
|
}
|
}
|
|
::bhome_msg::MsgQueryTopicReply mtr;
|
mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mtr.mutable_errmsg()->set_errstring(errString);
|
for(int i = 0; i < mtr_list_num; i++)
|
{
|
::bhome_msg::MsgQueryTopicReply_BHNodeAddress *mtrb = mtr.add_node_address();
|
mtrb->set_proc_id(mtr_list[i].proc_id);
|
mtrb->mutable_addr()->set_mq_id(mtr_list[i].mq_id);
|
mtrb->mutable_addr()->set_abs_addr(mtr_list[i].abs_addr);
|
mtrb->mutable_addr()->set_ip(mtr_list[i].ip);
|
mtrb->mutable_addr()->set_port(mtr_list[i].port);
|
}
|
|
*reply_len = mtr.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mtr.SerializePartialToArray(*reply, *reply_len);
|
#else
|
if (rv == 0) {
|
*reply = buf;
|
*reply_len = size;
|
|
} else {
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
}
|
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
void *buf = NULL;
|
int size;
|
int min;
|
ProcInfo_sum *Proc_ptr = NULL;
|
char data_buf[MAX_STR_LEN] = { 0x00 };
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _BHAddress
|
{
|
unsigned long long mq_id;
|
long long abs_addr;
|
const char *ip;
|
int port;
|
}_input0;
|
|
const char *_input1;
|
|
::bhome_msg::BHAddress input0;
|
::bhome_msg::MsgQueryProc input1;
|
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input0.mq_id = input0.mq_id();
|
_input0.abs_addr = input0.abs_addr();
|
_input0.ip = input0.ip().c_str();
|
_input0.port = input0.port();
|
_input1 = input1.proc_id().c_str();
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
|
if (query != NULL) {
|
strncpy(data_buf, (char *)query, (sizeof(data_buf) - 1) > query_len ? query_len : (sizeof(data_buf) - 1));
|
}
|
|
rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
struct _MsgQueryProcReply
|
{
|
std::string proc_id;
|
std::string name;
|
std::string public_info;
|
std::string private_info;
|
|
bool online;
|
|
std::string topic_list[128];
|
int topic_list_num;
|
|
} mpr_list[128];
|
int mpr_list_num = 0;
|
|
if (rv == 0) {
|
|
mpr_list_num = *(int *)buf;
|
|
if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
|
mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
|
}
|
|
Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
|
for(int i = 0; i < mpr_list_num; i++) {
|
mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
|
mpr_list[i].name = (Proc_ptr + i)->procData.name;
|
mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
|
mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
|
mpr_list[i].online = (Proc_ptr + i)->stat;
|
mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
|
|
for(int j = 0; j < mpr_list[i].topic_list_num; j++)
|
{
|
if (j == 0) {
|
mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
|
} else if (j == 1) {
|
mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
|
} else if (j == 2) {
|
mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
|
}
|
}
|
}
|
|
::bhome_msg::MsgQueryProcReply mpr;
|
mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mpr.mutable_errmsg()->set_errstring(errString);
|
|
for(int i = 0; i < mpr_list_num; i++)
|
{
|
::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
|
mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
|
mpri->mutable_proc()->set_name(mpr_list[i].name);
|
mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
|
mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
|
mpri->set_online(mpr_list[i].online);
|
for(int j = 0; j < mpr_list[i].topic_list_num; j++)
|
{
|
mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
|
}
|
}
|
|
*reply_len = mpr.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mpr.SerializePartialToArray(*reply,*reply_len);
|
}
|
#else
|
if (rv == 0) {
|
*reply = buf;
|
*reply_len = size;
|
} else {
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
}
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
|
}
|
|
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
int sec, nsec;
|
int total = 0;
|
int count = 0;
|
int min, i;
|
void *buf = NULL;
|
char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _MsgTopicList
|
{
|
int amount;
|
const char *topics[MAX_STR_LEN];
|
}_input;
|
|
::bhome_msg::MsgTopicList input;
|
if(!input.ParseFromArray(topics, topics_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
_input.amount = input.topic_list_size();
|
|
if (_input.amount > MAX_STR_LEN) {
|
_input.amount = MAX_STR_LEN;
|
}
|
|
for(int i = 0; i < _input.amount; i++)
|
_input.topics[i] = input.topic_list(i).c_str();
|
|
#else
|
if ((topics == NULL) || (topics_len == 0)) {
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
goto exit_entry;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
#if defined(PRO_DE_SERIALIZE)
|
total = sizeof(topics_buf) / sizeof(char);
|
for (i = 0; i < _input.amount; i++) {
|
min = (strlen(_input.topics[i]) > (total - 1) ? (total - 1) : strlen(_input.topics[i]));
|
if (min > 0) {
|
strncpy(topics_buf + count, _input.topics[i], min);
|
count += min;
|
|
if (total >= strlen(_input.topics[i])) {
|
total -= strlen(_input.topics[i]);
|
}
|
|
if ((_input.amount > 1) && (i < (_input.amount - 1))) {
|
strncpy(topics_buf + count, STR_MAGIC, strlen(STR_MAGIC));
|
total -= 1;
|
count++;
|
}
|
} else {
|
topics_buf[strlen(topics_buf) - 1] = '\0';
|
}
|
}
|
logger->debug("the parsed compound sub topics: %s!\n", topics_buf);
|
#else
|
memcpy(topics_buf, topics, topics_len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : topics_len);
|
#endif
|
|
if (timeout_ms > 0) {
|
|
sec = timeout_ms / 1000;
|
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
|
rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec);
|
|
} else if (timeout_ms == 0) {
|
|
rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
|
|
} else {
|
|
rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1);
|
|
}
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
exit_entry:
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgCommonReply mcr;
|
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mcr.mutable_errmsg()->set_errstring(errString);
|
*reply_len=mcr.ByteSizeLong();
|
*reply=malloc(*reply_len);
|
mcr.SerializePartialToArray(*reply,*reply_len);
|
#else
|
min = strlen(errString) + 1;
|
buf = malloc(min) ;
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
|
}
|
|
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv = BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
|
|
return rv;
|
}
|
|
/*
 * Simplified heartbeat entry point.
 *
 * The timeout argument is not used; the function performs no work and
 * always reports success (true).
 */
int BHHeartbeatEasy(const int timeout_ms)
{
    (void)timeout_ms;

    return true;
}
|
|
/*
 * Heartbeat entry point.
 *
 * With PRO_DE_SERIALIZE defined:
 *     proc_info / proc_info_len must parse as a ::bhome_msg::ProcInfo
 *     message (its fields are not used).  On a parse failure errString is
 *     set for EBUS_INVALID_PARA, no reply is produced and false (0) is
 *     returned.  Otherwise errString is set for code 0 and reply /
 *     reply_len receive a malloc()ed, serialized
 *     ::bhome_msg::MsgCommonReply (NULL / 0 if the allocation fails; left
 *     alone when either pointer is NULL).
 * Without PRO_DE_SERIALIZE:
 *     no argument is used and no reply is produced.
 *
 * Returns true (1) in every case except the parse failure described above.
 */
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    (void)timeout_ms;

#if defined(PRO_DE_SERIALIZE)
    int rv;
    ::bhome_msg::ProcInfo input;

    if (!input.ParseFromArray(proc_info, proc_info_len)) {
        rv = EBUS_INVALID_PARA;

        /* size - 1 keeps the zero-filled buffer NUL-terminated. */
        memset(errString, 0x00, sizeof(errString));
        strncpy(errString, bus_strerror(rv), sizeof(errString) - 1);

        return false;
    }

    rv = 0;
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString) - 1);

    if ((reply != NULL) && (reply_len != NULL)) {
        ::bhome_msg::MsgCommonReply mcr;

        mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
        mcr.mutable_errmsg()->set_errstring(errString);
        *reply_len = mcr.ByteSizeLong();
        *reply = malloc(*reply_len);
        if (*reply != NULL) {
            mcr.SerializePartialToArray(*reply, *reply_len);
        } else {
            *reply_len = 0;
        }
    }
#else
    (void)proc_info;
    (void)proc_info_len;
    (void)reply;
    (void)reply_len;
#endif

    return true;
}
|
|
#if defined(PRO_DE_SERIALIZE)
|
int BHPublish(const char *msgpub, const int msgpub_len, const int timeout_ms)
|
#else
|
int BHPublish(const char *topic, const char *content, const int timeout_ms)
|
#endif
|
{
|
int rv;
|
int min;
|
void *buf = NULL;
|
net_node_t node_arr;
|
int node_arr_len = 0;
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _MsgPublish
|
{
|
const char *topic;
|
const char *data;
|
}_input;
|
|
::bhome_msg::MsgPublish input;
|
if(!input.ParseFromArray(msgpub, msgpub_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
_input.topic = input.topic().c_str();
|
_input.data = input.data().c_str();
|
#else
|
if ((topic == NULL) || (content == NULL)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
#if defined(PRO_DE_SERIALIZE)
|
if (timeout_ms > 0) {
|
rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data), timeout_ms);
|
} else if (timeout_ms == 0) {
|
rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
|
|
} else {
|
|
rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, _input.topic, strlen(_input.topic), _input.data, strlen(_input.data));
|
}
|
#else
|
if (timeout_ms > 0) {
|
rv = net_mod_socket_pub_timeout(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content), timeout_ms);
|
|
} else if (timeout_ms == 0) {
|
rv = net_mod_socket_pub_nowait(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
|
|
} else {
|
rv = net_mod_socket_pub(gNetmod_socket, &node_arr, node_arr_len, topic, strlen(topic), content, strlen(content));
|
}
|
#endif
|
|
pthread_mutex_unlock(&mutex);
|
|
if (rv > 0)
|
return true;
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
return false;
|
}
|
|
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
|
{
|
int rv;
|
int len;
|
void *buf;
|
int key;
|
int size;
|
int sec, nsec;
|
char topics_buf[MAX_STR_LEN] = { 0x00 };
|
char data_buf[MAX_STR_LEN * 3] = { 0x00 };
|
|
struct _ReadSubReply
|
{
|
std::string proc_id;
|
std::string topic;
|
std::string data;
|
} rsr;
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
if (timeout_ms > 0) {
|
sec = timeout_ms / 1000;
|
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
|
|
rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
|
|
} else if (timeout_ms == 0) {
|
|
rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
|
|
} else {
|
|
rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
|
}
|
|
if (rv == 0) {
|
|
len = strlen((char *)buf);
|
if (len > size) {
|
len = size;
|
}
|
strncpy(topics_buf, (char *)buf, len > (sizeof(topics_buf) - 1) ? (sizeof(topics_buf) - 1) : len);
|
|
if (len < size) {
|
len = strlen(topics_buf) + 1;
|
|
strncpy(data_buf, (char *)buf + len, size - len);
|
}
|
|
free(buf);
|
|
#if defined(PRO_DE_SERIALIZE)
|
rsr.topic = topics_buf;
|
rsr.data = data_buf;
|
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
sprintf(topics_buf, "%d", key);
|
|
rsr.proc_id = topics_buf;
|
*proc_id_len = rsr.proc_id.size();
|
*proc_id = malloc(*proc_id_len);
|
memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
|
|
::bhome_msg::MsgPublish Mp;
|
Mp.set_topic(rsr.topic);
|
Mp.set_data(rsr.data.data());
|
*msgpub_len = Mp.ByteSizeLong();
|
*msgpub = malloc(*msgpub_len);
|
Mp.SerializePartialToArray(*msgpub, *msgpub_len);
|
#else
|
void *ptr;
|
if (len < size) {
|
ptr = malloc(size - len);
|
len = size - len;
|
memcpy(ptr, data_buf, len);
|
} else {
|
ptr = malloc(len);
|
memcpy(ptr, topics_buf, len);
|
}
|
*msgpub = ptr;
|
*msgpub_len = len;
|
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
sprintf(topics_buf, "%d", key);
|
|
*proc_id_len = strlen(topics_buf);
|
*proc_id = malloc(*proc_id_len);
|
memcpy(*proc_id, topics_buf, *proc_id_len);
|
|
#endif
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
|
{
|
int rv;
|
void *buf;
|
int size;
|
int val;
|
int len;
|
int min;
|
int sec, nsec;
|
std::string MsgID;
|
int timeout_ms = 3000;
|
char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _BHAddress
|
{
|
unsigned long long mq_id;
|
long long abs_addr;
|
const char *ip;
|
int port;
|
}_input0;
|
|
struct MsgRequestTopic
|
{
|
const char *topic;
|
const char *data;
|
}_input1;
|
|
::bhome_msg::BHAddress input0;
|
::bhome_msg::MsgRequestTopic input1;
|
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
_input0.mq_id = input0.mq_id();
|
_input0.abs_addr = input0.abs_addr();
|
_input0.ip = input0.ip().c_str();
|
_input0.port = input0.port();
|
_input1.topic = input1.topic().c_str();
|
_input1.data = input1.data().c_str();
|
|
#else
|
if ((request == NULL) || (request_len == 0)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
#if defined(PRO_DE_SERIALIZE)
|
strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
|
#else
|
strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1));
|
#endif
|
|
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
|
if (rv == 0) {
|
|
val = atoi((char *)buf);
|
|
free(buf);
|
|
if (val > 0) {
|
|
len = strlen(topics_buf);
|
#if defined(PRO_DE_SERIALIZE)
|
min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
|
strncpy(topics_buf + len + 1, _input1.data, min);
|
len += (min + 1);
|
#endif
|
if (timeout_ms > 0) {
|
|
sec = timeout_ms / 1000;
|
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
|
|
rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec);
|
|
} else if (timeout_ms == 0) {
|
|
rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val);
|
|
} else {
|
|
rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val);
|
}
|
} else {
|
|
rv = EBUS_RES_UNSUPPORT;
|
|
}
|
}
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
|
if((msg_id == NULL) || (msg_id_len == NULL)) {
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
if (rv == 0) {
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
sprintf(topics_buf, "%d", val);
|
MsgID = topics_buf;
|
|
*msg_id_len = MsgID.size();
|
*msg_id = malloc(*msg_id_len);
|
memcpy(*msg_id, MsgID.data(), *msg_id_len);
|
|
return true;
|
}
|
|
return false;
|
|
}
|
|
int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len,
|
void **reply, int *reply_len, const int timeout_ms)
|
{
|
int rv;
|
void *buf;
|
int size;
|
int val;
|
int min, len;
|
net_node_t node;
|
int node_size;
|
int recv_arr_size;
|
net_mod_recv_msg_t *recv_arr;
|
net_mod_err_t *errarr;
|
int errarr_size = 0;
|
int sec, nsec;
|
char topics_buf[MAX_STR_LEN] = { 0x00 };
|
|
struct _RequestReply
|
{
|
std::string proc_id;
|
std::string data;
|
}rr;
|
|
#if defined(PRO_DE_SERIALIZE)
|
struct _BHAddress
|
{
|
unsigned long long mq_id;
|
long long abs_addr;
|
const char *ip;
|
int port;
|
}_input0;
|
|
struct _MsgRequestTopic
|
{
|
const char *topic;
|
const char *data;
|
}_input1;
|
|
::bhome_msg::BHAddress input0;
|
::bhome_msg::MsgRequestTopic input1;
|
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
_input0.mq_id = input0.mq_id();
|
_input0.abs_addr = input0.abs_addr();
|
_input0.ip = input0.ip().c_str();
|
_input0.port = input0.port();
|
_input1.topic = input1.topic().c_str();
|
_input1.data = input1.data().c_str();
|
|
#else
|
if ((request == NULL) || (request_len == 0)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
#if defined(PRO_DE_SERIALIZE)
|
strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1));
|
#else
|
strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1));
|
#endif
|
|
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS);
|
if (rv == 0) {
|
val = atoi((char *)buf);
|
|
free(buf);
|
|
if (val > 0) {
|
memset(&node, 0x00, sizeof(node));
|
|
len = strlen(topics_buf);
|
#if defined(PRO_DE_SERIALIZE)
|
min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len );
|
strncpy(topics_buf + len + 1, _input1.data, min);
|
len += (min + 1);
|
#endif
|
|
node.key = val;
|
rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size);
|
if (rv > 0) {
|
if (recv_arr_size > 0) {
|
|
node.key = recv_arr[0].key;
|
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
size = recv_arr[0].content_length;
|
buf = (char *)malloc(size);
|
strncpy((char *)buf, (char *)recv_arr[0].content, size);
|
#if !defined(PRO_DE_SERIALIZE)
|
*reply = buf;
|
*reply_len = size;
|
#endif
|
}
|
|
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
|
|
if(errarr_size > 0) {
|
free(errarr);
|
}
|
|
rv = 0;
|
|
} else {
|
rv = EBUS_TIMEOUT;
|
}
|
|
} else {
|
rv = EBUS_RES_UNSUPPORT;
|
}
|
}
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
if (rv == 0) {
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
sprintf(topics_buf, "%d", node.key);
|
|
rr.proc_id = topics_buf;
|
*proc_id_len = rr.proc_id.size();
|
*proc_id = malloc(*proc_id_len);
|
memcpy(*proc_id, rr.proc_id.data(), *proc_id_len);
|
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
memcpy(topics_buf, buf, size);
|
rr.data = topics_buf;
|
}
|
|
pthread_mutex_unlock(&mutex);
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgRequestTopicReply mrt;
|
mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
|
mrt.mutable_errmsg()->set_errstring(errString);
|
mrt.set_data(rr.data.data());
|
*reply_len = mrt.ByteSizeLong();
|
*reply = malloc(*reply_len);
|
mrt.SerializePartialToArray(*reply, *reply_len);
|
#else
|
if (rv > 0) {
|
min = strlen(errString) + 1;
|
buf = malloc(min);
|
memcpy(buf, errString, strlen(errString));
|
*((char *)buf + min - 1) = '\0';
|
|
*reply = buf;
|
*reply_len = min;
|
}
|
#endif
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms)
|
{
|
int rv;
|
void *buf;
|
int key;
|
int size;
|
int sec, nsec;
|
char topics_buf[MAX_STR_LEN] = { 0x00 };
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
if (timeout_ms > 0) {
|
|
sec = timeout_ms / 1000;
|
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
|
|
rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
|
|
} else if (timeout_ms == 0) {
|
|
rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
|
|
} else {
|
|
rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
|
}
|
|
if (rv == 0) {
|
struct _ReadRequestReply
|
{
|
std::string proc_id;
|
std::string topic;
|
std::string data;
|
void *src;
|
} rrr;
|
|
sprintf(topics_buf, "%d", key);
|
rrr.proc_id = topics_buf;
|
|
*proc_id_len = rrr.proc_id.size();
|
*proc_id = malloc(*proc_id_len);
|
memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len);
|
|
memset(topics_buf, 0x00, sizeof(topics_buf));
|
memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size);
|
rrr.topic = topics_buf;
|
rrr.data = topics_buf;
|
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgRequestTopic mrt;
|
mrt.set_topic(rrr.topic);
|
mrt.set_data(rrr.data.data());
|
*request_len = mrt.ByteSizeLong();
|
*request = malloc(*request_len);
|
mrt.SerializePartialToArray(*request,*request_len);
|
#else
|
*request = buf;
|
*request_len = size;
|
#endif
|
|
buf = malloc(sizeof(int));
|
*(int *)buf = key;
|
*src = buf;
|
}
|
|
pthread_mutex_unlock(&mutex);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
int BHSendReply(void *src, const void *reply, const int reply_len)
|
{
|
int rv;
|
|
#if defined(PRO_DE_SERIALIZE)
|
::bhome_msg::MsgRequestTopicReply input;
|
if (!input.ParseFromArray(reply, reply_len)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
const char *_input;
|
_input = input.data().data();
|
|
#else
|
if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
|
|
rv = EBUS_INVALID_PARA;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
#endif
|
|
if (gRun_stat == 0) {
|
logger->error("the process has not been registered yet!\n");
|
|
rv = EBUS_RES_NO;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
return false;
|
}
|
|
rv = pthread_mutex_trylock(&mutex);
|
if (rv == 0) {
|
|
rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src);
|
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
|
pthread_mutex_unlock(&mutex);
|
} else {
|
|
rv = EBUS_RES_BUSY;
|
memset(errString, 0x00, sizeof(errString));
|
strncpy(errString, bus_strerror(rv), sizeof(errString));
|
}
|
|
if (rv == 0)
|
return true;
|
|
return false;
|
}
|
|
/*
 * BHCleanup - performs no work and always reports success.
 */
int BHCleanup()
{
    const int status = true;

    return status;
}
|
|
/*
 * BHFree - release a buffer with free().
 *
 * buf   The buffer to release; NULL is accepted by free().
 * size  Ignored.
 */
void BHFree(void *buf, int size)
{
    (void)size;

    free(buf);
}
|
|
int BHGetLastError(void **msg, int *msg_len)
|
{
|
void *buf = NULL;
|
|
buf = malloc(strlen(errString) + 1);
|
|
memset(buf, 0x00, strlen(errString) + 1);
|
memcpy(buf, errString, strlen(errString));
|
|
if ((msg != NULL) && (msg_len != NULL)) {
|
*msg = buf;
|
*msg_len = strlen(errString);
|
|
return true;
|
}
|
|
return false;
|
|
}
|