From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 15 四月 2021 19:32:16 +0800 Subject: [PATCH] add api; fix send, socknode mem leak. --- src/bh_api.cpp | 142 ++++++++++++++++++++++++++++++++--------------- 1 files changed, 96 insertions(+), 46 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 78b8a59..2abe66d 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -39,11 +39,25 @@ } size_t size() const { return size_; } operator bool() const { return ptr_; } + bool ReleaseTo(void **pdata, int *psize) + { + if (!ptr_) { + return false; + } + if (pdata && psize) { + *psize = size(); + *pdata = release(); + } + return true; + } }; template <class Msg> bool PackOutput(const Msg &msg, void **out, int *out_len) { + if (!out || !out_len) { + return true; // not wanted. + } auto size = msg.ByteSizeLong(); TmpPtr p(size); if (!p) { @@ -51,30 +65,37 @@ return false; } msg.SerializePartialToArray(p.get(), size); - *out = p.release(); - *out_len = size; + p.ReleaseTo(out, out_len); return true; +} + +template <class MsgIn, class MsgOut = MsgCommonReply> +bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), + const void *request, + const int request_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + MsgIn input; + if (!input.ParseFromArray(request, request_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgOut msg_reply; + if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { + return PackOutput(msg_reply, reply, reply_len); + + } else { + return false; + } } } // namespace -bool BHRegister(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - ProcInfo pi; - if (!pi.ParseFromArray(proc_info, proc_info_len)) { - SetLastError(eInvalidInput, "invalid input."); - return false; - } - MsgCommonReply msg_reply; - if (ProcNode().Register(pi, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); - } else { - return false; - } + return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); } bool BHHeartBeatEasy(const int timeout_ms) @@ -82,23 +103,19 @@ return ProcNode().Heartbeat(timeout_ms); } -bool BHHeartBeat(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - ProcInfo pi; - if (!pi.ParseFromArray(proc_info, proc_info_len)) { - SetLastError(eInvalidInput, "invalid input."); - return false; - } - MsgCommonReply msg_reply; - if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); - } else { - return false; - } + return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} + +bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); +} + +bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } bool BHPublish(const void *msgpub, @@ -125,8 +142,35 @@ if (ProcNode().RecvSub(proc, pub, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(pub, msgpub, msgpub_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); + return true; + } else { + SetLastError(ENOMEM, "out of mem"); + } + } + return false; +} + +bool BHAsyncRequest(const void *request, + const int request_len, + void **msg_id, + int *msg_id_len) +{ + MsgRequestTopic req; + if (!req.ParseFromArray(request, request_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + std::string str_msg_id; + MsgRequestTopicReply out_msg; + if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { + if (!msg_id || !msg_id_len) { + return true; + } + TmpPtr ptr(str_msg_id); + if (ptr) { + ptr.ReleaseTo(msg_id, msg_id_len); + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -152,8 +196,8 @@ if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, reply, reply_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -174,9 +218,9 @@ if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, request, request_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); *src = src_info; + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -206,10 +250,11 @@ typedef std::function<bool(const void *, const int)> ServerSender; } // namespace -void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) +void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) { TopicNode::ServerCB on_req; TopicNode::SubDataCB on_sub; + TopicNode::RequestResultCB on_reply; if (server_cb) { on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { std::string sreq(request.SerializeAsString()); @@ -228,8 +273,16 @@ sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); }; } + if (client_cb) { + on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) { + std::string s(rep.SerializeAsString()); + client_cb(head.proc_id().data(), head.proc_id().size(), + head.msg_id().data(), head.msg_id().size(), + s.data(), s.size()); + }; + } - ProcNode().Start(on_req, on_sub); + ProcNode().Start(on_req, on_sub, on_reply); } bool BHServerCallbackReply(const BHServerCallbackTag *tag, const void *data, @@ -251,10 +304,7 @@ std::string err_msg; GetLastError(ec, err_msg); TmpPtr p(err_msg); - if (p) { - *msg = p.release(); - *msg_len = p.size(); - } + p.ReleaseTo(msg, msg_len); } return ec; } -- Gitblit v1.8.0