lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
src/reqrep.cpp
@@ -26,7 +26,7 @@
bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
{
   auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
      auto Find = [&](RecvCB &cb) {
      auto Find = [&](RecvBHMsgCB &cb) {
         std::lock_guard<std::mutex> lock(mutex());
         const std::string &msgid = msg.msg_id();
         auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
         }
      };
      RecvCB cb;
      if (Find(cb) && cb) {
      RecvBHMsgCB cb;
      if (Find(cb)) {
         cb(msg);
      } else if (rrcb && msg.type() == kMsgTypeReply) {
      } else if (msg.type() == kMsgTypeReply) {
         DataReply reply;
         if (reply.ParseFromString(msg.body())) {
            rrcb(reply.data());
@@ -55,6 +55,20 @@
   return Start(AsyncRecvProc, nworker);
}
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
   try {
      BHAddress addr;
      if (QueryRPCTopic(topic, addr, timeout_ms)) {
         const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
         return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
      } else {
         return false;
      }
   } catch (...) {
      return false;
   }
}
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
   auto Call = [&](const void *remote) {
@@ -103,7 +117,17 @@
   return false;
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
{
   assert(remote && pmsg);
   try {
      const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
      return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
   } catch (...) {
      return false;
   }
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
{
   assert(remote && pmsg);
   try {