| | |
| | | } |
| | | size_t size() const { return size_; } |
| | | operator bool() const { return ptr_; } |
| | | bool ReleaseTo(void **pdata, int *psize) |
| | | { |
| | | if (!ptr_) { |
| | | return false; |
| | | } |
| | | if (pdata && psize) { |
| | | *psize = size(); |
| | | *pdata = release(); |
| | | } |
| | | return true; |
| | | } |
| | | }; |
| | | |
| | | template <class Msg> |
| | | bool PackOutput(const Msg &msg, void **out, int *out_len) |
| | | { |
| | | if (!out || !out_len) { |
| | | return true; // not wanted. |
| | | } |
| | | auto size = msg.ByteSizeLong(); |
| | | TmpPtr p(size); |
| | | if (!p) { |
| | |
| | | return false; |
| | | } |
| | | msg.SerializePartialToArray(p.get(), size); |
| | | *out = p.release(); |
| | | *out_len = size; |
| | | p.ReleaseTo(out, out_len); |
| | | return true; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | | template <class MsgIn, class MsgOut = MsgCommonReply> |
| | | bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | MsgIn input; |
| | | if (!input.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Register(pi, msg_reply, timeout_ms)) { |
| | | MsgOut msg_reply; |
| | | if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms) |
| | |
| | | return ProcNode().Heartbeat(timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeat(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | } else { |
| | | return false; |
| | | |
| | | bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | |
| | | if (ProcNode().RecvSub(proc, pub, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(pub, msgpub, msgpub_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | { |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | std::string str_msg_id; |
| | | MsgRequestTopicReply out_msg; |
| | | if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { |
| | | if (!msg_id || !msg_id_len) { |
| | | return true; |
| | | } |
| | | TmpPtr ptr(str_msg_id); |
| | | if (ptr) { |
| | | ptr.ReleaseTo(msg_id, msg_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
| | | if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, reply, reply_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
| | | if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, request, request_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | *src = src_info; |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
// Callable taking a data pointer and a byte count and returning bool.
// NOTE(review): presumably used to send a server reply; its users are
// outside this view.
using ServerSender = std::function<bool(const void *, const int)>;
| | | } // namespace |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) |
| | | { |
| | | TopicNode::ServerCB on_req; |
| | | TopicNode::SubDataCB on_sub; |
| | | TopicNode::RequestResultCB on_reply; |
| | | if (server_cb) { |
| | | on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | std::string sreq(request.SerializeAsString()); |
| | |
| | | sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); |
| | | }; |
| | | } |
| | | if (client_cb) { |
| | | on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) { |
| | | std::string s(rep.SerializeAsString()); |
| | | client_cb(head.proc_id().data(), head.proc_id().size(), |
| | | head.msg_id().data(), head.msg_id().size(), |
| | | s.data(), s.size()); |
| | | }; |
| | | } |
| | | |
| | | ProcNode().Start(on_req, on_sub); |
| | | ProcNode().Start(on_req, on_sub, on_reply); |
| | | } |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | |
| | | std::string err_msg; |
| | | GetLastError(ec, err_msg); |
| | | TmpPtr p(err_msg); |
| | | if (p) { |
| | | *msg = p.release(); |
| | | *msg_len = p.size(); |
| | | } |
| | | p.ReleaseTo(msg, msg_len); |
| | | } |
| | | return ec; |
| | | } |