/* * ===================================================================================== * * Filename: topic_reply.cpp * * Description: * * Version: 1.0 * Created: 2021年04月06日 14时40分52秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "topic_reply.h" #include #include using namespace bhome_msg; using namespace std::chrono; using namespace std::chrono_literals; namespace { struct SrcInfo { std::vector route; std::string msg_id; }; class FailedQ { struct FailedMsg { steady_clock::time_point xpr; std::string remote_; BHMsg msg_; FailedMsg(const std::string &addr, BHMsg &&msg) : xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {} bool Expired() { return steady_clock::now() > xpr; } }; typedef std::list Queue; Synced queue_; public: void Push(const std::string &remote, BHMsg &&msg) { queue_->emplace_back(remote, std::move(msg)); } void TrySend(ShmSocket &socket, const int timeout_ms = 0) { queue_.Apply([&](Queue &q) { if (!q.empty()) { auto it = q.begin(); do { if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) { it = q.erase(it); } else { ++it; } } while (it != q.end()); } }); } }; } // namespace bool SocketReply::Register(const ProcInfo &proc_info, const std::vector &topics, const int timeout_ms) { //TODO check reply? return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); } bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) { return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); } bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) { auto failed_q = std::make_shared(); auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) { if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) { MsgRequestTopic req; if (req.ParseFromString(msg.body())) { std::string out; if (rcb(req.topic(), req.data(), out)) { BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); for (int i = 0; i < msg.route_size() - 1; ++i) { msg.add_route()->Swap(msg.mutable_route(i)); } if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) { failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply)); } } } } else { // ignored, or dropped } onIdle(*this); }; return rcb && Start(onRecv, onIdle, nworker); } bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) { BHMsg msg; if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) { MsgRequestTopic request; if (request.ParseFromString(msg.body())) { request.mutable_topic()->swap(topic); request.mutable_data()->swap(data); SrcInfo *p = new SrcInfo; p->route.assign(msg.route().begin(), msg.route().end()); p->msg_id = msg.msg_id(); src_info = p; return true; } } return false; } bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) { SrcInfo *p = static_cast(src_info); DEFER1(delete p); if (!p || p->route.empty()) { return false; } BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); for (unsigned i = 0; i < p->route.size() - 1; ++i) { msg.add_route()->Swap(&p->route[i]); } return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); }