From ab42172c747112e7306efb7aebdc853c3c45bd7a Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 15 十二月 2023 15:17:34 +0800 Subject: [PATCH] remove log --- src/nng_wrap.cpp | 65 ++++++++++++++++++++------------ 1 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp index c3b835f..1a2bc3e 100644 --- a/src/nng_wrap.cpp +++ b/src/nng_wrap.cpp @@ -4,7 +4,6 @@ #include <vector> #include "common.h" -using namespace std; #include <nng/protocol/reqrep0/rep.h> #include <nng/supplemental/util/platform.h> @@ -12,6 +11,8 @@ #include "nng/compat/nanomsg/reqrep.h" #include "nng/compat/nanomsg/pubsub.h" #include "nng/compat/nanomsg/survey.h" + +using namespace std; namespace nng_wrap { @@ -131,6 +132,8 @@ set_socket_timeout(sock, timeout_req_rep); pub->socket_ = sock; pub->t_ = get_thread([](const auto pub){ + string sndmsg{}; + sndmsg.reserve(1024); while (!pub->t_quit_.load()) { _ps::psmsg *msg{NULL}; { @@ -142,9 +145,14 @@ msg = &pub->msg_.front(); if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;} } - string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; + sndmsg += msg->topic_; + sndmsg += '\0'; + sndmsg += msg->data_; + int sndmsgsize = (int)sndmsg.size(); + // string sndmsg = (string{msg->topic_}+='\0')+=msg->data_; int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0); - if (rc == (int)sndmsg.size()){ + sndmsg.clear(); + if (rc == sndmsgsize){ char* tmp{}; rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0); if (rc > 0){ @@ -205,35 +213,43 @@ PRNTVITAG("client_socket faild\n"); return -1; } - // set_socket_timeout(sock, timeout_req_rep); + set_socket_timeout(sock, 300); sub->socket_ = sock; sub->t_ = get_thread([](const auto sub){ while (!sub->t_quit_.load()) { + // for(auto&& i : sub->topics_) { + // printf("======>> sub topic %s\n", i.c_str()); + // } char* m{}; // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0); if (m_len > 0){ - string tmp_msg{m, (size_t)m_len}; - nn_freemsg(m); - string topic{tmp_msg.c_str()}; - string msg{}; + char* topic = m; + char* msg = m + strlen(topic) + 1; + size_t msgl = m_len - strlen(topic) - 1; + // string tmp_msg{m, (size_t)m_len}; + // nn_freemsg(m); + // string topic{tmp_msg.c_str()}; + // string msg{}; + bool found_topic = false; { lock_guard<mutex> l{sub->operator()()}; for(auto && i : sub->topics_){ - if (!!!i.compare(topic)){ - msg = move(tmp_msg.substr(i.size()+1)); + if (0 == i.compare(topic)){ + found_topic = true; break; } } } - // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); - if (!msg.empty()){ + // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msgl); + if (found_topic){ lock_guard<mutex> l(sub->mtx_msg_); - sub->msg_.emplace_back(move(topic), move(msg)); + sub->msg_.emplace_back(string(topic), string(msg, msgl)); sub->cv_msg_.notify_all(); } - + nn_freemsg(m); }else { + if (!sub->failed_topics_.empty()) { lock_guard<mutex> l{sub->mtx_failed_topics_}; if (!sub->failed_topics_.empty()){ @@ -333,17 +349,16 @@ int& sock = sv->socket_; + char tmp[1024] = {0}; while (!sv->t_quit_.load()) { if (sock < 0){ sock = client_socket(sv->url_, NN_RESPONDENT); if (sock > 0) set_socket_timeout(sock, 126); + continue; } - if (sock < 0) continue; - char* tmp{}; int rc = nn_recv(sock, &tmp, NN_MSG, 0); if (rc > 0){ - nn_freemsg(tmp); rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0); if (rc < 0){ PRNTVITAG("heartbeat survey failed"); @@ -526,7 +541,7 @@ if (!rep->t_unblock_){ rep->t_unblock_.reset(new thread(get_thread([](const auto rep){ - constexpr int idle = 10; + constexpr int idle = 216; const auto data = rr_unblocking_msg_.data(); const auto data_size = rr_unblocking_msg_.size(); constexpr int life_span = timeout_req_rep*10; @@ -545,7 +560,7 @@ return tmp; }; while (!rep->t_quit_.load()) { - this_thread::sleep_for(chrono::milliseconds{10}); + this_thread::sleep_for(chrono::milliseconds{idle}); vector<struct work*> tmp = f(); for(auto && w : tmp){ aio_unblock(w, data, data_size); @@ -590,20 +605,22 @@ if (!w) return -1; - *src = malloc(sizeof(uint64_t)); - *(uint64_t*)(*src) = key; + *src = malloc(sizeof(key)); + memcpy(*src, &key, sizeof(key)); return w->mode; } -int send_reply(const void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){ +int send_reply(void* src, const void* msg, const int msg_len, void* arg/*=NULL*/){ _rr* rep = (_rr*)arg; if (!rep) rep = singleton<_rr>(); struct work* w{}; { - auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); - free(const_cast<void*>(src)); + uint64_t key; + memcpy(&key, src, sizeof(key)); + // auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); + // free(src); lock_guard<mutex> l{rep->mtx_msg_}; auto iter = rep->works_.find(key); -- Gitblit v1.8.0