From 5c912c70e9333298ff48f7ea15424f72ca977b99 Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期五, 17 九月 2021 09:43:55 +0800 Subject: [PATCH] Add the heartbeat logic feature. --- src/bus_proxy_start.cpp | 66 ++++ src/queue/array_lock_free_queue.h | 12 src/socket/bus_server_socket.cpp | 209 ++++++++++++++++ src/socket/bus_server_socket_wrapper.h | 5 src/socket/bus_server_socket_wrapper.cpp | 36 ++ src/bus_error.h | 3 src/socket/bus_server_socket.h | 44 +++ src/bh_api.cpp | 287 +++++++++++----------- src/proc_def.h | 1 src/bus_error.cpp | 46 +++ src/socket/shm_socket.cpp | 13 11 files changed, 554 insertions(+), 168 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 73c7772..635fe95 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -13,6 +13,8 @@ #include "../proto/source/bhome_msg.pb.h" #include "../proto/source/bhome_msg_api.pb.h" +#define TIME_WAIT 3 + static Logger *logger = LoggerFactory::getLogger(); static int gRun_stat = 0; @@ -20,7 +22,43 @@ static pthread_mutex_t mutex; -static char errString[100] = { 0x00 }; +static pthread_t gTids; + +static void *client_run_check(void *skptr) { + + pthread_detach(pthread_self()); + + int data; + int sec, nsec; + int rv; + int key; + char buf[MAX_STR_LEN] = { 0x00 }; + void *buf_temp = NULL; + int size; + + sec = TIME_WAIT; + nsec = 0; + sprintf(buf, "%s", "Success"); + data = net_mod_socket_int_get(gNetmod_socket); + while(true) { + + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data); + if (rv == 0) { + + BHFree(buf_temp, size); + + rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); + if (rv != 0) { + logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + } + + } else { + + logger->error("the process check failed with error: %s!\n", bus_strerror(rv)); + + } + } +} int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { @@ -29,6 +67,7 @@ int count = 0; void *buf = NULL; int min = 0; + char *errString = NULL; ProcInfo pData; #if defined(PRO_DE_SERIALIZE) @@ -43,10 +82,8 @@ ::bhome_msg::ProcInfo input; if ((!input.ParseFromArray(proc_info, proc_info_len)) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } @@ -58,10 +95,8 @@ #else if ((proc_info == NULL) || (proc_info_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x90, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } #endif @@ -74,8 +109,7 @@ logger->error("the process has already registered!\n"); rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -136,6 +170,16 @@ } #endif + if (pData.proc_id == NULL) { + rv = EBUS_INVALID_PARA; + + bus_errorset(rv); + + pthread_mutex_unlock(&mutex); + + return false; + } + gNetmod_socket = net_mod_socket_open(); hashtable_t *hashtable = mm_get_hashtable(); key = hashtable_alloc_key(hashtable); @@ -149,19 +193,19 @@ rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); pthread_mutex_unlock(&mutex); } else { rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } + + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; @@ -180,7 +224,9 @@ *reply_len = min; #endif - + + pthread_create(&gTids, NULL, client_run_check, NULL); + return true; } @@ -190,6 +236,7 @@ int rv; int min; void *buf = NULL; + char *errString = NULL; #if defined(PRO_DE_SERIALIZE) struct _ProcInfo_proto @@ -205,9 +252,7 @@ if(!input.ParseFromArray(proc_info, proc_info_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -219,9 +264,7 @@ #else if ((reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -232,8 +275,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -250,21 +292,19 @@ gRun_stat = 0; } - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); pthread_mutex_unlock(&mutex); } else { rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -293,6 +333,7 @@ void *buf = NULL; int total = 0; int count = 0; + char *errString = NULL; char *topics_buf = NULL; #if defined(PRO_DE_SERIALIZE) @@ -306,11 +347,9 @@ if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); - return false; + return false; } _input.amount = input.topic_list_size(); @@ -327,9 +366,7 @@ if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -341,8 +378,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -351,8 +387,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRegisterTopics: Out of memory!\n"); @@ -382,10 +417,10 @@ rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - free(topics_buf); + + bus_errorset(rv); + errString = bus_strerror(0, 1); #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgCommonReply mcr; @@ -414,6 +449,7 @@ int min; void *buf = NULL; int size; + char *errString = NULL; char topics_buf[MAX_STR_LEN] = { 0x00 }; ProcInfo_query *ptr = NULL; ProcInfo *Proc_ptr = NULL; @@ -433,9 +469,7 @@ ::bhome_msg::MsgQueryTopic input1; if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(topic, topic_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -449,9 +483,7 @@ #else if ((topic == NULL) || (topic_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -461,8 +493,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -477,9 +508,7 @@ #endif rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); #if defined(PRO_DE_SERIALIZE) struct _MsgQueryTopicReply @@ -510,6 +539,8 @@ mtr_list[i].port = 5000; } } + + errString = bus_strerror(0, 1); ::bhome_msg::MsgQueryTopicReply mtr; mtr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -554,6 +585,7 @@ void *buf = NULL; int size; int min; + char *errString = NULL; ProcInfo_sum *Proc_ptr = NULL; char data_buf[MAX_STR_LEN] = { 0x00 }; @@ -573,8 +605,7 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(query, query_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -587,8 +618,7 @@ #else if ((reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -598,9 +628,8 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); + return false; } @@ -609,9 +638,7 @@ } rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS); - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); #if defined(PRO_DE_SERIALIZE) struct _MsgQueryProcReply @@ -657,6 +684,8 @@ } } } + + errString = bus_strerror(0, 1); ::bhome_msg::MsgQueryProcReply mpr; mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -707,6 +736,7 @@ int count = 0; int len, i; void *buf = NULL; + char *errString = NULL; char *topics_buf = NULL; #if defined(PRO_DE_SERIALIZE) @@ -720,8 +750,7 @@ if(!input.ParseFromArray(topics, topics_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -741,8 +770,7 @@ #else if ((topics == NULL) || (topics_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -752,8 +780,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -762,8 +789,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHSubscribeTopics: Out of memory!\n"); @@ -806,8 +832,7 @@ } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + errString = bus_strerror(0, 1); free(topics_buf); @@ -848,6 +873,7 @@ int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { int rv; + char *errString = NULL; #if defined(PRO_DE_SERIALIZE) struct _ProcInfo_proto @@ -862,9 +888,7 @@ if(!input.ParseFromArray(proc_info,proc_info_len)) { rv = EBUS_INVALID_PARA; - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -875,8 +899,8 @@ _input.private_info = input.private_info().c_str(); rv = 0; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); + errString = bus_strerror(0, 1); ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); @@ -912,8 +936,7 @@ if(!input.ParseFromArray(msgpub, msgpub_len)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -924,8 +947,7 @@ if ((topic == NULL) || (content == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -935,8 +957,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -966,8 +987,7 @@ if (rv > 0) return true; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -995,16 +1015,14 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } if ((msgpub == NULL) || (msgpub_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1035,8 +1053,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1052,8 +1069,7 @@ if (data_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1124,8 +1140,8 @@ } else { - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); + } if (rv == 0) @@ -1169,8 +1185,7 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1186,8 +1201,7 @@ if ((request == NULL) || (request_len == 0)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1197,8 +1211,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1227,8 +1240,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1267,9 +1279,7 @@ } } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); if((msg_id == NULL) || (msg_id_len == NULL)) { if (rv == 0) return true; @@ -1309,6 +1319,7 @@ net_mod_recv_msg_t *recv_arr; net_mod_err_t *errarr; int errarr_size = 0; + char *errString = NULL; char buf_temp[MAX_STR_LEN] = { 0x00 }; char *topics_buf = NULL; @@ -1338,25 +1349,23 @@ if (!input0.ParseFromArray(remote, remote_len) || !input1.ParseFromArray(request, request_len) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } _input0.mq_id = input0.mq_id(); - _input0.abs_addr = input0.abs_addr(); - _input0.ip = input0.ip().c_str(); - _input0.port = input0.port(); - _input1.topic = input1.topic().c_str(); - _input1.data = input1.data().c_str(); + _input0.abs_addr = input0.abs_addr(); + _input0.ip = input0.ip().c_str(); + _input0.port = input0.port(); + _input1.topic = input1.topic().c_str(); + _input1.data = input1.data().c_str(); #else if ((request == NULL) || (request_len == 0) || (reply == NULL) || (reply_len == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1366,8 +1375,7 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1398,8 +1406,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1463,9 +1470,7 @@ free(topics_buf); } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - + bus_errorset(rv); if (rv == 0) { if ((proc_id != NULL) && (proc_id_len != NULL)) { memset(buf_temp, 0x00, sizeof(buf_temp)); @@ -1481,8 +1486,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHRequest: Out of memory!\n"); @@ -1497,6 +1501,8 @@ free(topics_buf); } + errString = bus_strerror(0, 1); + #if defined(PRO_DE_SERIALIZE) if (rv == 0) { ::bhome_msg::MsgRequestTopicReply mrt; @@ -1541,16 +1547,14 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } if ((request == NULL) || (request_len == 0) || (src == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1594,8 +1598,7 @@ if (topics_buf == NULL) { rv = EBUS_NO_MEM; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); logger->error("in BHReadRequest: Out of memory!\n"); @@ -1637,8 +1640,7 @@ *src = buf; } - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); if (rv == 0) return true; @@ -1650,6 +1652,8 @@ { int rv; int data; + int sec = 3; + int nsec = 0; const char *_input; #if defined(PRO_DE_SERIALIZE) @@ -1657,8 +1661,7 @@ if (!input.ParseFromArray(reply, reply_len) || (src == NULL)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1669,8 +1672,7 @@ if ((src == NULL) || (reply == NULL) || (reply_len == 0)) { rv = EBUS_INVALID_PARA; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } @@ -1683,17 +1685,15 @@ logger->error("the process has not been registered yet!\n"); rv = EBUS_RES_NO; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); return false; } data = net_mod_socket_svr_get(gNetmod_socket); - rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data); + rv = net_mod_socket_sendto_timeout(gNetmod_socket, _input, strlen(_input), *(int *)src, sec, nsec, SVR_STR, data); - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + bus_errorset(rv); if (rv == 0) return true; @@ -1713,6 +1713,7 @@ int BHGetLastError(void **msg, int *msg_len) { void *buf = NULL; + char *errString = bus_strerror(0, 1); buf = malloc(strlen(errString) + 1); diff --git a/src/bus_error.cpp b/src/bus_error.cpp index 29d5683..3836ebf 100644 --- a/src/bus_error.cpp +++ b/src/bus_error.cpp @@ -49,15 +49,11 @@ } char * -bus_strerror(int err) +bus_strerror(int err, int flag) { int s; char *buf; /* Make first caller allocate key for thread-specific data */ - - if (err == 0) { - err = EBUS_BASE; - } s = pthread_once(&once, createKey); if (s != 0) @@ -68,15 +64,23 @@ { /* If first call from this thread, allocate buffer for thread, and save its location */ - buf = (char *)malloc(MAX_ERROR_LEN); + buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int)); if (buf == NULL) err_exit(errno, "malloc"); + memset(buf, 0x00, MAX_ERROR_LEN + sizeof(int)); s = pthread_setspecific(strerrorKey, buf); if (s != 0) err_exit(s, "pthread_setspecific"); } + if (flag != 0) { + err = *(int *)(buf + MAX_ERROR_LEN); + } + + if (err == 0) { + err = EBUS_BASE; + } if(err < EBUS_BASE) { // libc閿欒 @@ -106,3 +110,33 @@ return buf; } + +void bus_errorset(int err) +{ + int s; + char *buf; + /* Make first caller allocate key for thread-specific data */ + s = pthread_once(&once, createKey); + if (s != 0) + err_exit(s, "pthread_once"); + + buf = (char *)pthread_getspecific(strerrorKey); + if (buf == NULL) + { + /* If first call from this thread, allocate + buffer for thread, and save its location */ + buf = (char *)malloc(MAX_ERROR_LEN + sizeof(int)); + if (buf == NULL) + err_exit(errno, "malloc"); + + s = pthread_setspecific(strerrorKey, buf); + if (s != 0) + err_exit(s, "pthread_setspecific"); + } + + *(int *)(buf + MAX_ERROR_LEN) = err; + +} + + + diff --git a/src/bus_error.h b/src/bus_error.h index e625790..dfcfe06 100644 --- a/src/bus_error.h +++ b/src/bus_error.h @@ -21,6 +21,7 @@ extern int bus_errno; -char *bus_strerror(int eno) ; +char *bus_strerror(int eno, int flag = 0); +void bus_errorset(int err); #endif \ No newline at end of file diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp index a04edad..c3104a9 100644 --- a/src/bus_proxy_start.cpp +++ b/src/bus_proxy_start.cpp @@ -11,9 +11,13 @@ #include <getopt.h> #include <stdlib.h> +using namespace std; + #define SVR_PORT 5000 -#define TOTAL_THREADS 2 +#define TOTAL_THREADS 3 + +#define MAX_RETRIES 3 static void *gBusServer_socket = NULL; static void *gServer_socket = NULL; @@ -24,8 +28,10 @@ static int gBusServer_act = 0; static int gBusServer_stat = 0; -pthread_t tids[2]; -void *res[2]; +pthread_t tids[TOTAL_THREADS]; +void *res[TOTAL_THREADS]; + +extern list gLinkedList; void *bus_start(void *skptr) { @@ -48,6 +54,58 @@ gServer_socket = net_mod_server_socket_open(port); if(net_mod_server_socket_start(gServer_socket) != 0) { printf("start net mod server failed\n"); + } + + return NULL; +} + +void *check_start(void *skptr) { + int i; + int ret; + int val; + int thres; + int data; + int data_ret; + int total; + void *buf; + int size; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + + struct timespec timeout = {.tv_sec = 3, .tv_nsec = 0}; + + while(true) { + total = gLinkedList.NodeNum(); + for (i = 0; i < total; i++) { + + val = gLinkedList.nodeGet(i); + if (val > 0) { + data_ret = bus_server_socket_wrapper_data_get(gBusServer_socket, val); + thres = gLinkedList.dataGet(val); + if ((data_ret == true) && (thres < MAX_RETRIES)) { + + data = gLinkedList.dataFixGet(val); + sprintf(buf_temp, "%d", i + 1); + ret = bus_server_socket_wrapper_proc_check(gBusServer_socket, data, buf_temp, strlen(buf_temp), &buf, &size, &timeout, BUS_TIMEOUT_FLAG); + if (ret == 0) { + gLinkedList.dataSet(val, 0x00); + + free(buf); + } else { + + gLinkedList.dataSet(val, ++thres); + } + + } else { + + gLinkedList.Delete(val); + if (thres >= MAX_RETRIES) { + bus_server_socket_wrapper_proc_release(gBusServer_socket, val); + } + } + } + } + + sleep(10); } return NULL; @@ -110,6 +168,8 @@ if (gBusServer_stat >= 0) { pthread_create(&tids[1], NULL, svr_start, (void *)&gPort); + + pthread_create(&tids[0], NULL, check_start, NULL); } for (i = 0; i< TOTAL_THREADS; i++) { diff --git a/src/proc_def.h b/src/proc_def.h index 2b3f57b..1251617 100644 --- a/src/proc_def.h +++ b/src/proc_def.h @@ -67,7 +67,6 @@ } #endif -#define INT_STR 0x01 #define SVR_STR 0x02 #endif //end of file diff --git a/src/queue/array_lock_free_queue.h b/src/queue/array_lock_free_queue.h index 24efd10..8e415fa 100644 --- a/src/queue/array_lock_free_queue.h +++ b/src/queue/array_lock_free_queue.h @@ -235,7 +235,9 @@ } #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); + if (m_count < Q_SIZE) { + AtomicAdd(&m_count, 1); + } #endif return true; } @@ -275,7 +277,9 @@ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); + if (m_count > 0) { + AtomicSub(&m_count, 1); + } #endif return true; } @@ -295,6 +299,7 @@ template<typename ELEM_T, typename Allocator> ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) { +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; if (i >= currentCount) { @@ -302,6 +307,9 @@ << " is out of range\n"; std::exit(EXIT_FAILURE); } +#else + uint32_t currentReadIndex = m_readIndex; +#endif return m_theQueue[countToIndex(currentReadIndex + i)]; } diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index d5e757d..cfb7419 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -6,6 +6,7 @@ static Logger *logger = LoggerFactory::getLogger(); +list gLinkedList; void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); SHMKeySet *subscripter_set; @@ -296,6 +297,169 @@ return dataBuf; } +void list::Insert(int aData, int bData) +{ + LinkNode *pHead = NULL; + LinkNode *pNew = NULL; + LinkNode *pCur = NULL; + + pNew = new(LinkNode); + pNew->data = aData; + pNew->data_fix = bData; + pNew->count = 0; + + pHead = head; + pCur = pHead; + if(pHead == NULL) { + head = pNew; + + pNew->next = NULL; + + } else { + while(pCur->next != NULL) { + pCur = pCur->next; + } + + pCur->next = pNew; + pNew->next = NULL; + } +} + +void list::Delete(int data) +{ + LinkNode *pHead; + LinkNode *pCur; + LinkNode *pNext; + + pHead = head; + pCur = pHead; + if(pHead == NULL) + return; + + while((pCur != NULL) && (pCur->data == data)) { + + head = pCur->next; + + delete(pCur); + + pCur = head; + + } + + while((pCur != NULL) && (pCur->next != NULL)) { + pNext = pCur->next; + + if(pNext->data == data) { + pCur->next = pNext->next; + pCur = pNext->next; + + delete(pNext); + } else { + + pCur = pNext; + + } + } +} + +void list::dataSet(int data, int val) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return; + + while(pCur != NULL) { + + if(pCur->data == data) { + pCur->count = val; + } + + pCur = pCur->next; + } +} + +int list::dataGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->count; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::dataFixGet(int data) +{ + LinkNode *pCur; + + pCur = head; + if(pCur == NULL) + return 0; + + while(pCur != NULL) { + + if(pCur->data == data) { + return pCur->data_fix; + } + + pCur = pCur->next; + } + + return 0; +} + +int list::NodeNum(void) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while(pCur != NULL) { + + ++count; + pCur = pCur->next; + } + + return count; +} + +int list::nodeGet(int index) +{ + int count = 0; + LinkNode *pCur = head; + + if (pCur == NULL) { + return 0; + } + + while((pCur != NULL) && (count <= index)) { + + if (count == index) { + return pCur->data; + } + + ++count; + pCur = pCur->next; + } + + return 0; +} + void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag) { char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; @@ -340,7 +504,10 @@ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; - + + if (flag == PROC_REG) { + gLinkedList.Insert(key, atoi(Data_stru.int_info)); + } } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); @@ -685,14 +852,44 @@ } } +int BusServerSocket::get_data(int val) { + + ProcZone::iterator proc_iter; + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); + + if ((proc_iter = proc->find(val)) != proc->end()) { + return true; + } + + return false; + +} + +int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + + ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag); + + return ret; +} + +void BusServerSocket::remove_proc(int val) { + BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG); +} + // 杩愯浠g悊 int BusServerSocket::_run_proxy_() { int size; int key; int flag; + char buf_temp[MAX_STR_LEN] = { 0x00 }; char * action, *topic, *topics, *buf, *content; size_t head_len; bus_head_t head; + int val; + ProcDataZone::iterator proc_que_iter; + ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); int rv; char send_buf[512] = { 0x00 }; @@ -762,6 +959,16 @@ } + if (flag == PROC_REG) { + memcpy(buf_temp, content, strlen(content) + 1); + + if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) { + + val = proc_que_iter->second; + _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG); + } + } + _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); } diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index e60c700..ba6ebe8 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -18,6 +18,44 @@ typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; +typedef struct _LinkNode +{ + int data; + int data_fix; + int count; + + _LinkNode *next; +} LinkNode; + +class list +{ + +private: + + LinkNode *head; + +public: + + list() {head = NULL;}; + + void Insert(int aDate, int bDate); + + void Delete(int Data); + + int dataFixGet(int data); + + int dataGet(int data); + + void dataSet(int data, int val); + + int NodeNum(void); + + int nodeGet(int index); + + LinkNode *getHead() {return head;}; + +}; + class BusServerSocket { private: shm_socket_t *shm_socket; @@ -66,6 +104,7 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int start(); + int get_data(int val); /** * 鍋滄bus @@ -73,8 +112,9 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ int stop(); - - + int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag); + void remove_proc(int val); /** * 鑾峰彇soket key diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp index 6b730a9..db8bfe5 100644 --- a/src/socket/bus_server_socket_wrapper.cpp +++ b/src/socket/bus_server_socket_wrapper.cpp @@ -40,4 +40,38 @@ return -1; } -} \ No newline at end of file +} + +int bus_server_socket_wrapper_data_get(void * _socket, int val) { + int ret; + BusServerSocket *sockt = (BusServerSocket *)_socket; + + ret = sockt->get_data(val); + + return ret; + +} + +int bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag) { + int ret; + BusServerSocket *sockt = (BusServerSocket *)_socket; + + ret = sockt->check_proc(val, buf, len, buf_ret, len_ret, timeout, flag); + + return ret; + +} + +void bus_server_socket_wrapper_proc_release(void * _socket, int val) { + + BusServerSocket *sockt = (BusServerSocket *)_socket; + + sockt->remove_proc(val); + +} + + + + + diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h index 06a060e..f91ecb8 100644 --- a/src/socket/bus_server_socket_wrapper.h +++ b/src/socket/bus_server_socket_wrapper.h @@ -40,7 +40,12 @@ */ int bus_server_socket_wrapper_start_bus(void * _socket); +int bus_server_socket_wrapper_data_get(void * _socket, int val); +int bus_server_socket_wrapper_proc_check(void * _socket, int val, char *buf, int len, void **buf_ret, int *len_ret, \ + const struct timespec *timeout, const int flag); + +void bus_server_socket_wrapper_proc_release(void * _socket, int val); #ifdef __cplusplus } diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index dc6d752..6705b96 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -46,7 +46,7 @@ void *tmp_ptr = hashtable_get(hashtable, key); if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { - queue = new LockFreeQueue<shm_packet_t>(32); + queue = new LockFreeQueue<shm_packet_t>(LOCK_FREE_Q_DEFAULT_SIZE); hashtable_put(hashtable, key, (void *)queue); return queue; } else if(force) { @@ -76,7 +76,6 @@ int s, type; pthread_mutexattr_t mtxAttr; - logger->debug("shm_socket_open\n"); // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); shm_socket_t *sockt = new shm_socket_t; sockt->socket_type = socket_type; @@ -231,7 +230,7 @@ if (rv != 0) { if(rv == ETIMEDOUT){ - logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT)); + logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT)); return EBUS_TIMEOUT; } @@ -275,7 +274,7 @@ if (rv != 0) { - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); + logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); return rv; } @@ -368,7 +367,6 @@ recvbufIter = sockt->recvbuf.find(uuid); if(recvbufIter != sockt->recvbuf.end()) { // 鍦ㄧ紦瀛橀噷鏌ュ埌浜哢UID鍖归厤鎴愬姛鐨� -logger->debug("get from recvbuf: %s", uuid.c_str()); recvpak = recvbufIter->second; sockt->recvbuf.erase(recvbufIter); goto LABLE_SUC; @@ -382,11 +380,10 @@ return EBUS_TIMEOUT; } - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); + logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); return rv; } -logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); if(strlen(recvpak.uuid) == 0) { continue; } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) { @@ -474,7 +471,7 @@ rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); if (rv != 0) { - logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); + logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); return rv; } -- Gitblit v1.8.0