lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
src/topic_request.cpp
File was renamed from src/reqrep.cpp
@@ -1,9 +1,9 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.cpp
 *       Filename:  topic_request.cpp
 *
 *    Description:  topic request/reply sockets
 *    Description:  topic request sockets
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时35分35秒
@@ -15,7 +15,7 @@
 *
 * =====================================================================================
 */
#include "reqrep.h"
#include "topic_request.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
@@ -40,10 +40,10 @@
      };
      RecvBHMsgCB cb;
      if (Find(cb)) {
      if (Find(cb) && cb) {
         cb(msg);
      } else if (msg.type() == kMsgTypeReply) {
         DataReply reply;
      } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
         MsgRequestTopicReply reply;
         if (reply.ParseFromString(msg.body())) {
            rrcb(reply.data());
         }
@@ -74,8 +74,8 @@
   auto Call = [&](const void *remote) {
      const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
      auto onRecv = [cb](BHMsg &msg) {
         if (msg.type() == kMsgTypeReply) {
            DataReply reply;
         if (msg.type() == kMsgTypeRequestTopicReply) {
            MsgRequestTopicReply reply;
            if (reply.ParseFromString(msg.body())) {
               cb(reply.data());
            }
@@ -103,16 +103,22 @@
      if (QueryRPCTopic(topic, addr, timeout_ms)) {
         const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
         BHMsg reply;
         if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
            DataReply dr;
         if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
            MsgRequestTopicReply dr;
            if (dr.ParseFromString(reply.body())) {
               dr.mutable_data()->swap(out);
               return true;
            } else {
               printf("error parse reply.\n");
            }
         } else {
            printf("error recv data. line: %d\n", __LINE__);
         }
      } else {
         printf("error recv data. line: %d\n", __LINE__);
      }
   } catch (...) {
      printf("error recv data. line: %d\n", __LINE__);
   }
   return false;
}
@@ -186,8 +192,8 @@
   BHMsg result;
   const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
   if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
      if (result.type() == kMsgTypeProcQueryTopicReply) {
         DataProcQueryTopicReply reply;
      if (result.type() == kMsgTypeQueryTopicReply) {
         MsgQueryTopicReply reply;
         if (reply.ParseFromString(result.body())) {
            addr = reply.address();
            if (addr.mq_id().empty()) {
@@ -202,79 +208,3 @@
   }
   return false;
}
// reply socket
namespace
{
struct SrcInfo {
   std::vector<BHAddress> route;
   std::string msg_id;
};
} // namespace
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
   //TODO check reply?
   return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
   return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
   auto onRecv = [this, rcb](BHMsg &msg) {
      if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
         DataRequest req;
         if (req.ParseFromString(msg.body())) {
            std::string out;
            if (rcb(req.topic(), req.data(), out)) {
               BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
               for (int i = 0; i < msg.route_size() - 1; ++i) {
                  msg.add_route()->Swap(msg.mutable_route(i));
               }
               SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
            }
         }
      } else {
         // ignored, or dropped
      }
   };
   return rcb && Start(onRecv, nworker);
}
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
   BHMsg msg;
   if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
      DataRequest request;
      if (request.ParseFromString(msg.body())) {
         request.mutable_topic()->swap(topic);
         request.mutable_data()->swap(data);
         SrcInfo *p = new SrcInfo;
         p->route.assign(msg.route().begin(), msg.route().end());
         p->msg_id = msg.msg_id();
         src_info = p;
         return true;
      }
   }
   return false;
}
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
   SrcInfo *p = static_cast<SrcInfo *>(src_info);
   DEFER1(delete p);
   if (!p || p->route.empty()) {
      return false;
   }
   BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
   for (unsigned i = 0; i < p->route.size() - 1; ++i) {
      msg.add_route()->Swap(&p->route[i]);
   }
   return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}