/*
 * =====================================================================================
 *
 *       Filename:  reqrep.cpp
 *
 *    Description:  topic request/reply sockets
 *
 *        Version:  1.0
 *        Created:  2021-04-01 09:35:35
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "reqrep.h"
#include "bh_util.h"
#include "msg.h"
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

using namespace bhome_msg;

/**
 * Start the receive workers of a request socket.
 *
 * Each incoming message is first matched by msg_id against the callbacks
 * registered through AsyncSend(); a matching callback is removed from
 * async_cbs_ and invoked exactly once. Unmatched messages of type
 * kMsgTypeReply are parsed as DataReply and handed to rrcb (if set);
 * everything else is dropped.
 *
 * @param rrcb    fallback callback for replies nobody is waiting for, may be empty.
 * @param nworker forwarded to Start().
 * @return result of Start().
 */
bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
{
    auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
        // Take the pending callback (if any) out of the table while holding the
        // lock; it is invoked afterwards, outside of the lock.
        auto Find = [&](RecvCB &cb) -> bool {
            std::lock_guard<std::mutex> lock(mutex());
            const std::string &msgid = msg.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos == async_cbs_.end()) {
                return false;
            }
            cb.swap(pos->second);
            async_cbs_.erase(pos);
            return true;
        };

        RecvCB cb;
        if (Find(cb) && cb) {
            cb(msg);
        } else if (rrcb && msg.type() == kMsgTypeReply) {
            DataReply reply;
            if (reply.ParseFromString(msg.body())) {
                rrcb(reply.data());
            }
        } else {
            // ignored, or dropped
        }
    };
    return Start(AsyncRecvProc, nworker);
}

/**
 * Send a request for `topic` without waiting for the answer.
 *
 * The destination is resolved with QueryRPCTopic(). When a message of type
 * kMsgTypeReply with the request's msg_id arrives and parses as DataReply,
 * `cb` is called with the reply data.
 *
 * @param topic      topic to request.
 * @param data,size  request payload passed to MakeRequest().
 * @param timeout_ms timeout used for both the topic query and the send.
 * @param cb         result callback; an empty callback is tolerated and the reply is discarded.
 * @return false if the topic cannot be resolved, the send fails, or an exception is thrown.
 */
bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
        auto onRecv = [cb](BHMsg &reply_msg) {
            // Guard against an empty std::function: calling it would throw
            // std::bad_function_call inside the receive worker.
            if (!cb || reply_msg.type() != kMsgTypeReply) {
                return;
            }
            DataReply reply;
            if (reply.ParseFromString(reply_msg.body())) {
                cb(reply.data());
            }
        };
        return AsyncSend(remote, &req, timeout_ms, onRecv);
    };

    try {
        BHAddress addr;
        if (!QueryRPCTopic(topic, addr, timeout_ms)) {
            return false;
        }
        return Call(addr.mq_id().data());
    } catch (...) {
        return false;
    }
}

/**
 * Send a request for `topic` and wait for its reply.
 *
 * @param topic      topic to request.
 * @param data,size  request payload passed to MakeRequest().
 * @param out        receives the reply data on success (swapped in).
 * @param timeout_ms timeout used for the topic query and for the send/receive.
 * @return true only if a kMsgTypeReply message was received and parsed as DataReply.
 */
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
            BHMsg reply;
            if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
                if (dr.ParseFromString(reply.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                }
            }
        }
    } catch (...) {
    }
    return false;
}

/**
 * Send a message and register a callback keyed by the message's msg_id.
 *
 * @param remote     destination queue id, type-erased.
 *                   NOTE(review): assumed to point at an MQId -- confirm against reqrep.h.
 * @param pmsg       pointer to the BHMsg to send, type-erased.
 * @param timeout_ms forwarded to mq().Send().
 * @param cb         stored in async_cbs_ under msg_id by the RegisterCB hook passed to Send().
 * @return result of mq().Send(), or false if an exception is thrown.
 */
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
{
    assert(remote && pmsg);
    try {
        const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
        auto RegisterCB = [&]() {
            std::lock_guard<std::mutex> lock(mutex());
            async_cbs_.emplace(msg.msg_id(), cb);
        };
        return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
    } catch (...) {
        return false;
    }
}

/**
 * Send a message and block until the matching reply arrives or the timeout expires.
 *
 * @param remote     destination queue id, forwarded to AsyncSend().
 * @param msg        pointer to the BHMsg to send, forwarded to AsyncSend().
 * @param result     pointer to a BHMsg that receives the reply (swapped in).
 * @param timeout_ms used both for the send and as the overall wait deadline.
 * @return true if a reply was stored in *result before the deadline.
 */
bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
{
    // Shared between this thread and the receive callback. The callback may
    // run after this function returned, hence the shared ownership and the
    // `canceled` flag protecting `result`.
    struct State {
        std::mutex mutex;
        std::condition_variable cv;
        bool canceled = false;
        bool done = false;
    };

    try {
        std::shared_ptr<State> st(new State);
        const auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);

        auto OnRecv = [st, result](BHMsg &reply) {
            std::unique_lock<std::mutex> lk(st->mutex);
            if (!st->canceled) {
                static_cast<BHMsg *>(result)->Swap(&reply);
                st->done = true;
                st->cv.notify_one();
            }
        };

        // Hold the lock across the send so a reply cannot be signalled before
        // this thread starts waiting.
        std::unique_lock<std::mutex> lk(st->mutex);
        const bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);

        // Wait on the `done` flag rather than on the bare notification: a
        // spurious wakeup must not be reported as a received reply.
        if (sendok && st->cv.wait_until(lk, endtime, [&st]() { return st->done; })) {
            return true;
        }

        // NOTE(review): the callback registered by AsyncSend appears to stay in
        // async_cbs_ until a message with this msg_id shows up -- confirm
        // whether timed-out entries need to be cleaned up.
        st->canceled = true;
        return false;
    } catch (...) {
        return false;
    }
}

/**
 * Resolve the address serving `topic`.
 *
 * Looks in topic_cache_ first; on a miss, sends a query to kBHTopicReqRepCenter
 * and caches the answer.
 *
 * @param topic      topic to look up.
 * @param addr       receives the address on success.
 * @param timeout_ms timeout for the query round trip.
 * @return true if `addr` holds an address with a non-empty mq_id.
 */
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    if (topic_cache_.Find(topic, addr)) {
        return true;
    }

    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
    if (!SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
        return false;
    }
    if (result.type() != kMsgTypeProcQueryTopicReply) {
        return false;
    }

    DataProcQueryTopicReply reply;
    if (!reply.ParseFromString(result.body())) {
        return false;
    }

    addr = reply.address();
    if (addr.mq_id().empty()) {
        return false;
    }
    topic_cache_.Update(topic, addr);
    return true;
}

// reply socket
namespace
{
// Opaque handle handed out by RecvRequest() and consumed by SendReply():
// the route and id of the request that is to be answered.
struct SrcInfo {
    std::vector<BHAddress> route;
    std::string msg_id;
};

} // namespace

/**
 * Send a register message for `proc_info` and `topics` to kBHTopicReqRepCenter.
 *
 * @return result of SyncSend().
 */
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}

/**
 * Send a heartbeat message for `proc_info` to kBHTopicReqRepCenter.
 *
 * @return result of SyncSend().
 */
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}

/**
 * Start workers that answer incoming requests through `rcb`.
 *
 * For every kMsgTypeRequest message with a non-empty route, the body is parsed
 * as DataRequest and passed to rcb(topic, data, out). If rcb returns true, a
 * reply carrying `out` is sent to the last address of the request's route; the
 * preceding route entries are moved into the reply.
 *
 * @param rcb     request handler; an empty handler makes this function return false.
 * @param nworker forwarded to Start().
 * @return false if rcb is empty, otherwise the result of Start().
 */
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
    auto onRecv = [this, rcb](BHMsg &msg) {
        if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
            DataRequest req;
            if (req.ParseFromString(msg.body())) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
                    // Move every hop except the last one into the *reply*.
                    // Adding to `msg` itself would grow the container the loop
                    // is bounded by and leave the reply without a route.
                    const int last = msg.route_size() - 1;
                    for (int i = 0; i < last; ++i) {
                        msg_reply.add_route()->Swap(msg.mutable_route(i));
                    }
                    // The last hop is the queue the reply is sent to.
                    SyncSend(msg.route(last).mq_id().data(), msg_reply, 100);
                }
            }
        } else {
            // ignored, or dropped
        }
    };

    return rcb && Start(onRecv, nworker);
}

/**
 * Receive one request synchronously.
 *
 * @param src_info   on success, set to a newly allocated handle that must be
 *                   passed to SendReply(), which frees it; untouched on failure.
 * @param topic      receives the request topic on success.
 * @param data       receives the request data on success.
 * @param timeout_ms forwarded to SyncRecv().
 * @return true if a kMsgTypeRequest message was received and parsed as DataRequest.
 */
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
        DataRequest request;
        if (request.ParseFromString(msg.body())) {
            // Build the handle first so nothing is modified or leaked if an
            // allocation throws.
            std::unique_ptr<SrcInfo> p(new SrcInfo);
            p->route.assign(msg.route().begin(), msg.route().end());
            p->msg_id = msg.msg_id();

            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            src_info = p.release();
            return true;
        }
    }
    return false;
}

/**
 * Answer a request previously obtained from RecvRequest().
 *
 * @param src_info   handle from RecvRequest(); always freed by this call, may be null.
 * @param data       reply payload.
 * @param timeout_ms forwarded to SyncSend().
 * @return false if the handle is null or has an empty route, otherwise the result of SyncSend().
 */
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    // Take ownership so the handle is released on every return path.
    const std::unique_ptr<SrcInfo> p(static_cast<SrcInfo *>(src_info));
    if (!p || p->route.empty()) {
        return false;
    }

    BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
    // All hops but the last travel with the reply; the last one is the destination.
    for (size_t i = 0; i + 1 < p->route.size(); ++i) {
        msg.add_route()->Swap(&p->route[i]);
    }
    return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}