From a884637d0376d469ee307ebe1d117ae908a4c340 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 13 十二月 2021 13:01:58 +0800
Subject: [PATCH] Fix timeout bug in async req-rep
---
src/nng_wrap.cpp | 255 +++++++++++++++++++++++++++---------------
main.cpp | 9
proto/cpp/CMakeLists.txt | 2
src/bn_api.cpp | 42 +++++-
4 files changed, 203 insertions(+), 105 deletions(-)
diff --git a/main.cpp b/main.cpp
index d279be2..ecbaa5a 100644
--- a/main.cpp
+++ b/main.cpp
@@ -6,6 +6,7 @@
#include <vector>
#include <thread>
#include <chrono>
+#include <atomic>
using namespace std;
#include "src/bn_api.h"
@@ -15,10 +16,11 @@
thread([]{
string base_cont("test_req_rep==");
+ atomic<uint64_t> index{0};
vector<thread> v_t;
- for (int i = 0; i < 200; i++){
- v_t.emplace_back([&base_cont, i]{
- int64_t index = 0;
+ for (int i = 0; i < 621; i++){
+ v_t.emplace_back([&base_cont, i, &index]{
+
while (true) {
// printf("start request\n");
// auto s = chrono::steady_clock::now();
@@ -31,7 +33,6 @@
});
}
- int64_t index = 0;
while (true) {
// printf("start request\n");
// auto s = chrono::steady_clock::now();
diff --git a/proto/cpp/CMakeLists.txt b/proto/cpp/CMakeLists.txt
index 4d09144..6acc776 100644
--- a/proto/cpp/CMakeLists.txt
+++ b/proto/cpp/CMakeLists.txt
@@ -23,7 +23,7 @@
foreach(file ${proto_files})
message(${file})
- message(${MESSAGE_DIR})
+
get_filename_component(FIL_WE ${file} NAME_WE)
# message(${FIL_WE})
diff --git a/src/bn_api.cpp b/src/bn_api.cpp
index 292ef4b..93ead09 100644
--- a/src/bn_api.cpp
+++ b/src/bn_api.cpp
@@ -176,7 +176,19 @@
{
if (!topic || topic_len <= 0) return false;
- return simple_request(get_url(URLQueryTopic), topic, topic_len, reply, reply_len, timeout_ms);
+ auto url(get_url(URLQueryTopic));
+
+ if (remote && remote_len > 0){
+ BHAddress addr;
+ if (addr.ParseFromArray(remote, remote_len)){
+ if (!addr.ip().empty() && addr.port() > 0){
+ // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+ printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str());
+ }
+ }
+ }
+
+ return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms);
}
// 请求在线进程 request
@@ -190,9 +202,19 @@
{
if (!query || query_len <= 0) return false;
- auto ret = simple_request(get_url(URLQueryProcs), query, query_len, reply, reply_len, timeout_ms);
- // printf("======>> BHQueryProcs *reply %p reply_len %d\n", *reply, *reply_len);
- return ret;
+ auto url(get_url(URLQueryProcs));
+
+ if (remote && remote_len > 0){
+ BHAddress addr;
+ if (addr.ParseFromArray(remote, remote_len)){
+ if (!addr.ip().empty() && addr.port() > 0){
+ // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+ printf("======>> BHQueryProcs use remote address %s\n", url.c_str());
+ }
+ }
+ }
+
+ return simple_request(url, query, query_len, reply, reply_len, timeout_ms);
}
// above communicate with center
@@ -352,11 +374,13 @@
auto url("ipc:///tmp/" + procid);
- BHAddress addr;
- if (addr.ParseFromArray(remote, remote_len)){
- if (!addr.ip().empty() && addr.port() > 0){
- url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
- printf("======>>use remote address %s\n", url.c_str());
+ if (remote && remote_len > 0){
+ BHAddress addr;
+ if (addr.ParseFromArray(remote, remote_len)){
+ if (!addr.ip().empty() && addr.port() > 0){
+ url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+ printf("======>> BHRequest use remote address %s\n", url.c_str());
+ }
}
}
// 使用procid作为ipc通信
diff --git a/src/nng_wrap.cpp b/src/nng_wrap.cpp
index 5a54168..a231ae9 100644
--- a/src/nng_wrap.cpp
+++ b/src/nng_wrap.cpp
@@ -51,17 +51,17 @@
}
}
-static int server_socket(const string& url, const int protocol, int family=AF_SP){
- int sock = nn_socket(family, protocol);
- if (sock < 0) return sock;
- remove_exist(url);
- int rc = nn_bind(sock, url.c_str());
- if (rc < 0) {
- nn_close(sock);
- return rc;
- }
- return sock;
-}
+// static int server_socket(const string& url, const int protocol, int family=AF_SP){
+// int sock = nn_socket(family, protocol);
+// if (sock < 0) return sock;
+// remove_exist(url);
+// int rc = nn_bind(sock, url.c_str());
+// if (rc < 0) {
+// nn_close(sock);
+// return rc;
+// }
+// return sock;
+// }
static void set_socket_timeout(int sock, const int to_ms){
nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &to_ms, sizeof(to_ms));
@@ -106,17 +106,16 @@
*dest_len = src_len;
}
-static string verbose_info{};
+static thread_local string verbose_info{};
#ifndef PRNTVITAG
-// #define TAG do{ \
-// if (verbose_info.length() > 8096) verbose_info.clear(); \
-// verbose_info=string("function [")+__FUNCTION__+string("]");}while(0)
-/* #define PRNTVITAG(msg) do{ \
- if (verbose_info.length() > 8096) verbose_info.clear(); \
+#define TAG do{ verbose_info.clear(); \
+ verbose_info=string("function [")+__FUNCTION__+string("]"); \
+ }while(0)
+#define PRNTVITAG(msg) do{ \
verbose_info+=string("-> (") + msg + string(")"); \
- }while(0) */
-#define TAG
-#define PRNTVITAG(args)
+ }while(0)
+// #define TAG
+// #define PRNTVITAG(args)
#endif
void get_last_error(int* ec, void** emsg, int* emsg_len){
*emsg = NULL;
@@ -163,10 +162,12 @@
///////////////////////////////////////////////////////////
// base class
-#define DISABLE_COPY_AND_ASSIGN(className) className(const className&)=delete; \
- className(className&&)=delete; \
- className& operator=(const className&)=delete; \
- className& operator=(className&&)=delete
+#define DISABLE_COPY_AND_ASSIGN(className) \
+ className(const className&)=delete; \
+ className(className&&)=delete; \
+ className& operator=(const className&)=delete; \
+ className& operator=(className&&)=delete
+
class _nn{
public:
DISABLE_COPY_AND_ASSIGN(_nn);
@@ -178,15 +179,17 @@
///////////////////////////////////////////////
// publish
-struct psmsg{
- DISABLE_COPY_AND_ASSIGN(psmsg);
- psmsg()=delete;
- psmsg(const std::string& t, std::string&& m)
- :topic_(t),msg_(std::move(m)){}
- std::string topic_{};
- std::string msg_{};
-};
+
class _ps : public _nn{
+public:
+ struct psmsg{
+ DISABLE_COPY_AND_ASSIGN(psmsg);
+ psmsg()=delete;
+ psmsg(const std::string& t, std::string&& m)
+ :topic_(t),data_(std::move(m)){}
+ std::string topic_{};
+ std::string data_{};
+ };
public:
DISABLE_COPY_AND_ASSIGN(_ps);
_ps()=default;
@@ -217,7 +220,7 @@
pub_.socket_ = sock;
pub_.t_ = thread([]{
while (!pub_.t_quit_.load()) {
- psmsg *msg{NULL};
+ _ps::psmsg *msg{NULL};
{
unique_lock<mutex> l{pub_.mtx_msg_};
pub_.cv_msg_.wait(l, []{
@@ -227,22 +230,14 @@
msg = &pub_.msg_.front();
if (msg->topic_.empty()) {pub_.msg_.pop_front(); continue;}
}
- const auto &topic = msg->topic_;
- const auto topic_size = topic.size();
- const auto &data = msg->msg_;
- const auto data_size = data.size();
-
- char *sndmsg = (char*)malloc(topic_size + data_size);
- memcpy(sndmsg, topic.data(), topic_size);
- memcpy(sndmsg+topic_size, data.data(), data_size);
- int rc = nn_send(pub_.socket_, sndmsg, data_size+topic_size, 0);
- free(sndmsg);
- if (rc == (int)(data_size+topic_size)){
+ string sndmsg(msg->topic_ + msg->data_);
+ int rc = nn_send(pub_.socket_, sndmsg.data(), sndmsg.size(), 0);
+ if (rc == (int)sndmsg.size()){
char* tmp{};
rc = nn_recv(pub_.socket_, &tmp, NN_MSG, 0);
+ nn_freemsg(tmp);
if (rc > 0){
- nn_freemsg(tmp);
- printf("======>> publish topic %s data length %lu\n", topic.c_str(), data_size);
+ printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
lock_guard<mutex> l{pub_.mtx_msg_};
pub_.msg_.pop_front();
continue;
@@ -297,8 +292,10 @@
if (sub_.socket_ > 0) return 0;
sub_.url_ = url;
+ TAG;
int sock = client_socket(url, NN_SUB);
if (sock < 0){
+ PRNTVITAG("client_socket faild\n");
return -1;
}
// set_socket_timeout(sock, timeout_req_rep);
@@ -308,20 +305,20 @@
char* m;
int m_len = nn_recv(sub_.socket_, &m, NN_MSG, NN_DONTWAIT);
if (m_len > 0){
+ string tmp_msg{m, (size_t)m_len};
+ nn_freemsg(m);
string topic{}, msg{};
{
lock_guard<mutex> l{sub_.mtx_topics_};
for(auto && i : sub_.topics_){
- auto topic_len = i.size();
- if (m_len <= (int)topic_len) continue;
- topic.assign(m, topic_len);
+ if (tmp_msg.size() < i.size()) continue;
+ topic = tmp_msg.substr(0, i.size());
if (topic == i){
- msg.assign(m+topic_len, m_len-topic_len);
+ msg = tmp_msg.substr(i.size());
break;
}
}
}
- nn_freemsg(m);
printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
if (!msg.empty()){
lock_guard<mutex> l(sub_.mtx_msg_);
@@ -351,14 +348,19 @@
}
int subscribe_topic(const std::string& topic){
+ TAG;
if (sub_.socket_ < 0){
subscribe_center(sub_.url_);
}
- if (sub_.socket_ < 0) return -1;
+ if (sub_.socket_ < 0) {
+ PRNTVITAG("socket_ < 0");
+ return -1;
+ }
auto ret = nn_setsockopt(sub_.socket_, NN_SUB, NN_SUB_SUBSCRIBE, topic.c_str(), topic.length());
// printf("set NN_SUB_SUBSCRIBE topic %s ret %d\n", topic.c_str(), ret);
if (ret < 0){
+ PRNTVITAG("nn_setsockopt failed");
lock_guard<mutex> l{sub_.mtx_failed_topics_};
sub_.failed_topics_.insert(topic);
}
@@ -381,6 +383,8 @@
int subscribe_read(std::string* topic, std::string* msg, const int to_ms){
+ TAG;
+
int tm = to_ms > 0 ? to_ms : 30;
unique_lock<mutex> l(sub_.mtx_msg_);
@@ -393,7 +397,7 @@
}
const auto& tmp = sub_.msg_.front();
*topic = tmp.topic_;
- *msg = tmp.msg_;
+ *msg = tmp.data_;
sub_.msg_.pop_front();
return 0;
@@ -421,21 +425,22 @@
survey_.url_ = url;
survey_.fixed_msg_ = move(fixed_msg);
survey_.t_ = thread([]{
+
+ TAG;
+
int& sock = survey_.socket_;
const auto& msg = survey_.fixed_msg_;
while (!survey_.t_quit_.load()) {
if (sock < 0){
sock = client_socket(survey_.url_, NN_RESPONDENT);
- if (sock > 0){
- set_socket_timeout(sock, 126);
- }
+ if (sock > 0) set_socket_timeout(sock, 126);
}
if (sock < 0) continue;
char* tmp{};
int rc = nn_recv(sock, &tmp, NN_MSG, 0);
+ nn_freemsg(tmp);
if (rc > 0){
- nn_freemsg(tmp);
rc = nn_send(sock, msg.data(), msg.size(), 0);
if (rc < 0){
PRNTVITAG("heartbeat survey failed");
@@ -451,12 +456,6 @@
//////////////////////////////////////////////
// reply for request
-int request2(const std::string &ipc, const void* r, const int r_len,
- void** reply, int* reply_len, const int to_ms)
-{
- return simple_request(ipc, r, r_len, reply, reply_len, to_ms);
-}
-
enum { INIT, RECV, WAIT, SEND };
struct work {
int state{-1};
@@ -471,15 +470,38 @@
DISABLE_COPY_AND_ASSIGN(_rr);
_rr()=default;
~_rr(){
-
+ if(sock_local_.id > 0) nng_close(sock_local_);
+ if(sock_remote_.id > 0) nng_close(sock_remote_);
+ t_quit_.store(true, memory_order_relaxed);
+ if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
}
+
+ const string unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
+
+ unique_ptr<thread> t_unblock_{nullptr};
+ atomic_bool t_quit_{false};
nng_socket sock_local_{0};
nng_socket sock_remote_{0};
int port_{-1};
unordered_map<uint64_t, string> msg_{};
- unordered_map<uint64_t, struct work*> works_{};
+ class worker{
+ worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
+ public:
+ worker()=default;
+ ~worker()=default;
+ worker(struct work* w):w_(w),life_(0){}
+ worker(const worker& w):w_(w.w_),life_(w.life_){}
+ worker(worker&& w):w_(w.w_),life_(w.life_){}
+ worker& operator=(const worker& w){return in_op(w);}
+ worker& operator=(worker&& w){return in_op(w);}
+ operator struct work*() const{return w_;}
+ operator int&() {return life_;}
+ struct work* w_{};
+ int life_{};
+ };
+ unordered_map<uint64_t, worker> works_{};
uint64_t work_index_{0};
mutex mtx_msg_{};
condition_variable cv_msg_{};
@@ -488,13 +510,35 @@
static _rr reply_;
-static void
-server_cb(void *arg)
+int request2(const std::string &ipc, const void* r, const int r_len,
+ void** reply, int* reply_len, const int to_ms)
{
+ const auto suc = simple_request(ipc, r, r_len, reply, reply_len, to_ms);
+ if (suc){
+ const size_t sl = reply_.unblocking_msg_.size();
+ const size_t rl = *reply_len;
+ if (sl != rl) return true;
+
+ const auto& s = reply_.unblocking_msg_;
+ auto r = (const char*)(*reply);
+ if (s.compare(0, sl, r, rl) == 0){
+ free(*reply);
+ *reply = NULL;
+ *reply_len = 0;
+ return false;
+ }
+ }
+ return suc;
+}
+
+static void server_cb(void *arg)
+{
+ if (!arg) return;
+
struct work *work = (struct work*)arg;
nng_msg * msg;
int rv;
- uint32_t when{0};
+ // uint32_t when{0};
switch (work->state) {
case INIT:
@@ -522,7 +566,6 @@
case SEND:
if ((rv = nng_aio_result(work->aio)) != 0) {
nng_msg_free(work->msg);
- break;
}
work->state = RECV;
nng_ctx_recv(work->ctx, work->aio);
@@ -534,12 +577,18 @@
static void cb_recv_for_aio(work* w){
nng_msg *om = w->msg;
+ if (!om) return;
+
string msg{(const char*)nng_msg_body(om), nng_msg_len(om)};
nng_msg_free(om);
lock_guard<mutex> l{reply_.mtx_msg_};
- reply_.works_[reply_.work_index_] = w;
- reply_.msg_[reply_.work_index_] = msg;
+ reply_.works_.emplace(reply_.work_index_, w);
+ reply_.msg_.emplace(reply_.work_index_, move(msg));
+ // reply_.works_.insert({reply_.work_index_, w});
+ // reply_.msg_.insert({reply_.work_index_, msg});
+ // reply_.works_[reply_.work_index_] = w;
+ // reply_.msg_[reply_.work_index_] = msg;
reply_.work_index_++;
reply_.cv_msg_.notify_all();
}
@@ -565,9 +614,10 @@
}
static constexpr int PARALLEL = 62;
-static struct work* works[PARALLEL]{};
+static struct work* works_local[PARALLEL]{};
+static struct work* works_remote[PARALLEL]{};
-static int create_server(nng_socket* sock, const string& url){
+static int create_server(nng_socket* sock, const string& url, work** works){
TAG;
if (sock->id > 0) return 0;
@@ -596,22 +646,54 @@
return 0;
}
+static void aio_unblock(work* w, const void* msg, const int msg_len){
+ nng_msg_alloc(&w->msg, 0);
+ nng_msg_append(w->msg, msg, msg_len);
+
+ nng_sleep_aio(0, w->aio);
+}
+
int start_reply(const std::string& url, const int port){
- TAG;
string ipc = "ipc:///tmp/" + url;
if (url.find("ipc://") == 0){
ipc = url;
}
reply_.url_ = ipc;
- if(create_server(&reply_.sock_local_, ipc) != 0) return -1;
+ if(create_server(&reply_.sock_local_, ipc, works_local) != 0) return -1;
if (port > 0){
reply_.port_ = port;
ipc = "tcp://0.0.0.0:" + to_string(port);
- if(create_server(&reply_.sock_remote_, ipc) != 0) return -1;
+ if(create_server(&reply_.sock_remote_, ipc, works_remote) != 0) return -1;
}else {
reply_.sock_remote_.id = numeric_limits<int32_t>::max();
+ }
+
+ if (!reply_.t_unblock_){
+ reply_.t_unblock_.reset(new thread([]{
+ constexpr int idle = 10;
+ const auto data = reply_.unblocking_msg_.data();
+ const auto data_size = reply_.unblocking_msg_.size();
+ while (!reply_.t_quit_.load()) {
+ this_thread::sleep_for(chrono::milliseconds{10});
+ vector<struct work*> tmp{};
+ {
+ lock_guard<mutex> l{reply_.mtx_msg_};
+ for(auto iter = reply_.works_.begin(); iter != reply_.works_.end();){
+ if ((iter->second+=idle) > timeout_req_rep){
+ tmp.push_back(iter->second.w_);
+ iter = reply_.works_.erase(iter);
+ }else {
+ ++iter;
+ }
+ }
+ }
+ for(auto && w : tmp){
+ aio_unblock(w, data, data_size);
+ }
+ }
+ }));
}
return 0;
@@ -619,15 +701,13 @@
int read_request(void** src, std::string* msg, const int to_ms){
- if (reply_.sock_local_.id == 0 || reply_.sock_remote_.id == 0) {
+ if (reply_.sock_local_.id == 0 || reply_.sock_remote_.id == 0)
if (start_reply(reply_.url_, reply_.port_) != 0)
return -1;
- }
int tm = to_ms > 0 ? to_ms : 30;
uint64_t key{};
- string tmpmsg;
{
unique_lock<mutex> l(reply_.mtx_msg_);
auto status = reply_.cv_msg_.wait_for(l, chrono::milliseconds{tm}, []{
@@ -637,16 +717,14 @@
PRNTVITAG("subscribe_read timeout");
return -1;
}
- const auto& iter = reply_.msg_.begin();
+ auto iter = reply_.msg_.begin();
key = iter->first;
- tmpmsg = iter->second;
+ *msg = move(iter->second);
reply_.msg_.erase(iter);
}
- *msg = move(tmpmsg);
- auto s = (uint64_t*)malloc(sizeof(uint64_t));
- *s = key;
- *src = s;
+ *src = malloc(sizeof(uint64_t));
+ *(uint64_t*)(*src) = key;
return 0;
}
@@ -663,12 +741,7 @@
reply_.works_.erase(iter);
}
- TAG;
-
- nng_msg_alloc(&w->msg, 0);
- nng_msg_append(w->msg, msg, msg_len);
-
- nng_sleep_aio(0, w->aio);
+ aio_unblock(w, msg, msg_len);
return 0;
}
--
Gitblit v1.8.0