From f93a93f959c77aaeaf9154a7d8950691b91e1009 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 三月 2023 13:31:33 +0800
Subject: [PATCH] bug fixed
---
src/nng_wrap.cpp | 40 +++++++++++++++++++++++++---------------
src/common.h | 3 +++
src/bn_api.cpp | 1 -
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 3ecf961..50aec06 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -505,7 +505,6 @@
auto back_msg = "back:" + msg;
BHSendReply(src, back_msg.c_str(), back_msg.size());
- BHFree(src, 0);
}
void TestPub(const char* t, const int t_l, const char* d, const int d_l){
diff --git a/src/common.h b/src/common.h
index ea15849..cea8c28 100644
--- a/src/common.h
+++ b/src/common.h
@@ -134,6 +134,8 @@
DISABLE_COPY_AND_ASSIGN(psmsg);
psmsg(const std::string& t, std::string&& m)
:topic_(t),data_(std::move(m)){}
+ psmsg(std::string&& t, std::string&& m)
+ :topic_(std::move(t)),data_(std::move(m)){}
std::string topic_{};
std::string data_{};
};
@@ -227,6 +229,7 @@
std::mutex mtx_msg_{};
std::condition_variable cv_msg_{};
+ // std::deque<void*> q_src_{};
};
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 53a192a..0a836fc 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -131,6 +131,8 @@
set_socket_timeout(sock, timeout_req_rep);
pub->socket_ = sock;
pub->t_ = get_thread([](const auto pub){
+ string sndmsg{};
+ sndmsg.reserve(1024);
while (!pub->t_quit_.load()) {
_ps::psmsg *msg{NULL};
{
@@ -142,9 +144,14 @@
msg = &pub->msg_.front();
if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
}
- string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
+ sndmsg += msg->topic_;
+ sndmsg += '\0';
+ sndmsg += msg->data_;
+ int sndmsgsize = (int)sndmsg.size();
+ // string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
- if (rc == (int)sndmsg.size()){
+ sndmsg.clear();
+ if (rc == sndmsgsize){
char* tmp{};
rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0);
if (rc > 0){
@@ -213,26 +220,30 @@
// int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0);
if (m_len > 0){
- string tmp_msg{m, (size_t)m_len};
- nn_freemsg(m);
- string topic{tmp_msg.c_str()};
- string msg{};
+ char* topic = m;
+ char* msg = m + strlen(topic) + 1;
+ size_t msgl = m_len - strlen(topic) - 1;
+ // string tmp_msg{m, (size_t)m_len};
+ // nn_freemsg(m);
+ // string topic{tmp_msg.c_str()};
+ // string msg{};
+ bool found_topic = false;
{
lock_guard<mutex> l{sub->operator()()};
for(auto && i : sub->topics_){
- if (!!!i.compare(topic)){
- msg = move(tmp_msg.substr(i.size()+1));
+ if (0 == i.compare(topic)){
+ found_topic = true;
break;
}
}
}
// printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
- if (!msg.empty()){
+ if (found_topic){
lock_guard<mutex> l(sub->mtx_msg_);
- sub->msg_.emplace_back(move(topic), move(msg));
+ sub->msg_.emplace_back(string(topic), string(msg, msgl));
sub->cv_msg_.notify_all();
}
-
+ nn_freemsg(m);
}else {
{
lock_guard<mutex> l{sub->mtx_failed_topics_};
@@ -333,17 +344,16 @@
int& sock = sv->socket_;
+ char tmp[1024] = {0};
while (!sv->t_quit_.load()) {
if (sock < 0){
sock = client_socket(sv->url_, NN_RESPONDENT);
if (sock > 0) set_socket_timeout(sock, 126);
+ continue;
}
- if (sock < 0) continue;
- char* tmp{};
int rc = nn_recv(sock, &tmp, NN_MSG, 0);
if (rc > 0){
- nn_freemsg(tmp);
rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0);
if (rc < 0){
PRNTVITAG("heartbeat survey failed");
@@ -605,7 +615,7 @@
uint64_t key;
memcpy(&key, src, sizeof(key));
// auto key = *(static_cast<uint64_t*>(const_cast<void*>(src)));
- free(src);
+ // free(src);
lock_guard<mutex> l{rep->mtx_msg_};
auto iter = rep->works_.find(key);
--
Gitblit v1.8.0