From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 17 九月 2021 09:43:55 +0800
Subject: [PATCH] Add the heartbeat logic feature.
---
src/bus_proxy_start.cpp | 66 ++++
src/queue/array_lock_free_queue.h | 12
src/socket/bus_server_socket.cpp | 209 ++++++++++++++++
src/socket/bus_server_socket_wrapper.h | 5
src/socket/bus_server_socket_wrapper.cpp | 36 ++
src/bus_error.h | 3
src/socket/bus_server_socket.h | 44 +++
src/bh_api.cpp | 287 +++++++++++-----------
src/proc_def.h | 1
src/bus_error.cpp | 46 +++
src/socket/shm_socket.cpp | 13
11 files changed, 554 insertions(+), 168 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 73c7772..635fe95 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -13,6 +13,8 @@
#include "../proto/source/bhome_msg.pb.h"
#include "../proto/source/bhome_msg_api.pb.h"
+#define TIME_WAIT 3
+
static Logger *logger = LoggerFactory::getLogger();
static int gRun_stat = 0;
@@ -20,7 +22,43 @@
static pthread_mutex_t mutex;
-static char errString[100] = { 0x00 };
+static pthread_t gTids;
+
+static void *client_run_check(void *skptr) {
+
+ pthread_detach(pthread_self());
+
+ int data;
+ int sec, nsec;
+ int rv;
+ int key;
+ char buf[MAX_STR_LEN] = { 0x00 };
+ void *buf_temp = NULL;
+ int size;
+
+ sec = TIME_WAIT;
+ nsec = 0;
+ sprintf(buf, "%s", "Success");
+ data = net_mod_socket_int_get(gNetmod_socket);
+ while(true) {
+
+ rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data);
+ if (rv == 0) {
+
+ BHFree(buf_temp, size);
+
+ rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
+ if (rv != 0) {
+ logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
+ }
+
+ } else {
+
+ logger->error("the process check failed with error: %s!\n", bus_strerror(rv));
+
+ }
+ }
+}
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
@@ -29,6 +67,7 @@
int count = 0;
void *buf = NULL;
int min = 0;
+ char *errString = NULL;
ProcInfo pData;
#if defined(PRO_DE_SERIALIZE)
@@ -43,10 +82,8 @@
::bhome_msg::ProcInfo input;
if ((!input.ParseFromArray(proc_info, proc_info_len)) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
+
return false;
}
@@ -58,10 +95,8 @@
#else
if ((proc_info == NULL) || (proc_info_len == 0) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x90, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
+
return false;
}
#endif
@@ -74,8 +109,7 @@
logger->error("the process has already registered!\n");
rv = EBUS_RES_BUSY;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -136,6 +170,16 @@
}
#endif
+ if (pData.proc_id == NULL) {
+ rv = EBUS_INVALID_PARA;
+
+ bus_errorset(rv);
+
+ pthread_mutex_unlock(&mutex);
+
+ return false;
+ }
+
gNetmod_socket = net_mod_socket_open();
hashtable_t *hashtable = mm_get_hashtable();
key = hashtable_alloc_key(hashtable);
@@ -149,19 +193,19 @@
rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
pthread_mutex_unlock(&mutex);
} else {
rv = EBUS_RES_BUSY;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
+
+ errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgCommonReply mcr;
@@ -180,7 +224,9 @@
*reply_len = min;
#endif
-
+
+ pthread_create(&gTids, NULL, client_run_check, NULL);
+
return true;
}
@@ -190,6 +236,7 @@
int rv;
int min;
void *buf = NULL;
+ char *errString = NULL;
#if defined(PRO_DE_SERIALIZE)
struct _ProcInfo_proto
@@ -205,9 +252,7 @@
if(!input.ParseFromArray(proc_info, proc_info_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -219,9 +264,7 @@
#else
if ((reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -232,8 +275,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -250,21 +292,19 @@
gRun_stat = 0;
}
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
pthread_mutex_unlock(&mutex);
} else {
rv = EBUS_RES_BUSY;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
+ errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgCommonReply mcr;
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -293,6 +333,7 @@
void *buf = NULL;
int total = 0;
int count = 0;
+ char *errString = NULL;
char *topics_buf = NULL;
#if defined(PRO_DE_SERIALIZE)
@@ -306,11 +347,9 @@
if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
- return false;
+ return false;
}
_input.amount = input.topic_list_size();
@@ -327,9 +366,7 @@
if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -341,8 +378,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -351,8 +387,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRegisterTopics: Out of memory!\n");
@@ -382,10 +417,10 @@
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS);
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
free(topics_buf);
+
+ bus_errorset(rv);
+ errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgCommonReply mcr;
@@ -414,6 +449,7 @@
int min;
void *buf = NULL;
int size;
+ char *errString = NULL;
char topics_buf[MAX_STR_LEN] = { 0x00 };
ProcInfo_query *ptr = NULL;
ProcInfo *Proc_ptr = NULL;
@@ -433,9 +469,7 @@
::bhome_msg::MsgQueryTopic input1;
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -449,9 +483,7 @@
#else
if ((topic == NULL) || (topic_len == 0) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -461,8 +493,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -477,9 +508,7 @@
#endif
rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS);
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
#if defined(PRO_DE_SERIALIZE)
struct _MsgQueryTopicReply
@@ -510,6 +539,8 @@
mtr_list[i].port = 5000;
}
}
+
+ errString = bus_strerror(0, 1);
::bhome_msg::MsgQueryTopicReply mtr;
mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -554,6 +585,7 @@
void *buf = NULL;
int size;
int min;
+ char *errString = NULL;
ProcInfo_sum *Proc_ptr = NULL;
char data_buf[MAX_STR_LEN] = { 0x00 };
@@ -573,8 +605,7 @@
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -587,8 +618,7 @@
#else
if ((reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -598,9 +628,8 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
+
return false;
}
@@ -609,9 +638,7 @@
}
rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
#if defined(PRO_DE_SERIALIZE)
struct _MsgQueryProcReply
@@ -657,6 +684,8 @@
}
}
}
+
+ errString = bus_strerror(0, 1);
::bhome_msg::MsgQueryProcReply mpr;
mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -707,6 +736,7 @@
int count = 0;
int len, i;
void *buf = NULL;
+ char *errString = NULL;
char *topics_buf = NULL;
#if defined(PRO_DE_SERIALIZE)
@@ -720,8 +750,7 @@
if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -741,8 +770,7 @@
#else
if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -752,8 +780,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -762,8 +789,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHSubscribeTopics: Out of memory!\n");
@@ -806,8 +832,7 @@
}
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ errString = bus_strerror(0, 1);
free(topics_buf);
@@ -848,6 +873,7 @@
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
int rv;
+ char *errString = NULL;
#if defined(PRO_DE_SERIALIZE)
struct _ProcInfo_proto
@@ -862,9 +888,7 @@
if(!input.ParseFromArray(proc_info,proc_info_len)) {
rv = EBUS_INVALID_PARA;
-
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -875,8 +899,8 @@
_input.private_info = input.private_info().c_str();
rv = 0;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
+ errString = bus_strerror(0, 1);
::bhome_msg::MsgCommonReply mcr;
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -912,8 +936,7 @@
if(!input.ParseFromArray(msgpub, msgpub_len)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -924,8 +947,7 @@
if ((topic == NULL) || (content == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -935,8 +957,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -966,8 +987,7 @@
if (rv > 0)
return true;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -995,16 +1015,14 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
if ((msgpub == NULL) || (msgpub_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1035,8 +1053,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRequest: Out of memory!\n");
@@ -1052,8 +1069,7 @@
if (data_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRequest: Out of memory!\n");
@@ -1124,8 +1140,8 @@
} else {
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
+
}
if (rv == 0)
@@ -1169,8 +1185,7 @@
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1186,8 +1201,7 @@
if ((request == NULL) || (request_len == 0)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1197,8 +1211,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1227,8 +1240,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRequest: Out of memory!\n");
@@ -1267,9 +1279,7 @@
}
}
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
if((msg_id == NULL) || (msg_id_len == NULL)) {
if (rv == 0)
return true;
@@ -1309,6 +1319,7 @@
net_mod_recv_msg_t *recv_arr;
net_mod_err_t *errarr;
int errarr_size = 0;
+ char *errString = NULL;
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
@@ -1338,25 +1349,23 @@
if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
_input0.mq_id = input0.mq_id();
- _input0.abs_addr = input0.abs_addr();
- _input0.ip = input0.ip().c_str();
- _input0.port = input0.port();
- _input1.topic = input1.topic().c_str();
- _input1.data = input1.data().c_str();
+ _input0.abs_addr = input0.abs_addr();
+ _input0.ip = input0.ip().c_str();
+ _input0.port = input0.port();
+ _input1.topic = input1.topic().c_str();
+ _input1.data = input1.data().c_str();
#else
if ((request == NULL) || (request_len == 0) || (reply == NULL) || (reply_len == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1366,8 +1375,7 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1398,8 +1406,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRequest: Out of memory!\n");
@@ -1463,9 +1470,7 @@
free(topics_buf);
}
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
-
+ bus_errorset(rv);
if (rv == 0) {
if ((proc_id != NULL) && (proc_id_len != NULL)) {
memset(buf_temp, 0x00, sizeof(buf_temp));
@@ -1481,8 +1486,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHRequest: Out of memory!\n");
@@ -1497,6 +1501,8 @@
free(topics_buf);
}
+ errString = bus_strerror(0, 1);
+
#if defined(PRO_DE_SERIALIZE)
if (rv == 0) {
::bhome_msg::MsgRequestTopicReply mrt;
@@ -1541,16 +1547,14 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
if ((request == NULL) || (request_len == 0) || (src == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1594,8 +1598,7 @@
if (topics_buf == NULL) {
rv = EBUS_NO_MEM;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
logger->error("in BHReadRequest: Out of memory!\n");
@@ -1637,8 +1640,7 @@
*src = buf;
}
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
if (rv == 0)
return true;
@@ -1650,6 +1652,8 @@
{
int rv;
int data;
+ int sec = 3;
+ int nsec = 0;
const char *_input;
#if defined(PRO_DE_SERIALIZE)
@@ -1657,8 +1661,7 @@
if (!input.ParseFromArray(reply, reply_len) || (src == NULL)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1669,8 +1672,7 @@
if ((src == NULL) || (reply == NULL) || (reply_len == 0)) {
rv = EBUS_INVALID_PARA;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
@@ -1683,17 +1685,15 @@
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
return false;
}
data = net_mod_socket_svr_get(gNetmod_socket);
- rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data);
+ rv = net_mod_socket_sendto_timeout(gNetmod_socket, _input, strlen(_input), *(int *)src, sec, nsec, SVR_STR, data);
- memset(errString, 0x00, sizeof(errString));
- strncpy(errString, bus_strerror(rv), sizeof(errString));
+ bus_errorset(rv);
if (rv == 0)
return true;
@@ -1713,6 +1713,7 @@
int BHGetLastError(void **msg, int *msg_len)
{
void *buf = NULL;
+ char *errString = bus_strerror(0, 1);
buf = malloc(strlen(errString) + 1);
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index 29d5683..3836ebf 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -49,15 +49,11 @@
}
char *
-bus_strerror(int err)
+bus_strerror(int err, int flag)
{
int s;
char *buf;
/* Make first caller allocate key for thread-specific data */
-
- if (err == 0) {
- err = EBUS_BASE;
- }
s = pthread_once(&once, createKey);
if (s != 0)
@@ -68,15 +64,23 @@
{
/* If first call from this thread, allocate
buffer for thread, and save its location */
- buf = (char *)malloc(MAX_ERROR_LEN);
+ buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int));
if (buf == NULL)
err_exit(errno, "malloc");
+ memset(buf, 0x00, MAX_ERROR_LEN + sizeof(int));
s = pthread_setspecific(strerrorKey, buf);
if (s != 0)
err_exit(s, "pthread_setspecific");
}
+ if (flag != 0) {
+ err = *(int *)(buf + MAX_ERROR_LEN);
+ }
+
+ if (err == 0) {
+ err = EBUS_BASE;
+ }
if(err < EBUS_BASE) {
// libc閿欒
@@ -106,3 +110,33 @@
return buf;
}
+
+void bus_errorset(int err)
+{
+ int s;
+ char *buf;
+ /* Make first caller allocate key for thread-specific data */
+ s = pthread_once(&once, createKey);
+ if (s != 0)
+ err_exit(s, "pthread_once");
+
+ buf = (char *)pthread_getspecific(strerrorKey);
+ if (buf == NULL)
+ {
+ /* If first call from this thread, allocate
+ buffer for thread, and save its location */
+ buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int));
+ if (buf == NULL)
+ err_exit(errno, "malloc");
+
+ s = pthread_setspecific(strerrorKey, buf);
+ if (s != 0)
+ err_exit(s, "pthread_setspecific");
+ }
+
+ *(int *)(buf + MAX_ERROR_LEN) = err;
+
+}
+
+
+
diff --git a/src/bus_error.h b/src/bus_error.h
index e625790..dfcfe06 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -21,6 +21,7 @@
extern int bus_errno;
-char *bus_strerror(int eno) ;
+char *bus_strerror(int eno, int flag = 0);
+void bus_errorset(int err);
#endif
\ No newline at end of file
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
index a04edad..c3104a9 100644
--- a/src/bus_proxy_start.cpp
+++ b/src/bus_proxy_start.cpp
@@ -11,9 +11,13 @@
#include <getopt.h>
#include <stdlib.h>
+using namespace std;
+
#define SVR_PORT 5000
-#define TOTAL_THREADS 2
+#define TOTAL_THREADS 3
+
+#define MAX_RETRIES 3
static void *gBusServer_socket = NULL;
static void *gServer_socket = NULL;
@@ -24,8 +28,10 @@
static int gBusServer_act = 0;
static int gBusServer_stat = 0;
-pthread_t tids[2];
-void *res[2];
+pthread_t tids[TOTAL_THREADS];
+void *res[TOTAL_THREADS];
+
+extern list gLinkedList;
void *bus_start(void *skptr) {
@@ -48,6 +54,58 @@
gServer_socket = net_mod_server_socket_open(port);
if(net_mod_server_socket_start(gServer_socket) != 0) {
printf("start net mod server failed\n");
+ }
+
+ return NULL;
+}
+
+void *check_start(void *skptr) {
+ int i;
+ int ret;
+ int val;
+ int thres;
+ int data;
+ int data_ret;
+ int total;
+ void *buf;
+ int size;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
+
+ struct timespec timeout = {.tv_sec = 3, .tv_nsec = 0};
+
+ while(true) {
+ total = gLinkedList.NodeNum();
+ for (i = 0; i < total; i++) {
+
+ val = gLinkedList.nodeGet(i);
+ if (val > 0) {
+ data_ret = bus_server_socket_wrapper_data_get(gBusServer_socket, val);
+ thres = gLinkedList.dataGet(val);
+ if ((data_ret == true) && (thres < MAX_RETRIES)) {
+
+ data = gLinkedList.dataFixGet(val);
+ sprintf(buf_temp, "%d", i + 1);
+ ret = bus_server_socket_wrapper_proc_check(gBusServer_socket, data, buf_temp, strlen(buf_temp), &buf, &size, &timeout, BUS_TIMEOUT_FLAG);
+ if (ret == 0) {
+ gLinkedList.dataSet(val, 0x00);
+
+ free(buf);
+ } else {
+
+ gLinkedList.dataSet(val, ++thres);
+ }
+
+ } else {
+
+ gLinkedList.Delete(val);
+ if (thres >= MAX_RETRIES) {
+ bus_server_socket_wrapper_proc_release(gBusServer_socket, val);
+ }
+ }
+ }
+ }
+
+ sleep(10);
}
return NULL;
@@ -110,6 +168,8 @@
if (gBusServer_stat >= 0) {
pthread_create(&tids[1], NULL, svr_start, (void *)&gPort);
+
+ pthread_create(&tids[0], NULL, check_start, NULL);
}
for (i = 0; i< TOTAL_THREADS; i++) {
diff --git a/src/proc_def.h b/src/proc_def.h
index 2b3f57b..1251617 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -67,7 +67,6 @@
}
#endif
-#define INT_STR 0x01
#define SVR_STR 0x02
#endif //end of file
diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h
index 24efd10..8e415fa 100644
--- a/src/queue/array_lock_free_queue.h
+++ b/src/queue/array_lock_free_queue.h
@@ -235,7 +235,9 @@
}
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
- AtomicAdd(&m_count, 1);
+ if (m_count < Q_SIZE) {
+ AtomicAdd(&m_count, 1);
+ }
#endif
return true;
}
@@ -275,7 +277,9 @@
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
// m_count.fetch_sub(1);
- AtomicSub(&m_count, 1);
+ if (m_count > 0) {
+ AtomicSub(&m_count, 1);
+ }
#endif
return true;
}
@@ -295,6 +299,7 @@
template<typename ELEM_T, typename Allocator>
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
int currentCount = m_count;
uint32_t currentReadIndex = m_readIndex;
if (i >= currentCount) {
@@ -302,6 +307,9 @@
<< " is out of range\n";
std::exit(EXIT_FAILURE);
}
+#else
+ uint32_t currentReadIndex = m_readIndex;
+#endif
return m_theQueue[countToIndex(currentReadIndex + i)];
}
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index d5e757d..cfb7419 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
static Logger *logger = LoggerFactory::getLogger();
+list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) {
SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
SHMKeySet *subscripter_set;
@@ -296,6 +297,169 @@
return dataBuf;
}
+void list::Insert(int aData, int bData)
+{
+ LinkNode *pHead = NULL;
+ LinkNode *pNew = NULL;
+ LinkNode *pCur = NULL;
+
+ pNew = new(LinkNode);
+ pNew->data = aData;
+ pNew->data_fix = bData;
+ pNew->count = 0;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL) {
+ head = pNew;
+
+ pNew->next = NULL;
+
+ } else {
+ while(pCur->next != NULL) {
+ pCur = pCur->next;
+ }
+
+ pCur->next = pNew;
+ pNew->next = NULL;
+ }
+}
+
+void list::Delete(int data)
+{
+ LinkNode *pHead;
+ LinkNode *pCur;
+ LinkNode *pNext;
+
+ pHead = head;
+ pCur = pHead;
+ if(pHead == NULL)
+ return;
+
+ while((pCur != NULL) && (pCur->data == data)) {
+
+ head = pCur->next;
+
+ delete(pCur);
+
+ pCur = head;
+
+ }
+
+ while((pCur != NULL) && (pCur->next != NULL)) {
+ pNext = pCur->next;
+
+ if(pNext->data == data) {
+ pCur->next = pNext->next;
+ pCur = pNext->next;
+
+ delete(pNext);
+ } else {
+
+ pCur = pNext;
+
+ }
+ }
+}
+
+void list::dataSet(int data, int val)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ pCur->count = val;
+ }
+
+ pCur = pCur->next;
+ }
+}
+
+int list::dataGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->count;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::dataFixGet(int data)
+{
+ LinkNode *pCur;
+
+ pCur = head;
+ if(pCur == NULL)
+ return 0;
+
+ while(pCur != NULL) {
+
+ if(pCur->data == data) {
+ return pCur->data_fix;
+ }
+
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
+int list::NodeNum(void)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while(pCur != NULL) {
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return count;
+}
+
+int list::nodeGet(int index)
+{
+ int count = 0;
+ LinkNode *pCur = head;
+
+ if (pCur == NULL) {
+ return 0;
+ }
+
+ while((pCur != NULL) && (count <= index)) {
+
+ if (count == index) {
+ return pCur->data;
+ }
+
+ ++count;
+ pCur = pCur->next;
+ }
+
+ return 0;
+}
+
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
@@ -340,7 +504,10 @@
memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
count += strlen(buf + count) + 1;
-
+
+ if (flag == PROC_REG) {
+ gLinkedList.Insert(key, atoi(Data_stru.int_info));
+ }
}
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -685,14 +852,44 @@
}
}
+int BusServerSocket::get_data(int val) {
+
+ ProcZone::iterator proc_iter;
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
+
+ if ((proc_iter = proc->find(val)) != proc->end()) {
+ return true;
+ }
+
+ return false;
+
+}
+
+int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+
+ ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+}
+
+void BusServerSocket::remove_proc(int val) {
+ BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
+}
+
// 杩愯浠g悊
int BusServerSocket::_run_proxy_() {
int size;
int key;
int flag;
+ char buf_temp[MAX_STR_LEN] = { 0x00 };
char * action, *topic, *topics, *buf, *content;
size_t head_len;
bus_head_t head;
+ int val;
+ ProcDataZone::iterator proc_que_iter;
+ ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
int rv;
char send_buf[512] = { 0x00 };
@@ -762,6 +959,16 @@
}
+ if (flag == PROC_REG) {
+ memcpy(buf_temp, content, strlen(content) + 1);
+
+ if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
+
+ val = proc_que_iter->second;
+ _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
+ }
+ }
+
_proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
}
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index e60c700..ba6ebe8 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -18,6 +18,44 @@
typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
+typedef struct _LinkNode
+{
+ int data;
+ int data_fix;
+ int count;
+
+ _LinkNode *next;
+} LinkNode;
+
+class list
+{
+
+private:
+
+ LinkNode *head;
+
+public:
+
+ list() {head = NULL;};
+
+ void Insert(int aDate, int bDate);
+
+ void Delete(int Data);
+
+ int dataFixGet(int data);
+
+ int dataGet(int data);
+
+ void dataSet(int data, int val);
+
+ int NodeNum(void);
+
+ int nodeGet(int index);
+
+ LinkNode *getHead() {return head;};
+
+};
+
class BusServerSocket {
private:
shm_socket_t *shm_socket;
@@ -66,6 +104,7 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int start();
+ int get_data(int val);
/**
* 鍋滄bus
@@ -73,8 +112,9 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
int stop();
-
-
+ int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag);
+ void remove_proc(int val);
/**
* 鑾峰彇soket key
diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp
index 6b730a9..db8bfe5 100644
--- a/src/socket/bus_server_socket_wrapper.cpp
+++ b/src/socket/bus_server_socket_wrapper.cpp
@@ -40,4 +40,38 @@
return -1;
}
-}
\ No newline at end of file
+}
+
+int bus_server_socket_wrapper_data_get(void * _socket, int val) {
+ int ret;
+ BusServerSocket *sockt = (BusServerSocket *)_socket;
+
+ ret = sockt->get_data(val);
+
+ return ret;
+
+}
+
+int bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag) {
+ int ret;
+ BusServerSocket *sockt = (BusServerSocket *)_socket;
+
+ ret = sockt->check_proc(val, buf, len, buf_ret, len_ret, timeout, flag);
+
+ return ret;
+
+}
+
+void bus_server_socket_wrapper_proc_release(void * _socket, int val) {
+
+ BusServerSocket *sockt = (BusServerSocket *)_socket;
+
+ sockt->remove_proc(val);
+
+}
+
+
+
+
+
diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h
index 06a060e..f91ecb8 100644
--- a/src/socket/bus_server_socket_wrapper.h
+++ b/src/socket/bus_server_socket_wrapper.h
@@ -40,7 +40,12 @@
*/
int bus_server_socket_wrapper_start_bus(void * _socket);
+int bus_server_socket_wrapper_data_get(void * _socket, int val);
+int bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \
+ const struct timespec *timeout, const int flag);
+
+void bus_server_socket_wrapper_proc_release(void * _socket, int val);
#ifdef __cplusplus
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index dc6d752..6705b96 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -46,7 +46,7 @@
void *tmp_ptr = hashtable_get(hashtable, key);
if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) {
- queue = new LockFreeQueue<shm_packet_t>(32);
+ queue = new LockFreeQueue<shm_packet_t>(LOCK_FREE_Q_DEFAULT_SIZE);
hashtable_put(hashtable, key, (void *)queue);
return queue;
} else if(force) {
@@ -76,7 +76,6 @@
int s, type;
pthread_mutexattr_t mtxAttr;
- logger->debug("shm_socket_open\n");
// shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
shm_socket_t *sockt = new shm_socket_t;
sockt->socket_type = socket_type;
@@ -231,7 +230,7 @@
if (rv != 0) {
if(rv == ETIMEDOUT){
- logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
+ logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
return EBUS_TIMEOUT;
}
@@ -275,7 +274,7 @@
if (rv != 0) {
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
return rv;
}
@@ -368,7 +367,6 @@
recvbufIter = sockt->recvbuf.find(uuid);
if(recvbufIter != sockt->recvbuf.end()) {
// 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨�
-logger->debug("get from recvbuf: %s", uuid.c_str());
recvpak = recvbufIter->second;
sockt->recvbuf.erase(recvbufIter);
goto LABLE_SUC;
@@ -382,11 +380,10 @@
return EBUS_TIMEOUT;
}
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
return rv;
}
-logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
if(strlen(recvpak.uuid) == 0) {
continue;
} else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) {
@@ -474,7 +471,7 @@
rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
if (rv != 0) {
- logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
+ logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
return rv;
}
--
Gitblit v1.8.0