From b386f0c5d35994a54e95d5fe1c4bcbfd1cdea59c Mon Sep 17 00:00:00 2001 From: Fu Juntang <StrongTiger_001@163.com> Date: 星期五, 03 九月 2021 17:43:11 +0800 Subject: [PATCH] Supplementary for the fix of the lock removal. --- src/bh_api.cpp | 292 +++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 187 insertions(+), 105 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index f707e6f..e36daee 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -7,6 +7,9 @@ #include "bh_api.h" #include <pthread.h> #include <getopt.h> +#include "bhome_msg_api.pb.h" +#include "bhome_msg.pb.h" +#include "error_msg.pb.h" #include "../proto/source/bhome_msg.pb.h" #include "../proto/source/bhome_msg_api.pb.h" @@ -338,12 +341,14 @@ topics_buf = (char *)malloc(total); if (topics_buf == NULL) { - rv = EBUS_RES_NO; + rv = EBUS_NO_MEM; memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); logger->error("in BHRegisterTopics: Out of memory!\n"); + pthread_mutex_unlock(&mutex); + goto exit_entry; } memset(topics_buf, 0x00, total); @@ -365,13 +370,14 @@ #else memcpy(topics_buf, topics, topics_len); + count = topics_len; #endif - rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, NULL, 0, timeout_ms, PROC_REG_TCS); + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, count, NULL, 0, timeout_ms, PROC_REG_TCS); memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); - + free(topics_buf); pthread_mutex_unlock(&mutex); @@ -414,7 +420,7 @@ char topics_buf[MAX_STR_LEN] = { 0x00 }; ProcInfo_query *ptr = NULL; ProcInfo *Proc_ptr = NULL; - + #if defined(PRO_DE_SERIALIZE) struct _BHAddress { @@ -475,7 +481,7 @@ buf = const_cast<void *>(topic); strncpy(topics_buf, (const char *)buf, min); #endif - rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, &buf, &size, timeout_ms, PROC_QUE_TCS); + rv = net_mod_socket_reg(gNetmod_socket, topics_buf, min, &buf, &size, timeout_ms, PROC_QUE_TCS); memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); @@ -514,9 +520,9 @@ for(int i = 0; i < mtr_list_num; i++) { mtr_list[i].proc_id = ptr->procData.proc_id; - mtr_list[i].mq_id = 0x00; - mtr_list[i].abs_addr = 0x00; - mtr_list[i].ip = "192.168.1.1"; + mtr_list[i].mq_id = ID_RSV; + mtr_list[i].abs_addr = ABS_ID_RSV; + mtr_list[i].ip = "127.0.0.1"; mtr_list[i].port = 5000; } } @@ -695,7 +701,7 @@ *reply_len = mpr.ByteSizeLong(); *reply = malloc(*reply_len); - mpr.SerializePartialToArray(*reply,*reply_len); + mpr.SerializePartialToArray(*reply, *reply_len); } #else if (rv == 0) { @@ -784,12 +790,14 @@ topics_buf = (char *)malloc(total); if (topics_buf == NULL) { - rv = EBUS_RES_NO; + rv = EBUS_NO_MEM; memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); logger->error("in BHSubscribeTopics: Out of memory!\n"); - + + pthread_mutex_unlock(&mutex); + goto exit_entry; } memset(topics_buf, 0x00, total); @@ -797,7 +805,7 @@ #if defined(PRO_DE_SERIALIZE) for (i = 0; i < _input.amount; i++) { len = strlen(_input.topics[i]); - strncpy(topics_buf + count, _input.topics[i], min); + strncpy(topics_buf + count, _input.topics[i], len); count += len; @@ -810,21 +818,22 @@ #else memcpy(topics_buf, topics, topics_len); + count = topics_len; #endif if (timeout_ms > 0) { sec = timeout_ms / 1000; nsec = (timeout_ms - sec * 1000) * 1000 * 1000; - rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, strlen(topics_buf) + 1, sec, nsec); + rv = net_mod_socket_sub_timeout(gNetmod_socket, topics_buf, count, sec, nsec); } else if (timeout_ms == 0) { - rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); + rv = net_mod_socket_sub_nowait(gNetmod_socket, topics_buf, count); } else { - rv = net_mod_socket_sub(gNetmod_socket, topics_buf, strlen(topics_buf) + 1); + rv = net_mod_socket_sub(gNetmod_socket, topics_buf, count); } @@ -924,7 +933,7 @@ } #if defined(PRO_DE_SERIALIZE) -int BHPublish(const char *msgpub, const int msgpub_len, const int timeout_ms) +int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms) #else int BHPublish(const char *topic, const char *content, const int timeout_ms) #endif @@ -1116,8 +1125,6 @@ #endif - pthread_mutex_unlock(&mutex); - } else { memset(errString, 0x00, sizeof(errString)); @@ -1141,7 +1148,8 @@ int sec, nsec; std::string MsgID; int timeout_ms = 3000; - char topics_buf[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 }; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + char *topics_buf = NULL; #if defined(PRO_DE_SERIALIZE) struct _BHAddress @@ -1200,12 +1208,12 @@ rv = pthread_mutex_trylock(&mutex); if (rv == 0) { #if defined(PRO_DE_SERIALIZE) - strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); + strncpy(buf_temp, _input1.topic, (sizeof(buf_temp) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(buf_temp) - 1)); #else - strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(topics_buf) - 1)); + strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(buf_temp) - 1)); #endif - rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); + rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); if (rv == 0) { val = atoi((char *)buf); @@ -1214,12 +1222,31 @@ if (val > 0) { - len = strlen(topics_buf); + len = strlen(buf_temp) + 1; #if defined(PRO_DE_SERIALIZE) - min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); - strncpy(topics_buf + len + 1, _input1.data, min); - len += (min + 1); + len += strlen(_input1.data); #endif + + topics_buf = (char *)malloc(len); + if (topics_buf == NULL) { + + rv = EBUS_NO_MEM; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + logger->error("in BHRequest: Out of memory!\n"); + + pthread_mutex_unlock(&mutex); + + return false; + } + memset(topics_buf, 0x00, len); + + strncpy(topics_buf, buf_temp, strlen(buf_temp) + 1); +#if defined(PRO_DE_SERIALIZE) + strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data)); +#endif + if (timeout_ms > 0) { sec = timeout_ms / 1000; @@ -1234,7 +1261,10 @@ } else { rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); - } + } + + free(topics_buf); + } else { rv = EBUS_RES_UNSUPPORT; @@ -1290,8 +1320,8 @@ net_mod_recv_msg_t *recv_arr; net_mod_err_t *errarr; int errarr_size = 0; - int sec, nsec; - char topics_buf[MAX_STR_LEN] = { 0x00 }; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + char *topics_buf = NULL; struct _RequestReply { @@ -1356,37 +1386,68 @@ rv = pthread_mutex_trylock(&mutex); if (rv == 0) { #if defined(PRO_DE_SERIALIZE) - strncpy(topics_buf, _input1.topic, (sizeof(topics_buf) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(topics_buf) - 1)); + strncpy(buf_temp, _input1.topic, (sizeof(buf_temp) - 1) > strlen(_input1.topic) ? strlen(_input1.topic) : (sizeof(buf_temp) - 1)); #else - strncpy(topics_buf, (char *)request, (sizeof(topics_buf) - 1) > request_len ? request_len : (sizeof(topics_buf) - 1)); + strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > request_len ? request_len : (sizeof(buf_temp) - 1)); #endif - rv = net_mod_socket_reg(gNetmod_socket, topics_buf, strlen(topics_buf), &buf, &size, timeout_ms, PROC_QUE_STCS); + rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); if (rv == 0) { + val = atoi((char *)buf); free(buf); if (val > 0) { memset(&node, 0x00, sizeof(node)); - - len = strlen(topics_buf); + + len = strlen(buf_temp) + 1; #if defined(PRO_DE_SERIALIZE) - min = (sizeof(topics_buf) - 1 - len ) > strlen(_input1.data) ? strlen(_input1.data) : (sizeof(topics_buf) - 1 - len ); - strncpy(topics_buf + len + 1, _input1.data, min); - len += (min + 1); + len += strlen(_input1.data); #endif + topics_buf = (char *)malloc(len); + if (topics_buf == NULL) { + + rv = EBUS_NO_MEM; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + logger->error("in BHRequest: Out of memory!\n"); + + pthread_mutex_unlock(&mutex); + + return false; + } + memset(topics_buf, 0x00, len); + + strncpy(topics_buf, buf_temp, strlen(buf_temp) + 1); +#if defined(PRO_DE_SERIALIZE) + strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data)); +#endif + node.key = val; - rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size); + + if (timeout_ms > 0) { + + rv = net_mod_socket_sendandrecv_timeout(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size, timeout_ms); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_sendandrecv_nowait(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size); + + } else { + + rv = net_mod_socket_sendandrecv(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size); + } if (rv > 0) { if (recv_arr_size > 0) { node.key = recv_arr[0].key; - - memset(topics_buf, 0x00, sizeof(topics_buf)); + size = recv_arr[0].content_length; buf = (char *)malloc(size); + memset(buf, 0x00, size); strncpy((char *)buf, (char *)recv_arr[0].content, size); #if !defined(PRO_DE_SERIALIZE) *reply = buf; @@ -1405,27 +1466,29 @@ } else { rv = EBUS_TIMEOUT; } - + } else { rv = EBUS_RES_UNSUPPORT; } + + free(topics_buf); } memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); if (rv == 0) { - memset(topics_buf, 0x00, sizeof(topics_buf)); - sprintf(topics_buf, "%d", node.key); + memset(buf_temp, 0x00, sizeof(buf_temp)); + sprintf(buf_temp, "%d", node.key); - rr.proc_id = topics_buf; + rr.proc_id = buf_temp; *proc_id_len = rr.proc_id.size(); *proc_id = malloc(*proc_id_len); memcpy(*proc_id, rr.proc_id.data(), *proc_id_len); - memset(topics_buf, 0x00, sizeof(topics_buf)); - memcpy(topics_buf, buf, size); - rr.data = topics_buf; + memset(buf_temp, 0x00, sizeof(buf_temp)); + memcpy(buf_temp, buf, size); + rr.data = buf_temp; } pthread_mutex_unlock(&mutex); @@ -1469,8 +1532,10 @@ void *buf; int key; int size; + int len; int sec, nsec; - char topics_buf[MAX_STR_LEN] = { 0x00 }; + char buf_temp[MAX_STR_LEN] = { 0x00 }; + char *topics_buf = NULL; if (gRun_stat == 0) { logger->error("the process has not been registered yet!\n"); @@ -1482,73 +1547,87 @@ return false; } - rv = pthread_mutex_trylock(&mutex); + if (timeout_ms > 0) { + + sec = timeout_ms / 1000; + nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + + rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); + + } else if (timeout_ms == 0) { + + rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); + + } else { + + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); + } + if (rv == 0) { - if (timeout_ms > 0) { + struct _ReadRequestReply + { + std::string proc_id; + std::string topic; + std::string data; + void *src; + } rrr; - sec = timeout_ms / 1000; - nsec = (timeout_ms - sec * 1000) * 1000 * 1000; + sprintf(buf_temp, "%d", key); + rrr.proc_id = buf_temp; - rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); + *proc_id_len = rrr.proc_id.size(); + *proc_id = malloc(*proc_id_len); + memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len); - } else if (timeout_ms == 0) { - - rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); + topics_buf = (char *)malloc(size + MIN_STR_LEN); + if (topics_buf == NULL) { + + rv = EBUS_NO_MEM; + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); + + logger->error("in BHReadRequest: Out of memory!\n"); + + return false; + } + memset(topics_buf, 0x00, size + MIN_STR_LEN); - } else { - - rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); + len = strlen((char *)buf); + if (len > size) { + len = size; + } + strncpy(topics_buf, (char *)buf, len); + rrr.topic = topics_buf; + + if (len < size) { + strncpy(topics_buf + len + 1, (char *)buf + len + 1, size - len - 1); } - if (rv == 0) { - struct _ReadRequestReply - { - std::string proc_id; - std::string topic; - std::string data; - void *src; - } rrr; - - sprintf(topics_buf, "%d", key); - rrr.proc_id = topics_buf; - - *proc_id_len = rrr.proc_id.size(); - *proc_id = malloc(*proc_id_len); - memcpy(*proc_id, rrr.proc_id.data(), *proc_id_len); - - memset(topics_buf, 0x00, sizeof(topics_buf)); - memcpy(topics_buf, buf, size > sizeof(topics_buf) ? sizeof(topics_buf) : size); - rrr.topic = topics_buf; - rrr.data = topics_buf; + rrr.data = topics_buf + len + 1; + + free(topics_buf); #if defined(PRO_DE_SERIALIZE) - ::bhome_msg::MsgRequestTopic mrt; - mrt.set_topic(rrr.topic); - mrt.set_data(rrr.data.data()); - *request_len = mrt.ByteSizeLong(); - *request = malloc(*request_len); - mrt.SerializePartialToArray(*request,*request_len); + ::bhome_msg::MsgRequestTopic mrt; + mrt.set_topic(rrr.topic); + mrt.set_data(rrr.data.data()); + *request_len = mrt.ByteSizeLong(); + *request = malloc(*request_len); + mrt.SerializePartialToArray(*request,*request_len); #else - *request = buf; - *request_len = size; + *request = buf; + *request_len = size; #endif - buf = malloc(sizeof(int)); - *(int *)buf = key; - *src = buf; - } + free(buf); - pthread_mutex_unlock(&mutex); - - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); - - } else { - - rv = EBUS_RES_BUSY; - memset(errString, 0x00, sizeof(errString)); - strncpy(errString, bus_strerror(rv), sizeof(errString)); + buf = malloc(sizeof(int)); + *(int *)buf = key; + *src = buf; } + + memset(errString, 0x00, sizeof(errString)); + strncpy(errString, bus_strerror(rv), sizeof(errString)); if (rv == 0) return true; @@ -1559,7 +1638,8 @@ int BHSendReply(void *src, const void *reply, const int reply_len) { int rv; - + const char *_input; + #if defined(PRO_DE_SERIALIZE) ::bhome_msg::MsgRequestTopicReply input; if (!input.ParseFromArray(reply, reply_len)) { @@ -1571,9 +1651,8 @@ return false; } - const char *_input; _input = input.data().data(); - + #else if ((src == NULL) || (reply == NULL) || (reply_len == 0)) { @@ -1583,6 +1662,9 @@ return false; } + + _input = (char *)reply; + #endif if (gRun_stat == 0) { @@ -1598,7 +1680,7 @@ rv = pthread_mutex_trylock(&mutex); if (rv == 0) { - rv = net_mod_socket_sendto(gNetmod_socket, reply, reply_len, *(int *)src); + rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src); memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); -- Gitblit v1.8.0