From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 19:32:21 +0800 Subject: [PATCH] refactor center; add async request no cb. --- src/reqrep.cpp | 52 +++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/reqrep.cpp b/src/reqrep.cpp index bed6496..25c0826 100644 --- a/src/reqrep.cpp +++ b/src/reqrep.cpp @@ -26,7 +26,7 @@ bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) { auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { - auto Find = [&](RecvCB &cb) { + auto Find = [&](RecvBHMsgCB &cb) { std::lock_guard<std::mutex> lock(mutex()); const std::string &msgid = msg.msg_id(); auto pos = async_cbs_.find(msgid); @@ -39,10 +39,10 @@ } }; - RecvCB cb; - if (Find(cb) && cb) { + RecvBHMsgCB cb; + if (Find(cb)) { cb(msg); - } else if (rrcb && msg.type() == kMsgTypeReply) { + } else if (msg.type() == kMsgTypeReply) { DataReply reply; if (reply.ParseFromString(msg.body())) { rrcb(reply.data()); @@ -55,7 +55,21 @@ return Start(AsyncRecvProc, nworker); } -bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) +bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + BHAddress addr; + if (QueryRPCTopic(topic, addr, timeout_ms)) { + const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); + return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); + } else { + return false; + } + } catch (...) { + return false; + } +} +bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); @@ -82,7 +96,7 @@ } } -bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) +bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) { try { BHAddress addr; @@ -103,7 +117,17 @@ return false; } -bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) +{ + assert(remote && pmsg); + try { + const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); + return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); + } catch (...) { + return false; + } +} +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) { assert(remote && pmsg); try { @@ -153,10 +177,9 @@ } } -bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { - if (tmp_cache_.first == topic) { - addr = tmp_cache_.second; + if (topic_cache_.Find(topic, addr)) { return true; } @@ -167,9 +190,12 @@ DataProcQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); - tmp_cache_.first = topic; - tmp_cache_.second = addr; - return !addr.mq_id().empty(); + if (addr.mq_id().empty()) { + return false; + } else { + topic_cache_.Update(topic, addr); + return true; + } } } } else { -- Gitblit v1.8.0