From 5add39f46c8323875fb56bc764a8ff627ad82f18 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期五, 08 十月 2021 11:12:06 +0800
Subject: [PATCH] Adjust the free action independent of the return value from the function return.
---
src/socket/bus_server_socket.cpp | 7
src/bh_api.h | 1
src/bh_api.cpp | 570 +++++++++++++++++++++++++++++----------------------
src/proc_def.h | 2
src/socket/shm_socket.cpp | 31 +-
5 files changed, 347 insertions(+), 264 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index f4215d3..b774d4e 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -17,13 +17,13 @@
static int gRun_stat = 0;
static void *gNetmod_socket = NULL;
+static std::map<std::string, int> gRecvbuf;
static pthread_mutex_t mutex;
static pthread_t gTids;
-static void *client_run_check(void *skptr) {
-
+static void *client_run_check(void *skptr) {
pthread_detach(pthread_self());
int data;
@@ -82,7 +82,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input.proc_id = input.proc_id().c_str();
@@ -95,7 +95,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -109,7 +109,7 @@
rv = EBUS_RES_BUSY;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
rv = pthread_mutex_trylock(&mutex);
@@ -175,7 +175,7 @@
pthread_mutex_unlock(&mutex);
- return false;
+ goto exit_entry;
}
gNetmod_socket = net_mod_socket_open();
@@ -200,11 +200,11 @@
rv = EBUS_RES_BUSY;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
+exit_entry:
errString = bus_strerror(0, 1);
-
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgCommonReply mcr;
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -223,10 +223,13 @@
#endif
- pthread_create(&gTids, NULL, client_run_check, NULL);
+ if (rv == 0) {
+ pthread_create(&gTids, NULL, client_run_check, NULL);
- return true;
+ return true;
+ }
+ return false;
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
@@ -252,7 +255,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input.proc_id = input.proc_id().c_str();
@@ -264,7 +267,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -275,7 +278,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
rv = pthread_mutex_trylock(&mutex);
@@ -299,9 +302,10 @@
rv = EBUS_RES_BUSY;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
+exit_entry:
errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
::bhome_msg::MsgCommonReply mcr;
@@ -319,8 +323,12 @@
*reply = buf;
*reply_len = min;
#endif
-
- return true;
+
+ if (rv == 0) {
+ return true;
+ }
+
+ return false;
}
@@ -347,7 +355,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input.amount = input.topic_list_size();
@@ -366,7 +374,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
total = topics_len;
@@ -378,7 +386,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
topics_buf = (char *)malloc(total);
@@ -389,7 +397,7 @@
logger->error("in BHRegisterTopics: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, total);
@@ -418,6 +426,8 @@
free(topics_buf);
bus_errorset(rv);
+
+exit_entry:
errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
@@ -437,8 +447,11 @@
*reply_len = len;
#endif
- return true;
+ if (rv == 0) {
+ return true;
+ }
+ return false;
}
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
@@ -462,6 +475,17 @@
}_input0;
const char *_input1;
+
+ struct _MsgQueryTopicReply
+ {
+ std::string proc_id;
+
+ unsigned long long mq_id;
+ long long abs_addr;
+ std::string ip;
+ int port;
+ } mtr_list[128];
+ int mtr_list_num = 0;
::bhome_msg::BHAddress input0;
::bhome_msg::MsgQueryTopic input1;
@@ -469,7 +493,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input0.mq_id = input0.mq_id();
@@ -483,7 +507,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -493,7 +517,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#if defined(PRO_DE_SERIALIZE)
@@ -508,17 +532,6 @@
bus_errorset(rv);
#if defined(PRO_DE_SERIALIZE)
-
- struct _MsgQueryTopicReply
- {
- std::string proc_id;
-
- unsigned long long mq_id;
- long long abs_addr;
- std::string ip;
- int port;
- }mtr_list[128];
- int mtr_list_num = 0;
if (rv == 0) {
@@ -539,6 +552,7 @@
}
}
+exit_entry:
errString = bus_strerror(0, 1);
::bhome_msg::MsgQueryTopicReply mtr;
@@ -574,8 +588,11 @@
#endif
- return true;
+ if (rv == 0) {
+ return true;
+ }
+ return false;
}
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
@@ -606,7 +623,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input0.mq_id = input0.mq_id();
@@ -619,7 +636,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -629,7 +646,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
if (query != NULL) {
@@ -638,7 +655,8 @@
rv = net_mod_socket_reg(gNetmod_socket, data_buf, strlen(data_buf), &buf, &size, timeout_ms, PROC_QUE_ATCS);
bus_errorset(rv);
-
+
+exit_entry:
#if defined(PRO_DE_SERIALIZE)
struct _MsgQueryProcReply
{
@@ -683,31 +701,32 @@
}
}
}
-
- errString = bus_strerror(0, 1);
-
- ::bhome_msg::MsgQueryProcReply mpr;
- mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
- mpr.mutable_errmsg()->set_errstring(errString);
-
- for(int i = 0; i < mpr_list_num; i++)
- {
- ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
- mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
- mpri->mutable_proc()->set_name(mpr_list[i].name);
- mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
- mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
- mpri->set_online(mpr_list[i].online);
- for(int j = 0; j < mpr_list[i].topic_list_num; j++)
- {
- mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
- }
- }
-
- *reply_len = mpr.ByteSizeLong();
- *reply = malloc(*reply_len);
- mpr.SerializePartialToArray(*reply, *reply_len);
}
+
+ errString = bus_strerror(0, 1);
+
+ ::bhome_msg::MsgQueryProcReply mpr;
+ mpr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mpr.mutable_errmsg()->set_errstring(errString);
+
+ for(int i = 0; i < mpr_list_num; i++)
+ {
+ ::bhome_msg::MsgQueryProcReply_Info *mpri = mpr.add_proc_list();
+ mpri->mutable_proc()->set_proc_id(mpr_list[i].proc_id);
+ mpri->mutable_proc()->set_name(mpr_list[i].name);
+ mpri->mutable_proc()->set_public_info(mpr_list[i].public_info);
+ mpri->mutable_proc()->set_private_info(mpr_list[i].private_info);
+ mpri->set_online(mpr_list[i].online);
+ for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+ {
+ mpri->mutable_topics()->add_topic_list(mpr_list[i].topic_list[j]);
+ }
+ }
+
+ *reply_len = mpr.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mpr.SerializePartialToArray(*reply, *reply_len);
+
#else
if (rv == 0) {
*reply = buf;
@@ -723,8 +742,11 @@
}
#endif
- return true;
+ if (rv == 0) {
+ return true;
+ }
+ return false;
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
@@ -751,7 +773,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input.amount = input.topic_list_size();
@@ -771,7 +793,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -781,7 +803,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
topics_buf = (char *)malloc(total);
@@ -792,7 +814,7 @@
logger->error("in BHSubscribeTopics: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, total);
@@ -831,6 +853,7 @@
}
+exit_entry:
errString = bus_strerror(0, 1);
free(topics_buf);
@@ -851,9 +874,12 @@
*reply = buf;
*reply_len = len;
#endif
-
- return true;
+
+ if (rv == 0) {
+ return true;
+ }
+ return false;
}
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
@@ -889,7 +915,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input.proc_id = input.proc_id().c_str();
@@ -899,6 +925,8 @@
rv = 0;
bus_errorset(rv);
+
+exit_entry:
errString = bus_strerror(0, 1);
::bhome_msg::MsgCommonReply mcr;
@@ -909,7 +937,11 @@
mcr.SerializePartialToArray(*reply,*reply_len);
#endif
- return true;
+ if (rv == 0) {
+ return true;
+ }
+
+ return false;
}
#if defined(PRO_DE_SERIALIZE)
@@ -947,8 +979,8 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
-
- return false;
+
+ return false;
}
#endif
@@ -985,9 +1017,7 @@
if (rv > 0)
return true;
-
- bus_errorset(rv);
-
+
return false;
}
@@ -996,7 +1026,7 @@
int rv;
int len;
void *buf;
- int key;
+ int key = 0;
int size;
int sec, nsec;
char buf_temp[100] = { 0x00 };
@@ -1016,14 +1046,14 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
if ((msgpub == NULL) || (msgpub_len == NULL)) {
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
if (timeout_ms > 0) {
@@ -1054,9 +1084,9 @@
rv = EBUS_NO_MEM;
bus_errorset(rv);
- logger->error("in BHRequest: Out of memory!\n");
+ logger->error("in BHReadSub: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, len + 10);
@@ -1070,11 +1100,11 @@
rv = EBUS_NO_MEM;
bus_errorset(rv);
- logger->error("in BHRequest: Out of memory!\n");
+ logger->error("in BHReadSub: Out of memory!\n");
free(topics_buf);
- return false;
+ goto exit_entry;
}
memset(data_buf, 0x00, size - len + 10);
@@ -1092,56 +1122,61 @@
rsr.data = topics_buf;
}
- sprintf(buf_temp, "%d", key);
-
- if ((proc_id != NULL) && (proc_id_len != NULL)) {
- rsr.proc_id = buf_temp;
- *proc_id_len = rsr.proc_id.size();
- *proc_id = malloc(*proc_id_len);
- memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
- }
-
free(topics_buf);
free(data_buf);
-
- ::bhome_msg::MsgPublish Mp;
- Mp.set_topic(rsr.topic);
- Mp.set_data(rsr.data.data());
- *msgpub_len = Mp.ByteSizeLong();
- *msgpub = malloc(*msgpub_len);
- Mp.SerializePartialToArray(*msgpub, *msgpub_len);
-#else
- void *ptr;
- if (len < size) {
- ptr = malloc(size - len);
- len = size - len;
- memcpy(ptr, data_buf, len);
- } else {
- ptr = malloc(len);
- memcpy(ptr, topics_buf, len);
- }
- *msgpub = ptr;
- *msgpub_len = len;
-
- free(topics_buf);
- free(data_buf);
-
- if ((proc_id != NULL) && (proc_id_len != NULL)) {
- memset(buf_temp, 0x00, sizeof(buf_temp));
- sprintf(buf_temp, "%d", key);
-
- *proc_id_len = strlen(buf_temp);
- *proc_id = malloc(*proc_id_len);
- memcpy(*proc_id, buf_temp, *proc_id_len);
- }
-
-#endif
-
- } else {
-
- bus_errorset(rv);
-
}
+
+exit_entry:
+ sprintf(buf_temp, "%d", key);
+
+ if ((proc_id != NULL) && (proc_id_len != NULL)) {
+ rsr.proc_id = buf_temp;
+ *proc_id_len = rsr.proc_id.size();
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, rsr.proc_id.data(), *proc_id_len);
+ }
+
+ if (rv != 0) {
+ rsr.topic = STR_RSV;
+ if (data_buf != NULL) {
+ rsr.data = data_buf;
+ } else {
+ rsr.data = STR_RSV;
+ }
+ }
+
+ ::bhome_msg::MsgPublish Mp;
+ Mp.set_topic(rsr.topic);
+ Mp.set_data(rsr.data.data());
+ *msgpub_len = Mp.ByteSizeLong();
+ *msgpub = malloc(*msgpub_len);
+ Mp.SerializePartialToArray(*msgpub, *msgpub_len);
+#else
+ void *ptr;
+ if (len < size) {
+ ptr = malloc(size - len);
+ len = size - len;
+ memcpy(ptr, data_buf, len);
+ } else {
+ ptr = malloc(len);
+ memcpy(ptr, topics_buf, len);
+ }
+ *msgpub = ptr;
+ *msgpub_len = len;
+
+ free(topics_buf);
+ free(data_buf);
+
+ if ((proc_id != NULL) && (proc_id_len != NULL)) {
+ memset(buf_temp, 0x00, sizeof(buf_temp));
+ sprintf(buf_temp, "%d", key);
+
+ *proc_id_len = strlen(buf_temp);
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, buf_temp, *proc_id_len);
+ }
+
+#endif
if (rv == 0)
return true;
@@ -1152,15 +1187,17 @@
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
{
int rv;
- void *buf;
+ void *buf = NULL;
int size;
- int val;
+ int val = 0;
int len;
int min;
int data;
int sec, nsec;
+ std::string str;
std::string MsgID;
int timeout_ms = 3000;
+ std::map<std::string, int>::iterator recvIter;
char data_buf[MAX_STR_LEN] = { 0x00 };
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
@@ -1187,7 +1224,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
_input0.mq_id = input0.mq_id();
@@ -1203,7 +1240,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -1213,7 +1250,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#if defined(PRO_DE_SERIALIZE)
@@ -1221,16 +1258,31 @@
#else
strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > strlen((char *)request) ? strlen((char *)request) : (sizeof(buf_temp) - 1));
#endif
-
- rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
+
+ str = buf_temp;
+ recvIter = gRecvbuf.find(str);
+ if(recvIter != gRecvbuf.end()) {
+
+ rv = 0;
+ val = recvIter->second;
+
+ } else {
+ rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
+ if (rv == 0) {
+
+ len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+ memcpy(data_buf, (char *)buf, len);
+ val = atoi((char *)data_buf);
+ if (val > 0) {
+ str = buf_temp;
+ gRecvbuf.insert({str, val});
+ }
+
+ free(buf);
+ }
+ }
+
if (rv == 0) {
-
- len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
- memcpy(data_buf, (char *)buf, len);
- val = atoi((char *)data_buf);
-
- free(buf);
-
if (val > 0) {
len = strlen(buf_temp) + 1;
@@ -1244,9 +1296,9 @@
rv = EBUS_NO_MEM;
bus_errorset(rv);
- logger->error("in BHRequest: Out of memory!\n");
+ logger->error("in BHAsyncRequest: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, len);
@@ -1282,6 +1334,8 @@
}
bus_errorset(rv);
+
+exit_entry:
if((msg_id == NULL) || (msg_id_len == NULL)) {
if (rv == 0)
return true;
@@ -1289,18 +1343,16 @@
return false;
}
- if (rv == 0) {
-
- memset(buf_temp, 0x00, sizeof(buf_temp));
- sprintf(buf_temp, "%d", val);
- MsgID = buf_temp;
+ memset(buf_temp, 0x00, sizeof(buf_temp));
+ sprintf(buf_temp, "%d", val);
+ MsgID = buf_temp;
- *msg_id_len = MsgID.size();
- *msg_id = malloc(*msg_id_len);
- memcpy(*msg_id, MsgID.data(), *msg_id_len);
+ *msg_id_len = MsgID.size();
+ *msg_id = malloc(*msg_id_len);
+ memcpy(*msg_id, MsgID.data(), *msg_id_len);
+ if (rv == 0)
return true;
- }
return false;
@@ -1310,11 +1362,12 @@
void **reply, int *reply_len, const int timeout_ms)
{
int rv;
- void *buf;
+ void *buf = NULL;
int size;
int val;
int min, len;
int data;
+ std::string str;
net_node_t node;
int node_size;
int recv_arr_size;
@@ -1323,6 +1376,7 @@
net_mod_err_t *errarr;
int errarr_size = 0;
char *errString = NULL;
+ std::map<std::string, int>::iterator recvIter;
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
@@ -1354,9 +1408,10 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
+ memset(&node, 0x00, sizeof(node));
_input0.mq_id = input0.mq_id();
_input0.abs_addr = input0.abs_addr();
_input0.ip = input0.ip().c_str();
@@ -1370,7 +1425,7 @@
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#endif
@@ -1380,7 +1435,7 @@
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
#if defined(PRO_DE_SERIALIZE)
@@ -1388,19 +1443,35 @@
#else
strncpy(buf_temp, (char *)request, (sizeof(buf_temp) - 1) > request_len ? request_len : (sizeof(buf_temp) - 1));
#endif
-
- rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
+
+ str = buf_temp;
+ recvIter = gRecvbuf.find(str);
+ if(recvIter != gRecvbuf.end()) {
+
+ rv = 0;
+ val = recvIter->second;
+
+ } else {
+ rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
+ if (rv == 0) {
+
+ len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
+ memcpy(data_buf, (char *)buf, len);
+ val = atoi((char *)data_buf);
+ if (val > 0) {
+ str = buf_temp;
+ gRecvbuf.insert({str, val});
+ }
+
+ free(buf);
+ }
+ }
+
if (rv == 0) {
- len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size;
- memcpy(data_buf, (char *)buf, len);
- val = atoi((char *)data_buf);
-
- free(buf);
-
if (val > 0) {
memset(&node, 0x00, sizeof(node));
-
+
len = strlen(buf_temp) + 1;
#if defined(PRO_DE_SERIALIZE)
len += strlen(_input1.data);
@@ -1408,23 +1479,22 @@
topics_buf = (char *)malloc(len);
if (topics_buf == NULL) {
-
+
rv = EBUS_NO_MEM;
bus_errorset(rv);
-
+
logger->error("in BHRequest: Out of memory!\n");
-
- return false;
+
+ goto exit_entry;
}
memset(topics_buf, 0x00, len);
-
+
strncpy(topics_buf, buf_temp, strlen(buf_temp) + 1);
#if defined(PRO_DE_SERIALIZE)
strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data));
-#endif
+#endif
node.key = val;
-
if (timeout_ms > 0) {
rv = net_mod_socket_sendandrecv_timeout(gNetmod_socket, &node, 1, topics_buf, len, &recv_arr, &recv_arr_size, &errarr, &errarr_size, timeout_ms);
@@ -1445,14 +1515,15 @@
size = recv_arr[0].content_length;
buf = (char *)malloc(size);
if (buf == NULL) {
- printf("Out of memory\n");
+ logger->error("in BHRequest: Out of memory!\n");
- exit(0);
+ free(topics_buf);
+
+ goto exit_entry;
}
memset((char *)buf, 0x00, size);
strncpy((char *)buf, (char *)recv_arr[0].content, size);
-
}
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
@@ -1462,30 +1533,31 @@
}
rv = 0;
-
} else {
+
rv = EBUS_TIMEOUT;
}
} else {
rv = EBUS_RES_UNSUPPORT;
}
-
+
free(topics_buf);
}
-
+
bus_errorset(rv);
+
+ if ((proc_id != NULL) && (proc_id_len != NULL)) {
+ memset(buf_temp, 0x00, sizeof(buf_temp));
+ sprintf(buf_temp, "%d", node.key);
+
+ rr.proc_id = buf_temp;
+ *proc_id_len = rr.proc_id.size();
+ *proc_id = malloc(*proc_id_len);
+ memcpy(*proc_id, rr.proc_id.c_str(), *proc_id_len);
+ }
+
if (rv == 0) {
- if ((proc_id != NULL) && (proc_id_len != NULL)) {
- memset(buf_temp, 0x00, sizeof(buf_temp));
- sprintf(buf_temp, "%d", node.key);
-
- rr.proc_id = buf_temp;
- *proc_id_len = rr.proc_id.size();
- *proc_id = malloc(*proc_id_len);
- memcpy(*proc_id, rr.proc_id.c_str(), *proc_id_len);
- }
-
topics_buf = (char *)malloc(size + 10);
if (topics_buf == NULL) {
@@ -1494,7 +1566,7 @@
logger->error("in BHRequest: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, size + 10);
@@ -1503,33 +1575,33 @@
free(buf);
free(topics_buf);
+ } else {
+
+ rr.data = STR_RSV;
}
+exit_entry:
errString = bus_strerror(0, 1);
#if defined(PRO_DE_SERIALIZE)
- if (rv == 0) {
- ::bhome_msg::MsgRequestTopicReply mrt;
- mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
- mrt.mutable_errmsg()->set_errstring(errString);
- mrt.set_data(rr.data.data());
- *reply_len = mrt.ByteSizeLong();
- *reply = malloc(*reply_len);
- mrt.SerializePartialToArray(*reply, *reply_len);
- }
+ ::bhome_msg::MsgRequestTopicReply mrt;
+ mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
+ mrt.mutable_errmsg()->set_errstring(errString);
+ mrt.set_data(rr.data.data());
+ *reply_len = mrt.ByteSizeLong();
+ *reply = malloc(*reply_len);
+ mrt.SerializePartialToArray(*reply, *reply_len);
#else
- if (rv == 0) {
- min = strlen(errString) + 1;
- buf = malloc(min);
- memcpy(buf, errString, strlen(errString));
- *((char *)buf + min - 1) = '\0';
+ min = strlen(errString) + 1;
+ buf = malloc(min);
+ memcpy(buf, errString, strlen(errString));
+ *((char *)buf + min - 1) = '\0';
- *reply = buf;
- *reply_len = min;
- }
+ *reply = buf;
+ *reply_len = min;
#endif
- if (rv == 0)
+ if (rv == 0)
return true;
return false;
@@ -1546,21 +1618,29 @@
int sec, nsec;
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
-
+
+ struct _ReadRequestReply
+ {
+ std::string proc_id;
+ std::string topic;
+ std::string data;
+ void *src;
+ } rrr;
+
if (gRun_stat == 0) {
logger->error("the process has not been registered yet!\n");
rv = EBUS_RES_NO;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
if ((request == NULL) || (request_len == 0) || (src == NULL)) {
rv = EBUS_INVALID_PARA;
bus_errorset(rv);
- return false;
+ goto exit_entry;
}
data = net_mod_socket_svr_get(gNetmod_socket);
@@ -1581,14 +1661,7 @@
}
if (rv == 0) {
- struct _ReadRequestReply
- {
- std::string proc_id;
- std::string topic;
- std::string data;
- void *src;
- } rrr;
-
+
if ((proc_id != NULL) && (proc_id_len != NULL)) {
sprintf(buf_temp, "%d", key);
rrr.proc_id = buf_temp;
@@ -1606,7 +1679,7 @@
logger->error("in BHReadRequest: Out of memory!\n");
- return false;
+ goto exit_entry;
}
memset(topics_buf, 0x00, size + MIN_STR_LEN);
@@ -1624,28 +1697,34 @@
rrr.data = topics_buf + len + 1;
free(topics_buf);
-
-#if defined(PRO_DE_SERIALIZE)
- ::bhome_msg::MsgRequestTopic mrt;
- mrt.set_topic(rrr.topic);
- mrt.set_data(rrr.data.data());
- *request_len = mrt.ByteSizeLong();
- *request = malloc(*request_len);
- mrt.SerializePartialToArray(*request,*request_len);
-#else
- *request = buf;
- *request_len = size;
-#endif
-
free(buf);
- buf = malloc(sizeof(int));
- *(int *)buf = key;
- *src = buf;
+ bus_errorset(rv);
}
- bus_errorset(rv);
-
+exit_entry:
+
+ if (rv != 0) {
+ rrr.topic = STR_RSV;
+ rrr.data = STR_RSV;
+ }
+
+#if defined(PRO_DE_SERIALIZE)
+ ::bhome_msg::MsgRequestTopic mrt;
+ mrt.set_topic(rrr.topic);
+ mrt.set_data(rrr.data.data());
+ *request_len = mrt.ByteSizeLong();
+ *request = malloc(*request_len);
+ mrt.SerializePartialToArray(*request,*request_len);
+#else
+ *request = buf;
+ *request_len = size;
+#endif
+
+ buf = malloc(sizeof(int));
+ *(int *)buf = key;
+ *src = buf;
+
if (rv == 0)
return true;
@@ -1734,3 +1813,10 @@
return false;
}
+
+int inter_key_get(void)
+{
+ return net_mod_socket_get_key(gNetmod_socket);
+}
+
+
diff --git a/src/bh_api.h b/src/bh_api.h
index 40d9ffa..a231b0c 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -109,6 +109,7 @@
int BHGetLastError(void **msg, int *msg_len);
+int inter_key_get(void);
#ifdef __cplusplus
}
#endif
diff --git a/src/proc_def.h b/src/proc_def.h
index 1251617..0fa2fe9 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -63,6 +63,8 @@
} ProcInfo_query;
+#define STR_RSV "empty"
+
#ifdef __cplusplus
}
#endif
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 2d552da..315c356 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -462,6 +462,7 @@
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
+ char data_buf[MAX_STR_LEN] = { 0x00 };
char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
int count = 0;
int i = 0;
@@ -704,9 +705,9 @@
count = 0;
}
- memset(buf_temp, 0x00, sizeof(buf_temp));
- sprintf(buf_temp, "%d", count);
- shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
+ memset(data_buf, 0x00, sizeof(data_buf));
+ sprintf(data_buf, "%d", count);
+ shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
} else {
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 1912772..709505f 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -68,7 +68,6 @@
}
queue = ( LockFreeQueue<shm_packet_t> *)tmp_ptr;
- // hashtable_unlock(hashtable);
return queue;
}
@@ -195,7 +194,6 @@
int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,
const int send_size, const int key, void **recv_buf,
int *recv_size, const struct timespec *timeout, int flags) {
- // return _shm_sendandrecv_uuid(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags);
}
@@ -411,7 +409,6 @@
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
- // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket = NULL;
rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
@@ -423,15 +420,13 @@
tmp_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_);
if (tmp_socket == NULL)
{
- /* If first call from this thread, allocate buffer for thread, and save its location */
tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
- }
-
- rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
- if ( rv != 0) {
- logger->error(rv, "shm_sendandrecv : pthread_setspecific");
- exit(1);
+ rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
+ if ( rv != 0) {
+ logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+ exit(1);
+ }
}
sendpak.key = tmp_socket->key;
@@ -443,6 +438,10 @@
rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags);
if(rv != 0) {
+ if(send_buf != NULL) {
+ mm_free(sendpak.buf);
+ }
+
return rv;
}
@@ -463,10 +462,11 @@
}
if (key == recvpak.key) {
- // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
+
goto LABLE_SUC;
+
} else {
- // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
+
tmp_socket->recvbuf2.insert({recvpak.key, recvpak});
continue;
}
@@ -622,13 +622,6 @@
sendpak->key = sockt->key;
}
rv = remoteQueue->push(*sendpak, timeout, flag);
-
- if(rv != 0) {
- mm_free(sendpak->buf);
- }
- if(rv == ETIMEDOUT) {
- return EBUS_TIMEOUT;
- }
return rv;
ERR_CLOSED:
--
Gitblit v1.8.0