lichao
2021-04-01 d26327b3cde043a9470dcd7fea8e704ea517fdae
src/reqrep.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "reqrep.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
@@ -73,30 +74,33 @@
      BHAddress addr;
      if (QueryRPCTopic(topic, addr, timeout_ms)) {
         return Call(addr.mq_id().data());
      } else {
         return false;
      }
   } catch (...) {
      return false;
   }
}
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
   try {
      BHAddress addr;
      if (QueryRPCTopic(topic, addr, timeout_ms)) {
         const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
         const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
         BHMsg reply;
         if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
         if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
            DataReply dr;
            if (dr.ParseFromString(msg.body())) {
            if (dr.ParseFromString(reply.body())) {
               dr.mutable_data()->swap(out);
               return true;
            }
         }
      } else {
      }
   } catch (...) {
      return false;
   }
   return false;
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
@@ -132,11 +136,13 @@
         if (!st->canceled) {
            static_cast<BHMsg *>(result)->Swap(&msg);
            st->cv.notify_one();
         } // else result is no longer valid.
         } else {
         }
      };
      std::unique_lock<std::mutex> lk(st->mutex);
      if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
      bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
      if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
         return true;
      } else {
         st->canceled = true;
@@ -149,16 +155,100 @@
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
   if (tmp_cache_.first == topic) {
      addr = tmp_cache_.second;
      return true;
   }
   BHMsg result;
   const BHMsg &msg = MakeQueryTopic(topic);
   if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
      if (result.type() == kMsgTypeQueryTopicReply) {
         DataQueryTopicReply reply;
   const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
   if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
      if (result.type() == kMsgTypeProcQueryTopicReply) {
         DataProcQueryTopicReply reply;
         if (reply.ParseFromString(result.body())) {
            addr = reply.address();
            tmp_cache_.first = topic;
            tmp_cache_.second = addr;
            return !addr.mq_id().empty();
         }
      }
   } else {
   }
   return false;
}
// reply socket
namespace
{
struct SrcInfo {
   std::vector<BHAddress> route;
   std::string msg_id;
};
} // namespace
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
   //TODO check reply?
   return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
   return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
   auto onRecv = [this, rcb](BHMsg &msg) {
      if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
         DataRequest req;
         if (req.ParseFromString(msg.body())) {
            std::string out;
            if (rcb(req.topic(), req.data(), out)) {
               BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
               for (int i = 0; i < msg.route_size() - 1; ++i) {
                  msg.add_route()->Swap(msg.mutable_route(i));
               }
               SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
            }
         }
      } else {
         // ignored, or dropped
      }
   };
   return rcb && Start(onRecv, nworker);
}
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
   BHMsg msg;
   if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
      DataRequest request;
      if (request.ParseFromString(msg.body())) {
         request.mutable_topic()->swap(topic);
         request.mutable_data()->swap(data);
         SrcInfo *p = new SrcInfo;
         p->route.assign(msg.route().begin(), msg.route().end());
         p->msg_id = msg.msg_id();
         src_info = p;
         return true;
      }
   }
   return false;
}
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
   SrcInfo *p = static_cast<SrcInfo *>(src_info);
   DEFER1(delete p);
   if (!p || p->route.empty()) {
      return false;
   }
   BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
   for (unsigned i = 0; i < p->route.size() - 1; ++i) {
      msg.add_route()->Swap(&p->route[i]);
   }
   return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}