From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期五, 17 九月 2021 09:43:55 +0800 Subject: [PATCH] Add the heartbeat logic feature. --- src/bh_api.cpp | 287 ++++++++++++++++++++++++++++---------------------------- 1 files changed, 144 insertions(+), 143 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 73c7772..635fe95 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -13,6 +13,8 @@ #include "../proto/source/bhome_msg.pb.h" #include "../proto/source/bhome_msg_api.pb.h" +#define TIME_WAIT 3 + static Logger *logger = LoggerFactory::getLogger(); static int gRun_stat = 0; @@ -20,7 +22,43 @@ static pthread_mutex_t mutex; -static char errString[100] = { 0x00 }; +static pthread_t gTids; + +static void *client_run_check(void *skptr) { + + pthread_detach(pthread_self()); + + int data; + int sec, nsec; + int rv; + int key; + char buf[MAX_STR_LEN] = { 0x00 }; + void *buf_temp = NULL; + int size; + + sec = TIME_WAIT; + nsec = 0; + sprintf(buf, "%s", "Success"); + data = net_mod_socket_int_get(gNetmod_socket); + while(true) { + + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data); + if (rv == 0) { + + BHFree(buf_temp, size); + + rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); + if (rv != 0) { + logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + } + + } else { + + logger->error("the process check failed with error: %s!\n", bus_strerror(rv)); + + } + } +} int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { @@ -29,6 +67,7 @@ int count = 0; void *buf = NULL; int min = 0; + char *errString = NULL; ProcInfo pData; #if defined(PRO_DE_SERIALIZE) @@ -43,10 +82,8 @@ ::bhome_msg::ProcInfo input; if ((!input.ParseFromArray(proc_info, proc_info_len)) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } @@ -58,10 +95,8 @@ #else if ((proc_info == NULL) || (proc_info_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x90, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } #endif @@ -74,8 +109,7 @@ logger->error("the process has already registered!\n"); rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -136,6 +170,16 @@ } #endif + if (pData.proc_id == NULL) { + rv = EBUS_INVALID_PARA; + + bus_errorset(rv); + + pthread_mutex_unlock(&mutex); + + return false; + } + gNetmod_socket = net_mod_socket_open(); hashtable_t *hashtable = mm_get_hashtable(); key = hashtable_alloc_key(hashtable); @@ -149,19 +193,19 @@ rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); pthread_mutex_unlock(&mutex); } else { rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } + + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; @@ -180,7 +224,9 @@ *reply_len = min; #endif - + + pthread_create(&gTids, NULL, client_run_check, NULL); + return true; } @@ -190,6 +236,7 @@ int rv; int min; void *buf = NULL; + char *errString = NULL; #if defined(PRO_DE_SERIALIZE) struct _ProcInfo_proto @@ -205,9 +252,7 @@ if(!input.ParseFromArray(proc_info, proc_info_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -219,9 +264,7 @@ #else if ((reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -232,8 +275,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -250,21 +292,19 @@ gRun_stat = 0; } - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); pthread_mutex_unlock(&mutex); } else { rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -293,6 +333,7 @@ void *buf = NULL; int total = 0; int count = 0; + char *errString = NULL; char *topics_buf = NULL; #if defined(PRO_DE_SERIALIZE) @@ -306,11 +347,9 @@ if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); - return false; + return false; } _input.amount = input.topic_list_size(); @@ -327,9 +366,7 @@ if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -341,8 +378,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -351,8 +387,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRegisterTopics: Out of memory!\n"); @@ -382,10 +417,10 @@ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - free(topics_buf); + + bus_errorset(rv); + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; @@ -414,6 +449,7 @@ int min; void *buf = NULL; int size; + char *errString = NULL; char topics_buf[MAX_STR_LEN] = { 0x00 }; ProcInfo_query *ptr = NULL; ProcInfo *Proc_ptr = NULL; @@ -433,9 +469,7 @@ ::bhome_msg::MsgQueryTopic input1; if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -449,9 +483,7 @@ #else if ((topic == NULL) || (topic_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -461,8 +493,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -477,9 +508,7 @@ #endif rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); #if defined(PRO_DE_SERIALIZE) struct _MsgQueryTopicReply @@ -510,6 +539,8 @@ mtr_list[i].port = 5000; } } + + errString = bus_strerror(0, 1); ::bhome_msg::MsgQueryTopicReply mtr; mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -554,6 +585,7 @@ void *buf = NULL; int size; int min; + char *errString = NULL; ProcInfo_sum *Proc_ptr = NULL; char data_buf[MAX_STR_LEN] = { 0x00 }; @@ -573,8 +605,7 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -587,8 +618,7 @@ #else if ((reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -598,9 +628,8 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } @@ -609,9 +638,7 @@ } rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS); - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); #if defined(PRO_DE_SERIALIZE) struct _MsgQueryProcReply @@ -657,6 +684,8 @@ } } } + + errString = bus_strerror(0, 1); ::bhome_msg::MsgQueryProcReply mpr; mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -707,6 +736,7 @@ int count = 0; int len, i; void *buf = NULL; + char *errString = NULL; char *topics_buf = NULL; #if defined(PRO_DE_SERIALIZE) @@ -720,8 +750,7 @@ if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -741,8 +770,7 @@ #else if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -752,8 +780,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -762,8 +789,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHSubscribeTopics: Out of memory!\n"); @@ -806,8 +832,7 @@ } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + errString = bus_strerror(0, 1); free(topics_buf); @@ -848,6 +873,7 @@ int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { int rv; + char *errString = NULL; #if defined(PRO_DE_SERIALIZE) struct _ProcInfo_proto @@ -862,9 +888,7 @@ if(!input.ParseFromArray(proc_info,proc_info_len)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -875,8 +899,8 @@ _input.private_info = input.private_info().c_str(); rv = 0; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); + errString = bus_strerror(0, 1); ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -912,8 +936,7 @@ if(!input.ParseFromArray(msgpub, msgpub_len)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -924,8 +947,7 @@ if ((topic == NULL) || (content == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -935,8 +957,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -966,8 +987,7 @@ if (rv > 0) return true; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -995,16 +1015,14 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } if ((msgpub == NULL) || (msgpub_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1035,8 +1053,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1052,8 +1069,7 @@ if (data_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1124,8 +1140,8 @@ } else { - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); + } if (rv == 0) @@ -1169,8 +1185,7 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1186,8 +1201,7 @@ if ((request == NULL) || (request_len == 0)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1197,8 +1211,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1227,8 +1240,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1267,9 +1279,7 @@ } } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); if((msg_id == NULL) || (msg_id_len == NULL)) { if (rv == 0) return true; @@ -1309,6 +1319,7 @@ net_mod_recv_msg_t *recv_arr; net_mod_err_t *errarr; int errarr_size = 0; + char *errString = NULL; char buf_temp[MAX_STR_LEN] = { 0x00 }; char *topics_buf = NULL; @@ -1338,25 +1349,23 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } _input0.mq_id = input0.mq_id(); - _input0.abs_addr = input0.abs_addr(); - _input0.ip = input0.ip().c_str(); - _input0.port = input0.port(); - _input1.topic = input1.topic().c_str(); - _input1.data = input1.data().c_str(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1.topic = input1.topic().c_str(); + _input1.data = input1.data().c_str(); #else if ((request == NULL) || (request_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1366,8 +1375,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1398,8 +1406,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1463,9 +1470,7 @@ free(topics_buf); } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); if (rv == 0) { if ((proc_id != NULL) && (proc_id_len != NULL)) { memset(buf_temp, 0x00, sizeof(buf_temp)); @@ -1481,8 +1486,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1497,6 +1501,8 @@ free(topics_buf); } + errString = bus_strerror(0, 1); + #if defined(PRO_DE_SERIALIZE) if (rv == 0) { ::bhome_msg::MsgRequestTopicReply mrt; @@ -1541,16 +1547,14 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } if ((request == NULL) || (request_len == 0) || (src == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1594,8 +1598,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHReadRequest: Out of memory!\n"); @@ -1637,8 +1640,7 @@ *src = buf; } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); if (rv == 0) return true; @@ -1650,6 +1652,8 @@ { int rv; int data; + int sec = 3; + int nsec = 0; const char *_input; #if defined(PRO_DE_SERIALIZE) @@ -1657,8 +1661,7 @@ if (!input.ParseFromArray(reply, reply_len) || (src == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1669,8 +1672,7 @@ if ((src == NULL) || (reply == NULL) || (reply_len == 0)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1683,17 +1685,15 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } data = net_mod_socket_svr_get(gNetmod_socket); - rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data); + rv = net_mod_socket_sendto_timeout(gNetmod_socket, _input, strlen(_input), *(int *)src, sec, nsec, SVR_STR, data); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); if (rv == 0) return true; @@ -1713,6 +1713,7 @@ int BHGetLastError(void **msg, int *msg_len) { void *buf = NULL; + char *errString = bus_strerror(0, 1); buf = malloc(strlen(errString) + 1); -- Gitblit v1.8.0