From f552a0ea788225d8fe80942d6e8bd6500f788393 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 11 四月 2022 09:08:20 +0800
Subject: [PATCH] add log
---
src/nng_wrap.cpp | 31 +++++++++++++++++++------------
1 files changed, 19 insertions(+), 12 deletions(-)
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 31fad46..785ef1d 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -69,6 +69,10 @@
*dest_len = src_len;
}
+void set_last_error(const std::string& emsg){
+ verbose_info += emsg;
+}
+
void get_last_error(int* ec, void** emsg, int* emsg_len){
*emsg = NULL;
*emsg_len = 0;
@@ -138,14 +142,14 @@
msg = &pub->msg_.front();
if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
}
- string sndmsg(msg->topic_ + msg->data_);
+ string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
if (rc == (int)sndmsg.size()){
char* tmp{};
rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0);
if (rc > 0){
nn_freemsg(tmp);
- printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
+ // printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
lock_guard<mutex> l{pub->mtx_msg_};
pub->msg_.pop_front();
continue;
@@ -162,6 +166,7 @@
}
int publish(const std::string& topic, const void* data, const int data_len, void* arg/*=NULL*/){
+ // printf("======>> publish topic %s\n", topic.c_str());
_ps* pub = (_ps*)arg;
if (!pub) pub = singleton<_ps>();
@@ -209,19 +214,18 @@
if (m_len > 0){
string tmp_msg{m, (size_t)m_len};
nn_freemsg(m);
- string topic{}, msg{};
+ const auto topic{tmp_msg.c_str()};
+ string msg{};
{
- lock_guard<mutex> l{(*sub)()};
+ lock_guard<mutex> l{sub->operator()()};
for(auto && i : sub->topics_){
- if (tmp_msg.size() < i.size()) continue;
- topic = move(tmp_msg.substr(0, i.size()));
- if (topic == i){
- msg = move(tmp_msg.substr(i.size()));
+ if (!!!i.compare(topic)){
+ msg = move(tmp_msg.substr(i.size()+1));
break;
}
}
}
- printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
+ // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
if (!msg.empty()){
lock_guard<mutex> l(sub->mtx_msg_);
sub->msg_.emplace_back(topic, move(msg));
@@ -233,7 +237,7 @@
lock_guard<mutex> l{sub->mtx_failed_topics_};
if (!sub->failed_topics_.empty()){
for(auto iter = sub->failed_topics_.begin(); iter != sub->failed_topics_.end();){
- if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, iter->c_str(), iter->length()) >= 0){
+ if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_SUBSCRIBE, iter->c_str(), iter->length()) >= 0){
iter = sub->failed_topics_.erase(iter);
}else{
iter++;
@@ -513,6 +517,7 @@
get<1>(get<1>(rep->socks_)) = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
+ // printf("======>> create server for remote port %d\n", port);
}else {
get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
}
@@ -522,11 +527,13 @@
constexpr int idle = 10;
const auto data = rr_unblocking_msg_.data();
const auto data_size = rr_unblocking_msg_.size();
+ constexpr int life_span = timeout_req_rep*10;
+
auto f = [rep]{
vector<struct work*> tmp{};
lock_guard<mutex> l{rep->mtx_msg_};
for(auto iter = rep->works_.begin(); iter != rep->works_.end();){
- if ((iter->second+=idle) > timeout_req_rep){
+ if ((iter->second+=idle) > life_span){
tmp.push_back(iter->second.w_);
iter = rep->works_.erase(iter);
}else {
@@ -565,7 +572,7 @@
return !rep->msg_.empty();
});
if (!status){
- PRNTVITAG("subscribe_read timeout");
+ PRNTVITAG("read_request timeout");
return -1;
}
auto iter = rep->msg_.begin();
--
Gitblit v1.8.0