From 2561a007b8d8999a4750046d0cfb3b1ad5af50ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 09 四月 2024 15:29:32 +0800 Subject: [PATCH] test for perf --- src/bh_api.cpp | 202 ++++++++++++++++++++++++++++++++++---------------- 1 files changed, 138 insertions(+), 64 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index b774d4e..c450bc2 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -3,10 +3,12 @@ #include "bus_server_socket_wrapper.h" #include "shm_mm_wrapper.h" #include "proc_def.h" +#include "mm.h" #include "usg_common.h" #include "bh_api.h" #include <pthread.h> #include <getopt.h> +#include "msg_mgr.h" #include "../proto/source/error_msg.pb.h" #include "../proto/source/bhome_msg.pb.h" #include "../proto/source/bhome_msg_api.pb.h" @@ -16,12 +18,13 @@ static Logger *logger = LoggerFactory::getLogger(); static int gRun_stat = 0; +static int gRun_flag = true; static void *gNetmod_socket = NULL; -static std::map<std::string, int> gRecvbuf; static pthread_mutex_t mutex; static pthread_t gTids; + static void *client_run_check(void *skptr) { pthread_detach(pthread_self()); @@ -36,19 +39,24 @@ sec = TIME_WAIT; nsec = 0; - sprintf(buf, "%s", "Success"); + sprintf(buf, "%s", STR_EXEC); data = net_mod_socket_int_get(gNetmod_socket); - while(true) { + while(gRun_flag == true) { rv = net_mod_socket_recvfrom(gNetmod_socket, &buf_temp, &size, &key, SVR_STR, data); if (rv == 0) { - BHFree(buf_temp, size); + if (strncmp((char *)buf_temp, STR_RET, strlen(STR_RET)) != 0) { - rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); - if (rv != 0) { - logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data); + if (rv != 0) { + logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + } + } else { + gRun_flag = false; } + + BHFree(buf_temp, size); } else { @@ -102,6 +110,10 @@ memset(&pData, 0x00, sizeof(ProcInfo)); if (gRun_stat == 0) { pthread_mutex_init(&mutex, NULL); + +#if defined(MSG_HANDLER) + msg_init(); +#endif } else { logger->error("the process has already registered!\n"); @@ -119,25 +131,25 @@ shm_mm_wrapper_init(SHM_RES_SIZE); #if defined(PRO_DE_SERIALIZE) - if (_input.proc_id != NULL) { + if (strlen(_input.proc_id) > 0) { count = strlen(_input.proc_id) + 1; min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count; strncpy(pData.proc_id, _input.proc_id, min); } - if (_input.name != NULL) { + if (strlen(_input.name) > 0) { count = strlen(_input.name) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count; strncpy(pData.name, _input.name, min); } - if (_input.public_info != NULL) { + if (strlen(_input.public_info) > 0) { count = strlen(_input.public_info) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count; strncpy(pData.public_info, _input.public_info, min); } - if (_input.private_info != NULL) { + if (strlen(_input.private_info) > 0) { count = strlen(_input.private_info) + 1; min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count; strncpy(pData.private_info, _input.private_info, min); @@ -168,11 +180,12 @@ } #endif - if (pData.proc_id == NULL) { + if (strlen(pData.proc_id) == 0) { rv = EBUS_INVALID_PARA; bus_errorset(rv); + gRun_stat = 0; pthread_mutex_unlock(&mutex); goto exit_entry; @@ -181,13 +194,13 @@ gNetmod_socket = net_mod_socket_open(); hashtable_t *hashtable = mm_get_hashtable(); key = hashtable_alloc_key(hashtable); + net_mod_socket_bind(gNetmod_socket, key); count = hashtable_alloc_key(hashtable); rv = hashtable_alloc_key(hashtable); net_mod_socket_int_set(gNetmod_socket, count); net_mod_socket_svr_set(gNetmod_socket, rv); sprintf(pData.int_info, "%d", count); sprintf(pData.svr_info, "%d", rv); - net_mod_socket_bind(gNetmod_socket, key); rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); @@ -224,6 +237,7 @@ #endif if (rv == 0) { + gRun_flag = true; pthread_create(&gTids, NULL, client_run_check, NULL); return true; @@ -236,8 +250,11 @@ { int rv; int min; + int data; + int diff; void *buf = NULL; char *errString = NULL; + struct timeval start, end; #if defined(PRO_DE_SERIALIZE) struct _ProcInfo_proto @@ -285,7 +302,23 @@ if (rv == 0) { rv = net_mod_socket_reg(gNetmod_socket, NULL, 0, NULL, 0, timeout_ms, PROC_UNREG); if (rv == 0) { - + gettimeofday(&start, NULL); + data = net_mod_socket_int_get(gNetmod_socket); + rv = net_mod_socket_sendto_timeout(gNetmod_socket, STR_RET, strlen(STR_RET), data, 3, 0); + if (rv != 0) { + logger->error("the process check response failed with error: %s!\n", bus_strerror(rv)); + } + + while(gRun_flag == true) { + sleep(1); + + gettimeofday(&end, NULL); + + diff = end.tv_sec - start.tv_sec; + if (diff >= TIME_DUR) + break; + }; + net_mod_socket_close(gNetmod_socket); gNetmod_socket = NULL; @@ -535,21 +568,28 @@ if (rv == 0) { - ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); - mtr_list_num = ptr->num; + min = *(int *)buf; + if (min > 0) { + ptr = (ProcInfo_query *)((char *)buf + sizeof(int)); + mtr_list_num = ptr->num; + + if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { + mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); + } - if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { - mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); + Proc_ptr = &(ptr->procData); + for(int i = 0; i < mtr_list_num; i++) { + mtr_list[i].proc_id = (Proc_ptr + i)->proc_id; + mtr_list[i].mq_id = ID_RSV; + mtr_list[i].abs_addr = ABS_ID_RSV; + mtr_list[i].ip = "127.0.0.1"; + mtr_list[i].port = 5000; + } + } else { + mtr_list_num = 0; } - - Proc_ptr = &(ptr->procData); - for(int i = 0; i < mtr_list_num; i++) { - mtr_list[i].proc_id = (Proc_ptr + i)->proc_id; - mtr_list[i].mq_id = ID_RSV; - mtr_list[i].abs_addr = ABS_ID_RSV; - mtr_list[i].ip = "127.0.0.1"; - mtr_list[i].port = 5000; - } + + free(buf); } exit_entry: @@ -680,27 +720,31 @@ if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) { mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]); } - - Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); - for(int i = 0; i < mpr_list_num; i++) { - mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; - mpr_list[i].name = (Proc_ptr + i)->procData.name; - mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; - mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; - mpr_list[i].online = (Proc_ptr + i)->stat; - mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; - - for(int j = 0; j < mpr_list[i].topic_list_num; j++) - { - if (j == 0) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; - } else if (j == 1) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; - } else if (j == 2) { - mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; + + if (mpr_list_num > 0) { + Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int)); + for(int i = 0; i < mpr_list_num; i++) { + mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id; + mpr_list[i].name = (Proc_ptr + i)->procData.name; + mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info; + mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info; + mpr_list[i].online = (Proc_ptr + i)->stat; + mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num; + + for(int j = 0; j < mpr_list[i].topic_list_num; j++) + { + if (j == 0) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info; + } else if (j == 1) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info; + } else if (j == 2) { + mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info; + } } } } + + free(buf); } errString = bus_strerror(0, 1); @@ -862,8 +906,8 @@ ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); mcr.mutable_errmsg()->set_errstring(errString); - *reply_len=mcr.ByteSizeLong(); - *reply=malloc(*reply_len); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); mcr.SerializePartialToArray(*reply,*reply_len); #else len = strlen(errString) + 1; @@ -1197,11 +1241,11 @@ std::string str; std::string MsgID; int timeout_ms = 3000; - std::map<std::string, int>::iterator recvIter; char data_buf[MAX_STR_LEN] = { 0x00 }; char buf_temp[MAX_STR_LEN] = { 0x00 }; char *topics_buf = NULL; - + hashtable_t *hashtable = mm_get_hashtable(); + #if defined(PRO_DE_SERIALIZE) struct _BHAddress { @@ -1258,15 +1302,19 @@ #else strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(buf_temp) - 1)); #endif - + str = buf_temp; - recvIter = gRecvbuf.find(str); - if(recvIter != gRecvbuf.end()) { - + val = net_mod_socket_buf_data_get(gNetmod_socket, str); + if ((val > 0) && (hashtable_get(hashtable, val) != NULL)) { + rv = 0; - val = recvIter->second; } else { + + if ((val > 0) && (hashtable_get(hashtable, val) == NULL)) { + net_mod_socket_buf_data_del(gNetmod_socket, str); + } + rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); if (rv == 0) { @@ -1275,7 +1323,7 @@ val = atoi((char *)data_buf); if (val > 0) { str = buf_temp; - gRecvbuf.insert({str, val}); + net_mod_socket_buf_data_set(gNetmod_socket, str, val); } free(buf); @@ -1376,10 +1424,10 @@ net_mod_err_t *errarr; int errarr_size = 0; char *errString = NULL; - std::map<std::string, int>::iterator recvIter; char buf_temp[MAX_STR_LEN] = { 0x00 }; char *topics_buf = NULL; - + hashtable_t *hashtable = mm_get_hashtable(); + struct _RequestReply { std::string proc_id; @@ -1445,13 +1493,14 @@ #endif str = buf_temp; - recvIter = gRecvbuf.find(str); - if(recvIter != gRecvbuf.end()) { - + val = net_mod_socket_buf_data_get(gNetmod_socket, str); + if ((val > 0) && (hashtable_get(hashtable, val) != NULL)) { rv = 0; - val = recvIter->second; - } else { + if ((val > 0) && (hashtable_get(hashtable, val) == NULL)) { + net_mod_socket_buf_data_del(gNetmod_socket, str); + } + rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); if (rv == 0) { @@ -1460,7 +1509,7 @@ val = atoi((char *)data_buf); if (val > 0) { str = buf_temp; - gRecvbuf.insert({str, val}); + net_mod_socket_buf_data_set(gNetmod_socket, str, val); } free(buf); @@ -1582,6 +1631,13 @@ exit_entry: errString = bus_strerror(0, 1); + + if (rv != 0) { + if ((proc_id != NULL) && (proc_id_len != NULL)) { + *proc_id_len = 0; + *proc_id = NULL; + } + } #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgRequestTopicReply mrt; @@ -1707,6 +1763,12 @@ if (rv != 0) { rrr.topic = STR_RSV; rrr.data = STR_RSV; + + if ((proc_id != NULL) && (proc_id_len != NULL)) { + + *proc_id_len = 0; + *proc_id = NULL; + } } #if defined(PRO_DE_SERIALIZE) @@ -1816,7 +1878,19 @@ int inter_key_get(void) { - return net_mod_socket_get_key(gNetmod_socket); + if (gNetmod_socket != NULL) + return net_mod_socket_get_key(gNetmod_socket); + + return SHM_BUS_KEY; } +void *socket_data_get(void) +{ + return gNetmod_socket; +} + +void inter_key_set(int key) +{ + net_mod_socket_bind(gNetmod_socket, key); +} -- Gitblit v1.8.0