From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 19:32:21 +0800 Subject: [PATCH] refactor center; add async request no cb. --- src/reqrep.cpp | 34 +++++++++++++++++++++++++++++----- 1 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/reqrep.cpp b/src/reqrep.cpp index b8e423b..25c0826 100644 --- a/src/reqrep.cpp +++ b/src/reqrep.cpp @@ -26,7 +26,7 @@ bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) { auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { - auto Find = [&](RecvCB &cb) { + auto Find = [&](RecvBHMsgCB &cb) { std::lock_guard<std::mutex> lock(mutex()); const std::string &msgid = msg.msg_id(); auto pos = async_cbs_.find(msgid); @@ -39,10 +39,10 @@ } }; - RecvCB cb; - if (Find(cb) && cb) { + RecvBHMsgCB cb; + if (Find(cb)) { cb(msg); - } else if (rrcb && msg.type() == kMsgTypeReply) { + } else if (msg.type() == kMsgTypeReply) { DataReply reply; if (reply.ParseFromString(msg.body())) { rrcb(reply.data()); @@ -55,6 +55,20 @@ return Start(AsyncRecvProc, nworker); } +bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + BHAddress addr; + if (QueryRPCTopic(topic, addr, timeout_ms)) { + const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); + return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); + } else { + return false; + } + } catch (...) { + return false; + } +} bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { @@ -103,7 +117,17 @@ return false; } -bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) +{ + assert(remote && pmsg); + try { + const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); + return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); + } catch (...) { + return false; + } +} +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) { assert(remote && pmsg); try { -- Gitblit v1.8.0