From f552a0ea788225d8fe80942d6e8bd6500f788393 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 11 四月 2022 09:08:20 +0800
Subject: [PATCH] add log
---
src/nng_wrap.cpp | 22 +++++++++++++++-------
1 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index d5e1928..785ef1d 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -69,6 +69,10 @@
*dest_len = src_len;
}
+void set_last_error(const std::string& emsg){
+ verbose_info += emsg;
+}
+
void get_last_error(int* ec, void** emsg, int* emsg_len){
*emsg = NULL;
*emsg_len = 0;
@@ -145,7 +149,7 @@
rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0);
if (rc > 0){
nn_freemsg(tmp);
- printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
+ // printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
lock_guard<mutex> l{pub->mtx_msg_};
pub->msg_.pop_front();
continue;
@@ -162,6 +166,7 @@
}
int publish(const std::string& topic, const void* data, const int data_len, void* arg/*=NULL*/){
+ // printf("======>> publish topic %s\n", topic.c_str());
_ps* pub = (_ps*)arg;
if (!pub) pub = singleton<_ps>();
@@ -212,15 +217,15 @@
const auto topic{tmp_msg.c_str()};
string msg{};
{
- lock_guard<mutex> l{(*sub)()};
+ lock_guard<mutex> l{sub->operator()()};
for(auto && i : sub->topics_){
- if (i.compare(topic) == 0){
+ if (!!!i.compare(topic)){
msg = move(tmp_msg.substr(i.size()+1));
break;
}
}
}
- printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
+ // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
if (!msg.empty()){
lock_guard<mutex> l(sub->mtx_msg_);
sub->msg_.emplace_back(topic, move(msg));
@@ -232,7 +237,7 @@
lock_guard<mutex> l{sub->mtx_failed_topics_};
if (!sub->failed_topics_.empty()){
for(auto iter = sub->failed_topics_.begin(); iter != sub->failed_topics_.end();){
- if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, iter->c_str(), iter->length()) >= 0){
+ if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_SUBSCRIBE, iter->c_str(), iter->length()) >= 0){
iter = sub->failed_topics_.erase(iter);
}else{
iter++;
@@ -512,6 +517,7 @@
get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+ // printf("======>> create server for remote port %d\n", port);
}else {
get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
}
@@ -521,11 +527,13 @@
constexpr int idle = 10;
const auto data = rr_unblocking_msg_.data();
const auto data_size = rr_unblocking_msg_.size();
+ constexpr int life_span = timeout_req_rep*10;
+
auto f = [rep]{
vector<struct work*> tmp{};
lock_guard<mutex> l{rep->mtx_msg_};
for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
- if ((iter->second+=idle) > timeout_req_rep){
+ if ((iter->second+=idle) > life_span){
tmp.push_back(iter->second.w_);
iter = rep->works_.erase(iter);
}else {
@@ -564,7 +572,7 @@
return !rep->msg_.empty();
});
if (!status){
- PRNTVITAG("subscribe_read timeout");
+ PRNTVITAG("read_request timeout");
return -1;
}
auto iter = rep->msg_.begin();
--
Gitblit v1.8.0