From a49287079cb5a97ef65818b70529c9d3bbdd99fa Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 14 十二月 2022 17:37:32 +0800
Subject: [PATCH] remove last error for memory taken
---
src/nng_wrap.cpp | 61 ++++++++++++++++++------------
1 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 299ce88..f2c22c4 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -56,9 +56,7 @@
// simple interface
void free_nng(void* data, const int data_len){
- if (data){
- free(data);
- }
+ free(data);
}
void copy_memory(void** dest, int *dest_len, const void* src, const int src_len){
@@ -69,16 +67,22 @@
*dest_len = src_len;
}
+void set_last_error(const std::string& emsg){
+ // verbose_info += emsg;
+}
+
void get_last_error(int* ec, void** emsg, int* emsg_len){
*emsg = NULL;
*emsg_len = 0;
*ec = nn_errno();
const char* msg = nn_strerror(*ec);
- string strMsg(msg);
- strMsg = strMsg + "{" + verbose_info + "}";
- copy_memory(emsg, emsg_len, strMsg.data(), strMsg.size());
- verbose_info.clear();
+ // string strMsg(msg);
+ // strMsg = strMsg + "{" + verbose_info + "}";
+ // copy_memory(emsg, emsg_len, strMsg.data(), strMsg.size());
+ // verbose_info.clear();
+
+ copy_memory(emsg, emsg_len, msg, strlen(msg));
}
///////////////////////////////////////////////////////
@@ -205,12 +209,12 @@
sub->socket_ = sock;
sub->t_ = get_thread([](const auto sub){
while (!sub->t_quit_.load()) {
- char* m;
+ char* m{};
int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
if (m_len > 0){
string tmp_msg{m, (size_t)m_len};
nn_freemsg(m);
- const auto topic{tmp_msg.c_str()};
+ string topic{tmp_msg.c_str()};
string msg{};
{
lock_guard<mutex> l{sub->operator()()};
@@ -224,7 +228,7 @@
// printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
if (!msg.empty()){
lock_guard<mutex> l(sub->mtx_msg_);
- sub->msg_.emplace_back(topic, move(msg));
+ sub->msg_.emplace_back(move(topic), move(msg));
sub->cv_msg_.notify_all();
}
@@ -305,9 +309,9 @@
PRNTVITAG("subscribe_read timeout");
return -1;
}
- const auto& tmp = sub->msg_.front();
- *topic = tmp.topic_;
- *msg = tmp.data_;
+ auto& tmp = sub->msg_.front();
+ *topic = std::move(tmp.topic_);
+ *msg = std::move(tmp.data_);
sub->msg_.pop_front();
return 0;
@@ -419,22 +423,21 @@
static void cb_recv_for_aio(work* w){
nng_msg *om = w->msg;
- if (!om) return;
+ if (!om) {nng_sleep_aio(0, w->aio); return;}
_rr* rep = (_rr*)w->user_data;
-
- string msg{(const char*)nng_msg_body(om), nng_msg_len(om)};
- nng_msg_free(om);
auto t = (*rep)();
lock_guard<mutex> l{rep->mtx_msg_};
rep->works_.emplace(get<0>(t), w);
- get<1>(t).emplace(get<0>(t), move(msg));
+ get<1>(t).emplace(get<0>(t), string{(const char*)nng_msg_body(om), nng_msg_len(om)});
get<0>(t)++;
rep->cv_msg_.notify_all();
+
+ nng_msg_free(om);
}
-static struct work *alloc_work(nng_socket sock, _rr* rep)
+static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode)
{
struct work *w;
int rv;
@@ -454,10 +457,12 @@
return NULL;
}
w->state = INIT;
+ w->mode = mode;
+
return (w);
}
-static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){
+static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){
TAG;
if (sock->id > 0) return 0;
@@ -470,7 +475,7 @@
work** works = (work**)malloc(sizeof(void*) * count);
for (int i = 0; i < count; i++) {
- works[i] = alloc_work(*sock, rep);
+ works[i] = alloc_work(*sock, rep, mode);
}
remove_exist(url);
@@ -507,12 +512,12 @@
ipc = url;
}
rep->url_ = ipc;
- if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1;
if (port > 0){
get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
- if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1;
// printf("======>> create server for remote port %d\n", port);
}else {
get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
@@ -562,6 +567,7 @@
int tm = to_ms > 0 ? to_ms : 30;
uint64_t key{};
+ work* w{};
{
unique_lock<mutex> l(rep->mtx_msg_);
auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
@@ -575,12 +581,18 @@
key = iter->first;
*msg = move(iter->second);
rep->msg_.erase(iter);
+ auto witer = rep->works_.find(key);
+ if (witer != rep->works_.end()){
+ w = witer->second;
+ }
}
+
+ if (!w) return -1;
*src = malloc(sizeof(uint64_t));
*(uint64_t*)(*src) = key;
- return 0;
+ return w->mode;
}
int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){
@@ -590,6 +602,7 @@
struct work* w{};
{
auto key = *(static_cast<uint64_t*>(const_cast<void*>(src)));
+ free(const_cast<void*>(src));
lock_guard<mutex> l{rep->mtx_msg_};
auto iter = rep->works_.find(key);
--
Gitblit v1.8.0