| | |
| | | "program": "${workspaceFolder}/debug/bin/utest", |
| | | "args": [ |
| | | "-t", |
| | | "ReqRepTest" |
| | | "SRTest" |
| | | ], |
| | | "stopAtEntry": false, |
| | | "cwd": "${workspaceFolder}", |
| | |
| | | #include "center.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "failed_msg.h" |
| | | #include "shm.h" |
| | | #include <chrono> |
| | | #include <set> |
| | |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); |
| | | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | auto center_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); |
| | | if (!r) { |
| | | printf("send reply failed.\n"); |
| | | MsgI msg; |
| | | if (msg.Make(socket.shm(), reply_head, rep_body)) { |
| | | auto &remote = head.route(0).mq_id(); |
| | | bool r = socket.Send(remote.data(), msg, timeout_ms); |
| | | if (!r) { |
| | | failq.Push(remote, msg, 60s); // for later retry. |
| | | } |
| | | } |
| | | //TODO resend failed. |
| | | }; |
| | | }; |
| | | |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | | center_failed_q->TrySend(socket); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | |
| | | } |
| | | }; |
| | | |
| | | auto OnBusIdle = [](ShmSocket &socket) {}; |
| | | auto bus_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | MakeReplyer(socket, head, center->id())(reply); |
| | | replyer(reply); |
| | | } else { |
| | | MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); |
| | | replyer(MakeReply(eSuccess)); |
| | | if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? |
| | | if (clients.empty()) { return; } |
| | | |
| | | for (auto &cli : clients) { |
| | | auto it = clients.begin(); |
| | | do { |
| | | auto &cli = *it; |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | if (!socket.Send(cli.mq_.data(), msg, 100)) { |
| | | printf("center route publish failed. need resend.\n"); |
| | | if (!socket.Send(cli.mq_.data(), msg, 0)) { |
| | | bus_failed_q->Push(cli.mq_, msg, 60s); |
| | | } |
| | | ++it; |
| | | } else { |
| | | it = clients.erase(it); |
| | | } |
| | | } |
| | | } while (it != clients.end()); |
| | | } |
| | | }; |
| | | switch (head.type()) { |
| | |
| | | { |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | sockets_[info.name_]->Start(info.handler_); |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | } |
| | | |
| | | return true; |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: failed_msg.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月12日 16时10分53秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "failed_msg.h" |
| | | |
| | | FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg) |
| | | { |
| | | msg.AddRef(); |
| | | return [remote, msg](void *valid_sock) { |
| | | assert(valid_sock); |
| | | ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); |
| | | bool r = sock.Send(remote.data(), msg, 0); |
| | | if (r && msg.IsCounted()) { |
| | | auto tmp = msg; // Release() is not const, but it's safe to release. |
| | | tmp.Release(sock.shm()); |
| | | } |
| | | return r; |
| | | }; |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: failed_msg.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月12日 11时21分30秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef FAILED_MSG_9YOI86AS |
| | | #define FAILED_MSG_9YOI86AS |
| | | |
| | | #include "msg.h" |
| | | #include "socket.h" |
| | | #include "timed_queue.h" |
| | | #include <string> |
| | | |
| | | class FailedMsgQ |
| | | { |
| | | typedef std::function<bool(void *)> Func; |
| | | typedef TimedQueue<Func> TimedFuncQ; |
| | | |
| | | public: |
| | | typedef bhome_msg::MsgI Msg; |
| | | |
| | | void Push(const std::string &remote, Msg const &msg, TimedFuncQ::TimePoint const &exr) { queue_.Push(PrepareSender(remote, msg), exr); } |
| | | void Push(const std::string &remote, Msg const &msg, TimedFuncQ::Duration const &exr) { queue_.Push(PrepareSender(remote, msg), exr); } |
| | | void TrySend(ShmSocket &socket) |
| | | { |
| | | queue_.CheckAll([&](Func &f) { return f(&socket); }); |
| | | } |
| | | |
| | | private: |
| | | Func PrepareSender(const std::string &remote, Msg const &msg); |
| | | |
| | | TimedFuncQ queue_; |
| | | }; |
| | | |
| | | #endif // end of include guard: FAILED_MSG_9YOI86AS |
| | |
| | | { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (async_cbs_->Find(head.msg_id(), cb)) { |
| | | if (per_msg_cbs_->Find(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | | } // else ignored, or dropped |
| | | } else { // else ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { |
| | |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | | } |
| | | } else if (onIdle) { |
| | | } |
| | | if (onIdle) { |
| | | onIdle(*this); |
| | | } |
| | | } catch (...) { |
| | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) |
| | | { |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | |
| | | reply.swap(msg); |
| | | reply_head.Swap(&head); |
| | | st->cv.notify_one(); |
| | | } else { |
| | | } else { // ignore |
| | | } |
| | | }; |
| | | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | bool sendok = Send(remote, head, body, timeout_ms, OnRecv); |
| | | if (!sendok) { |
| | | printf("send timeout\n"); |
| | | } |
| | | if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { |
| | | return true; |
| | | } else { |
| | |
| | | } |
| | | }; |
| | | |
| | | Synced<AsyncCBs> async_cbs_; |
| | | Synced<AsyncCBs> per_msg_cbs_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: failed_msg.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月12日 09时36分04秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef TIMED_QUEUE_Y2YLRBS3 |
| | | #define TIMED_QUEUE_Y2YLRBS3 |
| | | |
| | | #include "bh_util.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | #include <string> |
| | | |
| | | template <class Data, class ClockType = std::chrono::steady_clock> |
| | | class TimedQueue |
| | | { |
| | | public: |
| | | typedef ClockType Clock; |
| | | typedef typename Clock::time_point TimePoint; |
| | | typedef typename Clock::duration Duration; |
| | | |
| | | private: |
| | | struct Record { |
| | | TimePoint expire_; |
| | | Data data_; |
| | | Record(const TimePoint &expire, const Data &data) : |
| | | expire_(expire), data_(data) {} |
| | | Record(const TimePoint &expire, Data &&data) : |
| | | expire_(expire), data_(std::move(data)) {} |
| | | bool Expired() { return Clock::now() > expire_; } |
| | | }; |
| | | typedef std::list<Record> Queue; |
| | | Synced<Queue> queue_; |
| | | |
| | | public: |
| | | void Push(Data &&data, const TimePoint &expire) { queue_->emplace_back(expire, std::move(data)); } |
| | | void Push(Data const &data, const TimePoint &expire) { queue_->emplace_back(expire, data); } |
| | | |
| | | void Push(Data &&data, Duration const &timeout) { Push(std::move(data), Clock::now() + timeout); } |
| | | void Push(Data const &data, Duration const &timeout) { Push(data, Clock::now() + timeout); } |
| | | |
| | | template <class Func> |
| | | void CheckAll(Func const &func) |
| | | { |
| | | queue_.Apply([&](Queue &q) { |
| | | if (q.empty()) { |
| | | return; |
| | | } |
| | | auto it = q.begin(); |
| | | do { |
| | | if (it->Expired()) { |
| | | it = q.erase(it); |
| | | } else if (func(it->data_)) { |
| | | it = q.erase(it); |
| | | } else { |
| | | ++it; |
| | | } |
| | | } while (it != q.end()); |
| | | }); |
| | | } |
| | | }; |
| | | |
| | | #endif // end of include guard: TIMED_QUEUE_Y2YLRBS3 |
| | |
| | | */ |
| | | #include "topic_node.h" |
| | | #include "bh_util.h" |
| | | #include "failed_msg.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | class ServerFailedQ |
| | | { |
| | | struct FailedMsg { |
| | | steady_clock::time_point xpr; |
| | | std::string remote_; |
| | | BHMsgHead head_; |
| | | MsgRequestTopicReply body_; |
| | | FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) : |
| | | xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {} |
| | | bool Expired() { return steady_clock::now() > xpr; } |
| | | }; |
| | | typedef std::list<FailedMsg> Queue; |
| | | Synced<Queue> queue_; |
| | | |
| | | public: |
| | | void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body) |
| | | { |
| | | queue_->emplace_back(remote, std::move(head), std::move(body)); |
| | | } |
| | | void TrySend(ShmSocket &socket, const int timeout_ms = 0) |
| | | { |
| | | queue_.Apply([&](Queue &q) { |
| | | if (!q.empty()) { |
| | | auto it = q.begin(); |
| | | do { |
| | | if (it->Expired()) { |
| | | // it->msg_.Release(socket.shm()); |
| | | it = q.erase(it); |
| | | } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) { |
| | | it = q.erase(it); |
| | | } else { |
| | | ++it; |
| | | } |
| | | } while (it != q.end()); |
| | | } |
| | | }); |
| | | } |
| | | }; |
| | | typedef FailedMsgQ ServerFailedQ; |
| | | |
| | | } // namespace |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) { |
| | | failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body)); |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | if (!sock.Send(remote.data(), msg, 10)) { |
| | | failed_q->Push(remote, msg, 10s); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | ShmRemover auto_remove(shm_name); |
| | | const int mem_size = 1024 * 1024 * 50; |
| | | MQId id = boost::uuids::random_generator()(); |
| | | const int timeout = 100; |
| | | const int timeout = 1000; |
| | | const uint32_t data_size = 4000; |
| | | const std::string proc_id = "demo_proc"; |
| | | |
| | |
| | | DEFER1(msg.Release(shm);); |
| | | |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | | // mq.Send(id, str.data(), str.size(), timeout); |
| | | mq.Send(id, msg, timeout); |
| | | } |
| | | }; |
| | |
| | | www.Launch(Writer, i, nmsg); |
| | | } |
| | | www.WaitAll(); |
| | | printf("writer finished\n"); |
| | | run.store(false); |
| | | rrr.WaitAll(); |
| | | printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); |
| | | request_rc.MakeRC(shm, req_head, req_body); |
| | | DEFER1(request_rc.Release(shm)); |
| | | |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); |
| | | reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size()); |
| | | MsgI reply_rc; |
| | | reply_rc.MakeRC(shm, reply_head, reply_body); |
| | | DEFER1(reply_rc.Release(shm)); |
| | | |
| | | std::atomic<uint64_t> count(0); |
| | | |
| | |
| | | printf("request ok: %ld\n", count.load()); |
| | | stop = true; |
| | | servers.WaitAll(); |
| | | BOOST_CHECK(request_rc.IsCounted()); |
| | | BOOST_CHECK_EQUAL(request_rc.Count(), 1); |
| | | request_rc.Release(shm); |
| | | BOOST_CHECK(!request_rc.IsCounted()); |
| | | // BOOST_CHECK_THROW(reply.Count(), int); |
| | | } |
| | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "failed_msg.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | |
| | | static const bool value = true; |
| | | }; |
| | | |
| | | typedef FailedMsgQ ServerFailedQ; |
| | | |
| | | BOOST_AUTO_TEST_CASE(Temp) |
| | | { |
| | | const std::string shm_name("ShmTemp"); |
| | | ShmRemover auto_remove(shm_name); //remove twice? in case of killed? |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 10); |
| | | |
| | | typedef std::chrono::steady_clock clock; |
| | | int n = 1000 * 1000; |
| | | std::vector<clock::time_point> tps(n); |
| | | { |
| | | printf("thread switch %d times, ", n); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (auto &tp : tps) { |
| | | tp = clock::now(); |
| | | std::this_thread::yield(); |
| | | } |
| | | } |
| | | printf("time: %ld ns\n", (tps.back() - tps.front()).count()); |
| | | return; |
| | | // sub topic partial match. |
| | | Topic topics[] = { |
| | | "", |
| | | ".", |
| | |
| | | |
| | | bool r = provider.Publish(topic, data.data(), data.size(), timeout); |
| | | if (!r) { |
| | | printf("pub ret: %s\n", r ? "ok" : "fail"); |
| | | static std::atomic<int> an(0); |
| | | int n = ++an; |
| | | printf("pub %d ret: %s\n", n, r ? "ok" : "fail"); |
| | | } |
| | | } |
| | | }; |
| | |
| | | topics.push_back("t" + std::to_string(i)); |
| | | } |
| | | Topics part; |
| | | boost::timer::auto_cpu_timer pubsub_timer; |
| | | for (size_t i = 0; i < topics.size(); ++i) { |
| | | part.push_back(topics[i]); |
| | | threads.Launch(Sub, i, topics); |