From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 17:32:35 +0800 Subject: [PATCH] server resend failed; rename msgs; refactor. --- src/topic_request.cpp | 106 +++++++++-------------------------------------------- 1 files changed, 18 insertions(+), 88 deletions(-) diff --git a/src/reqrep.cpp b/src/topic_request.cpp similarity index 65% rename from src/reqrep.cpp rename to src/topic_request.cpp index 25c0826..ce7c1a8 100644 --- a/src/reqrep.cpp +++ b/src/topic_request.cpp @@ -1,9 +1,9 @@ /* * ===================================================================================== * - * Filename: reqrep.cpp + * Filename: topic_request.cpp * - * Description: topic request/reply sockets + * Description: topic request sockets * * Version: 1.0 * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉� @@ -15,7 +15,7 @@ * * ===================================================================================== */ -#include "reqrep.h" +#include "topic_request.h" #include "bh_util.h" #include "msg.h" #include <chrono> @@ -40,10 +40,10 @@ }; RecvBHMsgCB cb; - if (Find(cb)) { + if (Find(cb) && cb) { cb(msg); - } else if (msg.type() == kMsgTypeReply) { - DataReply reply; + } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) { + MsgRequestTopicReply reply; if (reply.ParseFromString(msg.body())) { rrcb(reply.data()); } @@ -74,8 +74,8 @@ auto Call = [&](const void *remote) { const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); auto onRecv = [cb](BHMsg &msg) { - if (msg.type() == kMsgTypeReply) { - DataReply reply; + if (msg.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply reply; if (reply.ParseFromString(msg.body())) { cb(reply.data()); } @@ -103,16 +103,22 @@ if (QueryRPCTopic(topic, addr, timeout_ms)) { const BHMsg &req(MakeRequest(mq().Id(), topic, data, size)); BHMsg reply; - if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { - DataReply dr; + if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply dr; if (dr.ParseFromString(reply.body())) { dr.mutable_data()->swap(out); return true; + } else { + printf("error parse reply.\n"); } + } else { + printf("error recv data. line: %d\n", __LINE__); } } else { + printf("error recv data. line: %d\n", __LINE__); } } catch (...) { + printf("error recv data. line: %d\n", __LINE__); } return false; } @@ -186,8 +192,8 @@ BHMsg result; const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) { - if (result.type() == kMsgTypeProcQueryTopicReply) { - DataProcQueryTopicReply reply; + if (result.type() == kMsgTypeQueryTopicReply) { + MsgQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); if (addr.mq_id().empty()) { @@ -202,79 +208,3 @@ } return false; } - -// reply socket -namespace -{ -struct SrcInfo { - std::vector<BHAddress> route; - std::string msg_id; -}; - -} // namespace - -bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) -{ - //TODO check reply? - return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); -} -bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) -{ - return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); -} -bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) -{ - auto onRecv = [this, rcb](BHMsg &msg) { - if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) { - DataRequest req; - if (req.ParseFromString(msg.body())) { - std::string out; - if (rcb(req.topic(), req.data(), out)) { - BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); - for (int i = 0; i < msg.route_size() - 1; ++i) { - msg.add_route()->Swap(msg.mutable_route(i)); - } - SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100); - } - } - } else { - // ignored, or dropped - } - }; - - return rcb && Start(onRecv, nworker); -} - -bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) -{ - BHMsg msg; - if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) { - DataRequest request; - if (request.ParseFromString(msg.body())) { - request.mutable_topic()->swap(topic); - request.mutable_data()->swap(data); - SrcInfo *p = new SrcInfo; - p->route.assign(msg.route().begin(), msg.route().end()); - p->msg_id = msg.msg_id(); - src_info = p; - return true; - } - } - return false; -} - -bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) -{ - SrcInfo *p = static_cast<SrcInfo *>(src_info); - DEFER1(delete p); - if (!p || p->route.empty()) { - return false; - } - - BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); - for (unsigned i = 0; i < p->route.size() - 1; ++i) { - msg.add_route()->Swap(&p->route[i]); - } - - return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); -} \ No newline at end of file -- Gitblit v1.8.0