From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 12 四月 2021 18:29:41 +0800
Subject: [PATCH] add fail-resend support.
---
utest/speed_test.cpp | 12 +-
src/failed_msg.h | 47 +++++++
src/socket.h | 9 +
.vscode/launch.json | 2
src/socket.cpp | 8
utest/utest.cpp | 26 ++++
src/failed_msg.cpp | 33 +++++
src/timed_queue.h | 75 ++++++++++++
src/topic_node.cpp | 48 +------
src/center.cpp | 47 ++++---
10 files changed, 235 insertions(+), 72 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json
index ef42f7b..12aa21d 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -11,7 +11,7 @@
"program": "${workspaceFolder}/debug/bin/utest",
"args": [
"-t",
- "ReqRepTest"
+ "SRTest"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
diff --git a/src/center.cpp b/src/center.cpp
index d2aad0a..7865e57 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -18,6 +18,7 @@
#include "center.h"
#include "bh_util.h"
#include "defs.h"
+#include "failed_msg.h"
#include "shm.h"
#include <chrono>
#include <set>
@@ -364,28 +365,31 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
-
auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
-
- auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+ auto center_failed_q = std::make_shared<FailedMsgQ>(); // replies the center socket failed to deliver; retried from OnCenterIdle.
+ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
- return [&](auto &&rep_body) {
+ return [&, timeout_ms](auto &&rep_body) { // timeout_ms is a by-value parameter that dies when MakeReplyer returns: copy it, never reference it.
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
- bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
- if (!r) {
- printf("send reply failed.\n");
+ MsgI msg;
+ if (msg.Make(socket.shm(), reply_head, rep_body)) {
+ auto &remote = head.route(0).mq_id(); // queue id of the first route entry is the reply destination.
+ bool r = socket.Send(remote.data(), msg, timeout_ms);
+ if (!r) {
+ failq.Push(remote, msg, 60s); // for later retry.
+ }
}
- //TODO resend failed.
};
};
- auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+ auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
auto &center = *center_ptr;
+ center_failed_q->TrySend(socket);
center->OnTimer();
};
auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto &center = *center_ptr;
- auto replyer = MakeReplyer(socket, head, center->id());
+ auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
switch (head.type()) {
CASE_ON_MSG_TYPE(Register);
CASE_ON_MSG_TYPE(Heartbeat);
@@ -396,10 +400,11 @@
}
};
- auto OnBusIdle = [](ShmSocket &socket) {};
+ auto bus_failed_q = std::make_shared<FailedMsgQ>();
+ auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto &center = *center_ptr;
- auto replyer = MakeReplyer(socket, head, center->id());
+ auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
auto OnPublish = [&]() {
MsgPublish pub;
NodeCenter::Clients clients;
@@ -407,19 +412,25 @@
if (head.route_size() != 1 || !msg.ParseBody(pub)) {
return;
} else if (!center->FindClients(head, pub, clients, reply)) {
- MakeReplyer(socket, head, center->id())(reply);
+ replyer(reply);
} else {
- MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+ replyer(MakeReply(eSuccess));
if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+ if (clients.empty()) { return; }
- for (auto &cli : clients) {
+ auto it = clients.begin();
+ do {
+ auto &cli = *it;
auto node = cli.weak_node_.lock();
if (node) {
- if (!socket.Send(cli.mq_.data(), msg, 100)) {
- printf("center route publish failed. need resend.\n");
+ if (!socket.Send(cli.mq_.data(), msg, 0)) {
+ bus_failed_q->Push(cli.mq_, msg, 60s);
}
+ ++it;
+ } else {
+ it = clients.erase(it);
}
- }
+ } while (it != clients.end());
}
};
switch (head.type()) {
@@ -484,7 +495,7 @@
{
for (auto &kv : Centers()) {
auto &info = kv.second;
- sockets_[info.name_]->Start(info.handler_);
+ sockets_[info.name_]->Start(info.handler_, info.idle_);
}
return true;
diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp
new file mode 100644
index 0000000..ab4658d
--- /dev/null
+++ b/src/failed_msg.cpp
@@ -0,0 +1,51 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: failed_msg.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021-04-12 16:10:53
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "failed_msg.h"
+#include <cassert>
+
+/**
+ * Build the deferred-resend callable for one failed message.
+ *
+ * Calls msg.AddRef(), then copies `remote` and the `msg` handle into the
+ * returned callable. When invoked with a non-null ShmSocket* (passed as
+ * void*), the callable sends the message to `remote` with a timeout argument
+ * of 0 and returns whether the send succeeded. After a successful send of a
+ * counted message it calls Release() on a copy of the handle.
+ *
+ * NOTE(review): nothing here calls Release() when the callable is destroyed
+ * without a successful send (e.g. its queue entry expired) -- verify that the
+ * AddRef() above is balanced elsewhere.
+ *
+ * @param remote Destination queue id, stored by value in the callable.
+ * @param msg    Message handle to resend.
+ * @return Callable that performs one resend attempt and reports success.
+ */
+FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
+{
+    msg.AddRef();
+    return [remote, msg](void *valid_sock) {
+        assert(valid_sock);
+        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
+        const bool sent = sock.Send(remote.data(), msg, 0);
+        if (sent && msg.IsCounted()) {
+            auto tmp = msg; // Release() is not const, but it's safe to release.
+            tmp.Release(sock.shm());
+        }
+        return sent;
+    };
+}
\ No newline at end of file
diff --git a/src/failed_msg.h b/src/failed_msg.h
new file mode 100644
index 0000000..2d57abc
--- /dev/null
+++ b/src/failed_msg.h
@@ -0,0 +1,61 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: failed_msg.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021-04-12 11:21:30
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef FAILED_MSG_9YOI86AS
+#define FAILED_MSG_9YOI86AS
+
+#include "msg.h"
+#include "socket.h"
+#include "timed_queue.h"
+#include <functional>
+#include <string>
+
+/**
+ * Queue of messages whose send failed, kept for later resend attempts.
+ *
+ * Each entry is a callable (built by PrepareSender) that tries to send one
+ * message to one remote queue. TrySend() runs the pending callables; an entry
+ * is removed once its send succeeds or its expiry has passed.
+ */
+class FailedMsgQ
+{
+    // One resend attempt: receives a ShmSocket* as void*, returns true when sent.
+    typedef std::function<bool(void *)> Func;
+    typedef TimedQueue<Func> TimedFuncQ;
+
+public:
+    typedef bhome_msg::MsgI Msg;
+
+    /// Queue `msg` for resend to `remote`; the entry is discarded after time point `exr`.
+    void Push(const std::string &remote, Msg const &msg, TimedFuncQ::TimePoint const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
+    /// Queue `msg` for resend to `remote`; the entry is discarded once `exr` has elapsed from now.
+    void Push(const std::string &remote, Msg const &msg, TimedFuncQ::Duration const &exr) { queue_.Push(PrepareSender(remote, msg), exr); }
+
+    /// Attempt every pending resend through `socket`.
+    void TrySend(ShmSocket &socket)
+    {
+        queue_.CheckAll([&](Func &f) { return f(&socket); });
+    }
+
+private:
+    /// Build the resend callable for `msg` addressed to `remote` (defined in failed_msg.cpp).
+    Func PrepareSender(const std::string &remote, Msg const &msg);
+
+    TimedFuncQ queue_;
+};
+
+#endif // end of include guard: FAILED_MSG_9YOI86AS
diff --git a/src/socket.cpp b/src/socket.cpp
index 116175d..2c55665 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -45,11 +45,12 @@
{
auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
RecvCB cb;
- if (async_cbs_->Find(head.msg_id(), cb)) {
+ if (per_msg_cbs_->Find(head.msg_id(), cb)) {
cb(socket, imsg, head);
} else if (onData) {
onData(socket, imsg, head);
- } // else ignored, or dropped
+ } else { // else ignored, or dropped
+ }
};
auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
@@ -61,7 +62,8 @@
if (imsg.ParseHead(head)) {
onRecvWithPerMsgCB(*this, imsg, head);
}
- } else if (onIdle) {
+ }
+ if (onIdle) {
onIdle(*this);
}
} catch (...) {
diff --git a/src/socket.h b/src/socket.h
index ee25d81..5973ab6 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -77,7 +77,7 @@
template <class Body>
bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
{
- auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
+ auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); };
MsgI msg;
return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
}
@@ -109,12 +109,15 @@
reply.swap(msg);
reply_head.Swap(&head);
st->cv.notify_one();
- } else {
+ } else { // ignore
}
};
std::unique_lock<std::mutex> lk(st->mutex);
bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
+ if (!sendok) {
+ printf("send timeout\n");
+ }
if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
return true;
} else {
@@ -161,7 +164,7 @@
}
};
- Synced<AsyncCBs> async_cbs_;
+ Synced<AsyncCBs> per_msg_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/timed_queue.h b/src/timed_queue.h
new file mode 100644
index 0000000..14e318d
--- /dev/null
+++ b/src/timed_queue.h
@@ -0,0 +1,95 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: timed_queue.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021-04-12 09:36:04
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TIMED_QUEUE_Y2YLRBS3
+#define TIMED_QUEUE_Y2YLRBS3
+
+#include "bh_util.h"
+#include <chrono>
+#include <list>
+#include <string>
+#include <utility>
+
+/**
+ * List of items, each stamped with an expiry time point.
+ *
+ * Items are appended with Push() and visited in insertion order by
+ * CheckAll(), which drops every item that has expired or that the visitor
+ * reports as handled. Storage lives in a Synced<> wrapper from bh_util.h
+ * (presumably a lock-guarded accessor -- confirm there).
+ *
+ * @tparam Data      Stored item type.
+ * @tparam ClockType Clock used for expiry; defaults to std::chrono::steady_clock.
+ */
+template <class Data, class ClockType = std::chrono::steady_clock>
+class TimedQueue
+{
+public:
+    typedef ClockType Clock;
+    typedef typename Clock::time_point TimePoint;
+    typedef typename Clock::duration Duration;
+
+private:
+    // One queued item together with the time after which it is discarded.
+    struct Record {
+        TimePoint expire_;
+        Data data_;
+        Record(const TimePoint &expire, const Data &data) :
+            expire_(expire), data_(data) {}
+        Record(const TimePoint &expire, Data &&data) :
+            expire_(expire), data_(std::move(data)) {}
+        // True once the clock reads strictly later than expire_.
+        bool Expired() const { return Clock::now() > expire_; }
+    };
+    typedef std::list<Record> Queue;
+    Synced<Queue> queue_;
+
+public:
+    /// Append an item that expires at the absolute time point `expire`.
+    void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); }
+    void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); }
+
+    /// Append an item that expires `timeout` after the moment of the call.
+    void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); }
+    void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); }
+
+    /**
+     * Visit every queued item in insertion order.
+     *
+     * Expired items are erased without being passed to `func`. For the others,
+     * `func(item)` is called; a true result erases the item, a false result
+     * keeps it for the next pass.
+     *
+     * @param func Callable taking `Data &` and returning a value usable as bool.
+     */
+    template <class Func>
+    void CheckAll(Func const &func)
+    {
+        queue_.Apply([&](Queue &q) {
+            for (auto it = q.begin(); it != q.end();) {
+                // Short-circuit keeps func away from expired items.
+                if (it->Expired() || func(it->data_)) {
+                    it = q.erase(it);
+                } else {
+                    ++it;
+                }
+            }
+        });
+    }
+};
+
+#endif // end of include guard: TIMED_QUEUE_Y2YLRBS3
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 5afec3f..8cd5cc4 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -17,6 +17,7 @@
*/
#include "topic_node.h"
#include "bh_util.h"
+#include "failed_msg.h"
#include <chrono>
#include <list>
@@ -32,44 +33,7 @@
std::string msg_id;
};
-class ServerFailedQ
-{
- struct FailedMsg {
- steady_clock::time_point xpr;
- std::string remote_;
- BHMsgHead head_;
- MsgRequestTopicReply body_;
- FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
- xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
- bool Expired() { return steady_clock::now() > xpr; }
- };
- typedef std::list<FailedMsg> Queue;
- Synced<Queue> queue_;
-
-public:
- void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
- {
- queue_->emplace_back(remote, std::move(head), std::move(body));
- }
- void TrySend(ShmSocket &socket, const int timeout_ms = 0)
- {
- queue_.Apply([&](Queue &q) {
- if (!q.empty()) {
- auto it = q.begin();
- do {
- if (it->Expired()) {
- // it->msg_.Release(socket.shm());
- it = q.erase(it);
- } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
- it = q.erase(it);
- } else {
- ++it;
- }
- } while (it != q.end());
- }
- });
- }
-};
+typedef FailedMsgQ ServerFailedQ;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
@@ -158,8 +122,12 @@
for (int i = 0; i < head.route_size() - 1; ++i) {
reply_head.add_route()->Swap(head.mutable_route(i));
}
- if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
- failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
+ MsgI msg;
+ if (msg.Make(sock.shm(), reply_head, reply_body)) {
+ auto &remote = head.route().rbegin()->mq_id();
+ if (!sock.Send(remote.data(), msg, 10)) {
+ failed_q->Push(remote, msg, 10s);
+ }
}
}
}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index b1f11ac..77c018a 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -26,7 +26,7 @@
ShmRemover auto_remove(shm_name);
const int mem_size = 1024 * 1024 * 50;
MQId id = boost::uuids::random_generator()();
- const int timeout = 100;
+ const int timeout = 1000;
const uint32_t data_size = 4000;
const std::string proc_id = "demo_proc";
@@ -44,7 +44,6 @@
DEFER1(msg.Release(shm););
for (uint64_t i = 0; i < n; ++i) {
- // mq.Send(id, str.data(), str.size(), timeout);
mq.Send(id, msg, timeout);
}
};
@@ -91,6 +90,7 @@
www.Launch(Writer, i, nmsg);
}
www.WaitAll();
+ printf("writer finished\n");
run.store(false);
rrr.WaitAll();
printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
@@ -136,14 +136,18 @@
req_body.set_topic("topic");
req_body.set_data(msg_content);
auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
request_rc.MakeRC(shm, req_head, req_body);
+ DEFER1(request_rc.Release(shm));
MsgRequestTopic reply_body;
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
+ reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size());
MsgI reply_rc;
reply_rc.MakeRC(shm, reply_head, reply_body);
+ DEFER1(reply_rc.Release(shm));
std::atomic<uint64_t> count(0);
@@ -224,9 +228,5 @@
printf("request ok: %ld\n", count.load());
stop = true;
servers.WaitAll();
- BOOST_CHECK(request_rc.IsCounted());
- BOOST_CHECK_EQUAL(request_rc.Count(), 1);
- request_rc.Release(shm);
- BOOST_CHECK(!request_rc.IsCounted());
// BOOST_CHECK_THROW(reply.Count(), int);
}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index a178fab..e0a9023 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,5 +1,6 @@
#include "center.h"
#include "defs.h"
+#include "failed_msg.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -21,8 +22,28 @@
static const bool value = true;
};
+typedef FailedMsgQ ServerFailedQ;
+
BOOST_AUTO_TEST_CASE(Temp)
{
+ const std::string shm_name("ShmTemp");
+ ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+ SharedMemory shm(shm_name, 1024 * 1024 * 10);
+
+ typedef std::chrono::steady_clock clock;
+ int n = 1000 * 1000;
+ std::vector<clock::time_point> tps(n);
+ {
+ printf("thread switch %d times, ", n);
+ boost::timer::auto_cpu_timer timer;
+ for (auto &tp : tps) {
+ tp = clock::now();
+ std::this_thread::yield();
+ }
+ }
+ printf("time: %ld ns\n", (tps.back() - tps.front()).count());
+ return;
+ // sub topic partial match.
Topic topics[] = {
"",
".",
@@ -131,7 +152,9 @@
bool r = provider.Publish(topic, data.data(), data.size(), timeout);
if (!r) {
- printf("pub ret: %s\n", r ? "ok" : "fail");
+ static std::atomic<int> an(0);
+ int n = ++an;
+ printf("pub %d ret: %s\n", n, r ? "ok" : "fail");
}
}
};
@@ -142,6 +165,7 @@
topics.push_back("t" + std::to_string(i));
}
Topics part;
+ boost::timer::auto_cpu_timer pubsub_timer;
for (size_t i = 0; i < topics.size(); ++i) {
part.push_back(topics[i]);
threads.Launch(Sub, i, topics);
--
Gitblit v1.8.0