From 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 12 四月 2021 18:29:41 +0800 Subject: [PATCH] add fail-resend support. --- utest/speed_test.cpp | 12 +- src/failed_msg.h | 47 +++++++ src/socket.h | 9 + .vscode/launch.json | 2 src/socket.cpp | 8 utest/utest.cpp | 26 ++++ src/failed_msg.cpp | 33 +++++ src/timed_queue.h | 75 ++++++++++++ src/topic_node.cpp | 48 +------ src/center.cpp | 47 ++++--- 10 files changed, 235 insertions(+), 72 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index ef42f7b..12aa21d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,7 @@ "program": "${workspaceFolder}/debug/bin/utest", "args": [ "-t", - "ReqRepTest" + "SRTest" ], "stopAtEntry": false, "cwd": "${workspaceFolder}", diff --git a/src/center.cpp b/src/center.cpp index d2aad0a..7865e57 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -18,6 +18,7 @@ #include "center.h" #include "bh_util.h" #include "defs.h" +#include "failed_msg.h" #include "shm.h" #include <chrono> #include <set> @@ -364,28 +365,31 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); - - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { + auto center_failed_q = std::make_shared<FailedMsgQ>(); + auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); - if (!r) { - printf("send reply failed.\n"); + MsgI msg; + if (msg.Make(socket.shm(), reply_head, rep_body)) { + auto &remote = head.route(0).mq_id(); + bool r = socket.Send(remote.data(), msg, timeout_ms); + if (!r) { + failq.Push(remote, msg, 60s); // for later retry. 
+ } } - //TODO resend failed. }; }; - auto OnCenterIdle = [center_ptr](ShmSocket &socket) { + auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { auto &center = *center_ptr; + center_failed_q->TrySend(socket); center->OnTimer(); }; auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto &center = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); @@ -396,10 +400,11 @@ } }; - auto OnBusIdle = [](ShmSocket &socket) {}; + auto bus_failed_q = std::make_shared<FailedMsgQ>(); + auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto &center = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -407,19 +412,25 @@ if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { - MakeReplyer(socket, head, center->id())(reply); + replyer(reply); } else { - MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); + replyer(MakeReply(eSuccess)); if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? + if (clients.empty()) { return; } - for (auto &cli : clients) { + auto it = clients.begin(); + do { + auto &cli = *it; auto node = cli.weak_node_.lock(); if (node) { - if (!socket.Send(cli.mq_.data(), msg, 100)) { - printf("center route publish failed. 
need resend.\n"); + if (!socket.Send(cli.mq_.data(), msg, 0)) { + bus_failed_q->Push(cli.mq_, msg, 60s); } + ++it; + } else { + it = clients.erase(it); } - } + } while (it != clients.end()); } }; switch (head.type()) { @@ -484,7 +495,7 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_]->Start(info.handler_); + sockets_[info.name_]->Start(info.handler_, info.idle_); } return true; diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp new file mode 100644 index 0000000..ab4658d --- /dev/null +++ b/src/failed_msg.cpp @@ -0,0 +1,33 @@ +/* + * ===================================================================================== + * + * Filename: failed_msg.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021年04月12日 16时10分53秒 + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "failed_msg.h" + +FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg) +{ + msg.AddRef(); + return [remote, msg](void *valid_sock) { + assert(valid_sock); + ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); + bool r = sock.Send(remote.data(), msg, 0); + if (r && msg.IsCounted()) { + auto tmp = msg; // Release() is not const, but it's safe to release. 
+ tmp.Release(sock.shm()); + } + return r; + }; +} \ No newline at end of file diff --git a/src/failed_msg.h b/src/failed_msg.h new file mode 100644 index 0000000..2d57abc --- /dev/null +++ b/src/failed_msg.h @@ -0,0 +1,47 @@ +/* + * ===================================================================================== + * + * Filename: failed_msg.h + * + * Description: + * + * Version: 1.0 + * Created: 2021年04月12日 11时21分30秒 + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef FAILED_MSG_9YOI86AS +#define FAILED_MSG_9YOI86AS + +#include "msg.h" +#include "socket.h" +#include "timed_queue.h" +#include <string> + +class FailedMsgQ +{ + typedef std::function<bool(void *)> Func; + typedef TimedQueue<Func> TimedFuncQ; + +public: + typedef bhome_msg::MsgI Msg; + + void Push(const std::string &remote, Msg const &msg, TimedFuncQ::TimePoint const &exr) { queue_.Push(PrepareSender(remote, msg), exr); } + void Push(const std::string &remote, Msg const &msg, TimedFuncQ::Duration const &exr) { queue_.Push(PrepareSender(remote, msg), exr); } + void TrySend(ShmSocket &socket) + { + queue_.CheckAll([&](Func &f) { return f(&socket); }); + } + +private: + Func PrepareSender(const std::string &remote, Msg const &msg); + + TimedFuncQ queue_; +}; + +#endif // end of include guard: FAILED_MSG_9YOI86AS diff --git a/src/socket.cpp b/src/socket.cpp index 116175d..2c55665 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -45,11 +45,12 @@ { auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { RecvCB cb; - if (async_cbs_->Find(head.msg_id(), cb)) { + if (per_msg_cbs_->Find(head.msg_id(), cb)) { cb(socket, imsg, head); } else if (onData) { onData(socket, imsg, head); - } // else ignored, or dropped + } else { // else ignored, or dropped + } }; auto recvLoopBody = [this, onRecvWithPerMsgCB, 
onIdle]() { @@ -61,7 +62,8 @@ if (imsg.ParseHead(head)) { onRecvWithPerMsgCB(*this, imsg, head); } - } else if (onIdle) { + } + if (onIdle) { onIdle(*this); } } catch (...) { diff --git a/src/socket.h b/src/socket.h index ee25d81..5973ab6 100644 --- a/src/socket.h +++ b/src/socket.h @@ -77,7 +77,7 @@ template <class Body> bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) { - auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); }; MsgI msg; return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); } @@ -109,12 +109,15 @@ reply.swap(msg); reply_head.Swap(&head); st->cv.notify_one(); - } else { + } else { // ignore } }; std::unique_lock<std::mutex> lk(st->mutex); bool sendok = Send(remote, head, body, timeout_ms, OnRecv); + if (!sendok) { + printf("send timeout\n"); + } if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { return true; } else { @@ -161,7 +164,7 @@ } }; - Synced<AsyncCBs> async_cbs_; + Synced<AsyncCBs> per_msg_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/timed_queue.h b/src/timed_queue.h new file mode 100644 index 0000000..14e318d --- /dev/null +++ b/src/timed_queue.h @@ -0,0 +1,75 @@ +/* + * ===================================================================================== + * + * Filename: failed_msg.h + * + * Description: + * + * Version: 1.0 + * Created: 2021年04月12日 09时36分04秒 + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TIMED_QUEUE_Y2YLRBS3 +#define TIMED_QUEUE_Y2YLRBS3 + +#include 
"bh_util.h" +#include <chrono> +#include <list> +#include <string> + +template <class Data, class ClockType = std::chrono::steady_clock> +class TimedQueue +{ +public: + typedef ClockType Clock; + typedef typename Clock::time_point TimePoint; + typedef typename Clock::duration Duration; + +private: + struct Record { + TimePoint expire_; + Data data_; + Record(const TimePoint &expire, const Data &data) : + expire_(expire), data_(data) {} + Record(const TimePoint &expire, Data &&data) : + expire_(expire), data_(std::move(data)) {} + bool Expired() { return Clock::now() > expire_; } + }; + typedef std::list<Record> Queue; + Synced<Queue> queue_; + +public: + void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); } + void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); } + + void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); } + void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); } + + template <class Func> + void CheckAll(Func const &func) + { + queue_.Apply([&](Queue &q) { + if (q.empty()) { + return; + } + auto it = q.begin(); + do { + if (it->Expired()) { + it = q.erase(it); + } else if (func(it->data_)) { + it = q.erase(it); + } else { + ++it; + } + } while (it != q.end()); + }); + } +}; + +#endif // end of include guard: TIMED_QUEUE_Y2YLRBS3 diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 5afec3f..8cd5cc4 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -17,6 +17,7 @@ */ #include "topic_node.h" #include "bh_util.h" +#include "failed_msg.h" #include <chrono> #include <list> @@ -32,44 +33,7 @@ std::string msg_id; }; -class ServerFailedQ -{ - struct FailedMsg { - steady_clock::time_point xpr; - std::string remote_; - BHMsgHead head_; - MsgRequestTopicReply body_; - FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) : - xpr(steady_clock::now() + 10s), 
remote_(addr), head_(std::move(head)), body_(std::move(body)) {} - bool Expired() { return steady_clock::now() > xpr; } - }; - typedef std::list<FailedMsg> Queue; - Synced<Queue> queue_; - -public: - void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body) - { - queue_->emplace_back(remote, std::move(head), std::move(body)); - } - void TrySend(ShmSocket &socket, const int timeout_ms = 0) - { - queue_.Apply([&](Queue &q) { - if (!q.empty()) { - auto it = q.begin(); - do { - if (it->Expired()) { - // it->msg_.Release(socket.shm()); - it = q.erase(it); - } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) { - it = q.erase(it); - } else { - ++it; - } - } while (it != q.end()); - } - }); - } -}; +typedef FailedMsgQ ServerFailedQ; } // namespace TopicNode::TopicNode(SharedMemory &shm) : @@ -158,8 +122,12 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) { - failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body)); + MsgI msg; + if (msg.Make(sock.shm(), reply_head, reply_body)) { + auto &remote = head.route().rbegin()->mq_id(); + if (!sock.Send(remote.data(), msg, 10)) { + failed_q->Push(remote, msg, 10s); + } } } } diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index b1f11ac..77c018a 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -26,7 +26,7 @@ ShmRemover auto_remove(shm_name); const int mem_size = 1024 * 1024 * 50; MQId id = boost::uuids::random_generator()(); - const int timeout = 100; + const int timeout = 1000; const uint32_t data_size = 4000; const std::string proc_id = "demo_proc"; @@ -44,7 +44,6 @@ DEFER1(msg.Release(shm);); for (uint64_t i = 0; i < n; ++i) { - // mq.Send(id, str.data(), str.size(), timeout); mq.Send(id, msg, timeout); } }; @@ -91,6 +90,7 @@ www.Launch(Writer, i, nmsg); } www.WaitAll(); + 
printf("writer finished\n"); run.store(false); rrr.WaitAll(); printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); @@ -136,14 +136,18 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); + req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); request_rc.MakeRC(shm, req_head, req_body); + DEFER1(request_rc.Release(shm)); MsgRequestTopic reply_body; reply_body.set_topic("topic"); reply_body.set_data(msg_content); auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); + reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size()); MsgI reply_rc; reply_rc.MakeRC(shm, reply_head, reply_body); + DEFER1(reply_rc.Release(shm)); std::atomic<uint64_t> count(0); @@ -224,9 +228,5 @@ printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); - BOOST_CHECK(request_rc.IsCounted()); - BOOST_CHECK_EQUAL(request_rc.Count(), 1); - request_rc.Release(shm); - BOOST_CHECK(!request_rc.IsCounted()); // BOOST_CHECK_THROW(reply.Count(), int); } diff --git a/utest/utest.cpp b/utest/utest.cpp index a178fab..e0a9023 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,5 +1,6 @@ #include "center.h" #include "defs.h" +#include "failed_msg.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -21,8 +22,28 @@ static const bool value = true; }; +typedef FailedMsgQ ServerFailedQ; + BOOST_AUTO_TEST_CASE(Temp) { + const std::string shm_name("ShmTemp"); + ShmRemover auto_remove(shm_name); //remove twice? in case of killed? + SharedMemory shm(shm_name, 1024 * 1024 * 10); + + typedef std::chrono::steady_clock clock; + int n = 1000 * 1000; + std::vector<clock::time_point> tps(n); + { + printf("thread switch %d times, ", n); + boost::timer::auto_cpu_timer timer; + for (auto &tp : tps) { + tp = clock::now(); + std::this_thread::yield(); + } + } + printf("time: %ld ns\n", (tps.back() - tps.front()).count()); + return; + // sub topic partial match. 
Topic topics[] = { "", ".", @@ -131,7 +152,9 @@ bool r = provider.Publish(topic, data.data(), data.size(), timeout); if (!r) { - printf("pub ret: %s\n", r ? "ok" : "fail"); + static std::atomic<int> an(0); + int n = ++an; + printf("pub %d ret: %s\n", n, r ? "ok" : "fail"); } } }; @@ -142,6 +165,7 @@ topics.push_back("t" + std::to_string(i)); } Topics part; + boost::timer::auto_cpu_timer pubsub_timer; for (size_t i = 0; i < topics.size(); ++i) { part.push_back(topics[i]); threads.Launch(Sub, i, topics); -- Gitblit v1.8.0