| | |
| | | bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) |
| | | { |
| | | auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | auto Find = [&](RecvBHMsgCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | const std::string &msgid = msg.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | |
| | | } |
| | | }; |
| | | |
| | | RecvCB cb; |
| | | if (Find(cb) && cb) { |
| | | RecvBHMsgCB cb; |
| | | if (Find(cb)) { |
| | | cb(msg); |
| | | } else if (rrcb && msg.type() == kMsgTypeReply) { |
| | | } else if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | rrcb(reply.data()); |
| | |
| | | return Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | | return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); |
| | | } else { |
| | | return false; |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | |
| | | } |
| | | } |
| | | |
| | | bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |
| | | const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); |
| | | return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |
| | |
| | | } |
| | | } |
| | | |
| | | bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (tmp_cache_.first == topic) { |
| | | addr = tmp_cache_.second; |
| | | if (topic_cache_.Find(topic, addr)) { |
| | | return true; |
| | | } |
| | | |
| | |
| | | DataProcQueryTopicReply reply; |
| | | if (reply.ParseFromString(result.body())) { |
| | | addr = reply.address(); |
| | | tmp_cache_.first = topic; |
| | | tmp_cache_.second = addr; |
| | | return !addr.mq_id().empty(); |
| | | if (addr.mq_id().empty()) { |
| | | return false; |
| | | } else { |
| | | topic_cache_.Update(topic, addr); |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | } else { |