From a49287079cb5a97ef65818b70529c9d3bbdd99fa Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 14 十二月 2022 17:37:32 +0800
Subject: [PATCH] remove last error for memory taken
---
src/nng_wrap.cpp | 90 +++++++++++++++++++++++++++-----------------
1 files changed, 55 insertions(+), 35 deletions(-)
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 3ae3f62..f2c22c4 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -56,9 +56,7 @@
// simple interface
void free_nng(void* data, const int data_len){
- if (data){
- free(data);
- }
+ free(data);
}
void copy_memory(void** dest, int *dest_len, const void* src, const int src_len){
@@ -69,16 +67,22 @@
*dest_len = src_len;
}
+void set_last_error(const std::string& emsg){
+ // verbose_info += emsg;
+}
+
void get_last_error(int* ec, void** emsg, int* emsg_len){
*emsg = NULL;
*emsg_len = 0;
*ec = nn_errno();
const char* msg = nn_strerror(*ec);
- string strMsg(msg);
- strMsg = strMsg + "{" + verbose_info + "}";
- copy_memory(emsg, emsg_len, strMsg.data(), strMsg.size());
- verbose_info.clear();
+ // string strMsg(msg);
+ // strMsg = strMsg + "{" + verbose_info + "}";
+ // copy_memory(emsg, emsg_len, strMsg.data(), strMsg.size());
+ // verbose_info.clear();
+
+ copy_memory(emsg, emsg_len, msg, strlen(msg));
}
///////////////////////////////////////////////////////
@@ -138,14 +142,14 @@
msg = &pub->msg_.front();
if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
}
- string sndmsg(msg->topic_ + msg->data_);
+ string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
if (rc == (int)sndmsg.size()){
char* tmp{};
rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0);
if (rc > 0){
nn_freemsg(tmp);
- printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
+ // printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
lock_guard<mutex> l{pub->mtx_msg_};
pub->msg_.pop_front();
continue;
@@ -162,6 +166,7 @@
}
int publish(const std::string& topic, const void* data, const int data_len, void* arg/*=NULL*/){
+ // printf("======>> publish topic %s\n", topic.c_str());
_ps* pub = (_ps*)arg;
if (!pub) pub = singleton<_ps>();
@@ -204,27 +209,26 @@
sub->socket_ = sock;
sub->t_ = get_thread([](const auto sub){
while (!sub->t_quit_.load()) {
- char* m;
+ char* m{};
int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
if (m_len > 0){
string tmp_msg{m, (size_t)m_len};
nn_freemsg(m);
- string topic{}, msg{};
+ string topic{tmp_msg.c_str()};
+ string msg{};
{
- lock_guard<mutex> l{(*sub)()};
+ lock_guard<mutex> l{sub->operator()()};
for(auto && i : sub->topics_){
- if (tmp_msg.size() < i.size()) continue;
- topic = move(tmp_msg.substr(0, i.size()));
- if (topic == i){
- msg = move(tmp_msg.substr(i.size()));
+ if (!!!i.compare(topic)){
+ msg = move(tmp_msg.substr(i.size()+1));
break;
}
}
}
- printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
+ // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
if (!msg.empty()){
lock_guard<mutex> l(sub->mtx_msg_);
- sub->msg_.emplace_back(topic, move(msg));
+ sub->msg_.emplace_back(move(topic), move(msg));
sub->cv_msg_.notify_all();
}
@@ -233,7 +237,7 @@
lock_guard<mutex> l{sub->mtx_failed_topics_};
if (!sub->failed_topics_.empty()){
for(auto iter = sub->failed_topics_.begin(); iter != sub->failed_topics_.end();){
- if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, iter->c_str(), iter->length()) >= 0){
+ if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_SUBSCRIBE, iter->c_str(), iter->length()) >= 0){
iter = sub->failed_topics_.erase(iter);
}else{
iter++;
@@ -305,9 +309,9 @@
PRNTVITAG("subscribe_read timeout");
return -1;
}
- const auto& tmp = sub->msg_.front();
- *topic = tmp.topic_;
- *msg = tmp.data_;
+ auto& tmp = sub->msg_.front();
+ *topic = std::move(tmp.topic_);
+ *msg = std::move(tmp.data_);
sub->msg_.pop_front();
return 0;
@@ -407,6 +411,7 @@
case SEND:
if ((rv = nng_aio_result(work->aio)) != 0) {
nng_msg_free(work->msg);
+ work->msg = NULL;
}
work->state = RECV;
nng_ctx_recv(work->ctx, work->aio);
@@ -418,22 +423,21 @@
static void cb_recv_for_aio(work* w){
nng_msg *om = w->msg;
- if (!om) return;
+ if (!om) {nng_sleep_aio(0, w->aio); return;}
_rr* rep = (_rr*)w->user_data;
-
- string msg{(const char*)nng_msg_body(om), nng_msg_len(om)};
- nng_msg_free(om);
auto t = (*rep)();
lock_guard<mutex> l{rep->mtx_msg_};
rep->works_.emplace(get<0>(t), w);
- get<1>(t).emplace(get<0>(t), move(msg));
+ get<1>(t).emplace(get<0>(t), string{(const char*)nng_msg_body(om), nng_msg_len(om)});
get<0>(t)++;
rep->cv_msg_.notify_all();
+
+ nng_msg_free(om);
}
-static struct work *alloc_work(nng_socket sock, _rr* rep)
+static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode)
{
struct work *w;
int rv;
@@ -445,16 +449,20 @@
w->user_data = rep;
if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
+ nng_free(w, sizeof(*w));
return NULL;
}
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
+ nng_free(w, sizeof(*w));
return NULL;
}
w->state = INIT;
+ w->mode = mode;
+
return (w);
}
-static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep){
+static int create_server(nng_socket* sock, const string& url, const int count, _rr* rep, const int mode){
TAG;
if (sock->id > 0) return 0;
@@ -467,12 +475,13 @@
work** works = (work**)malloc(sizeof(void*) * count);
for (int i = 0; i < count; i++) {
- works[i] = alloc_work(*sock, rep);
+ works[i] = alloc_work(*sock, rep, mode);
}
remove_exist(url);
rv = nng_listen(*sock, url.c_str(), NULL, 0);
if (rv < 0){
+ for(int i = 0; i < count; i++) if(works[i]) nng_free(works[i], sizeof(work));
free(works);
PRNTVITAG("create_server nng_listen failed");
PRNTVITAG(url);
@@ -503,12 +512,13 @@
ipc = url;
}
rep->url_ = ipc;
- if(create_server(&get<0>(rep->socks_), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(rep->socks_), ipc, 62, rep, REPLY_IPC) != 0) return -1;
if (port > 0){
get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
- if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+ if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep, REPLY_TCP) != 0) return -1;
+ // printf("======>> create server for remote port %d\n", port);
}else {
get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
}
@@ -518,11 +528,13 @@
constexpr int idle = 10;
const auto data = rr_unblocking_msg_.data();
const auto data_size = rr_unblocking_msg_.size();
+ constexpr int life_span = timeout_req_rep*10;
+
auto f = [rep]{
vector<struct work*> tmp{};
lock_guard<mutex> l{rep->mtx_msg_};
for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
- if ((iter->second+=idle) > timeout_req_rep){
+ if ((iter->second+=idle) > life_span){
tmp.push_back(iter->second.w_);
iter = rep->works_.erase(iter);
}else {
@@ -555,25 +567,32 @@
int tm = to_ms > 0 ? to_ms : 30;
uint64_t key{};
+ work* w{};
{
unique_lock<mutex> l(rep->mtx_msg_);
auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
return !rep->msg_.empty();
});
if (!status){
- PRNTVITAG("subscribe_read timeout");
+ PRNTVITAG("read_request timeout");
return -1;
}
auto iter = rep->msg_.begin();
key = iter->first;
*msg = move(iter->second);
rep->msg_.erase(iter);
+ auto witer = rep->works_.find(key);
+ if (witer != rep->works_.end()){
+ w = witer->second;
+ }
}
+
+ if (!w) return -1;
*src = malloc(sizeof(uint64_t));
*(uint64_t*)(*src) = key;
- return 0;
+ return w->mode;
}
int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){
@@ -583,6 +602,7 @@
struct work* w{};
{
auto key = *(static_cast<uint64_t*>(const_cast<void*>(src)));
+ free(const_cast<void*>(src));
lock_guard<mutex> l{rep->mtx_msg_};
auto iter = rep->works_.find(key);
--
Gitblit v1.8.0