| | |
| | | bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) |
| | | { |
| | | auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | auto Find = [&](RecvBHMsgCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | const std::string &msgid = msg.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | |
| | | } |
| | | }; |
| | | |
| | | RecvCB cb; |
| | | if (Find(cb) && cb) { |
| | | RecvBHMsgCB cb; |
| | | if (Find(cb)) { |
| | | cb(msg); |
| | | } else if (rrcb && msg.type() == kMsgTypeReply) { |
| | | } else if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | rrcb(reply.data()); |
| | |
| | | return Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | | return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); |
| | | } else { |
| | | return false; |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |
| | | const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); |
| | | return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |