From d33a69463f1a75134d01191be0b9e1bdd757dd4b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 15:27:59 +0800 Subject: [PATCH] add atomic queue, no lock, unorder. --- src/bh_api.cpp | 189 +++++++++++++++++++++++++++------------------- 1 files changed, 111 insertions(+), 78 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 2abe66d..c29afd9 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -8,10 +8,15 @@ namespace { +std::unique_ptr<TopicNode> &ProcNodePtr() +{ + static bool init = GlobalInit(BHomeShm()); + static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm())); + return ptr; +} TopicNode &ProcNode() { - static TopicNode node(BHomeShm()); - return node; + return *ProcNodePtr(); } class TmpPtr : private boost::noncopyable @@ -70,12 +75,10 @@ } template <class MsgIn, class MsgOut = MsgCommonReply> -bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), - const void *request, - const int request_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), + const void *request, const int request_len, + void **reply, int *reply_len, + const int timeout_ms) { MsgIn input; if (!input.ParseFromArray(request, request_len)) { @@ -83,44 +86,83 @@ return false; } MsgOut msg_reply; - if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); + return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && + PackOutput(msg_reply, reply, reply_len); +} - } else { +template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply> +bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int), + const void *in0, const int in0_len, + const void *in1, const int in1_len, + void **reply, int *reply_len, + const int timeout_ms) +{ + MsgIn0 input0; + MsgIn1 input1; + if (!input0.ParseFromArray(in0, in0_len) || + !input1.ParseFromArray(in1, in1_len)) { + SetLastError(eInvalidInput, "invalid input."); return false; } + MsgOut msg_reply; + return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) && + PackOutput(msg_reply, reply, reply_len); } } // namespace -bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, + const void *request, + const int request_len, + void **reply, + int *reply_len, + const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return (*func)(request, request_len, reply, reply_len, timeout_ms); } -bool BHHeartBeatEasy(const int timeout_ms) +int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} +int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} + +int BHHeartbeatEasy(const int timeout_ms) { return ProcNode().Heartbeat(timeout_ms); } -bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); } -bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); } -bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +int BHQueryTopicAddress(const void *remote, const int remote_len, + const void *topic, const int topic_len, + void **reply, int *reply_len, + const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>( + &TopicNode::QueryTopicAddress, + remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); } -bool BHPublish(const void *msgpub, - const int msgpub_len, - const int timeout_ms) +int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); +} + +int BHPublish(const void *msgpub, + const int msgpub_len, + const int timeout_ms) { MsgPublish pub; if (!pub.ParseFromArray(msgpub, msgpub_len)) { @@ -130,11 +172,11 @@ return ProcNode().Publish(pub, timeout_ms); } -bool BHReadSub(void **proc_id, - int *proc_id_len, - void **msgpub, - int *msgpub_len, - const int timeout_ms) +int BHReadSub(void **proc_id, + int *proc_id_len, + void **msgpub, + int *msgpub_len, + const int timeout_ms) { std::string proc; MsgPublish pub; @@ -151,19 +193,23 @@ return false; } -bool BHAsyncRequest(const void *request, - const int request_len, - void **msg_id, - int *msg_id_len) +int BHAsyncRequest(const void *remote, + const int remote_len, + const void *request, + const int request_len, + void **msg_id, + int *msg_id_len) { + BHAddress dest; MsgRequestTopic req; - if (!req.ParseFromArray(request, request_len)) { + if (!dest.ParseFromArray(remote, remote_len) || + !req.ParseFromArray(request, request_len)) { SetLastError(eInvalidInput, "invalid input."); return false; } std::string str_msg_id; MsgRequestTopicReply out_msg; - if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { + if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) { if (!msg_id || !msg_id_len) { return true; } @@ -178,22 +224,26 @@ return false; } -bool BHRequest(const void *request, - const int request_len, - void **proc_id, - int *proc_id_len, - void **reply, - int *reply_len, - const int timeout_ms) +int BHRequest(const void *remote, + const int remote_len, + const void *request, + const int request_len, + void **proc_id, + int *proc_id_len, + void **reply, + int *reply_len, + const int timeout_ms) { + BHAddress dest; MsgRequestTopic req; - if (!req.ParseFromArray(request, request_len)) { + if (!dest.ParseFromArray(remote, remote_len) || + !req.ParseFromArray(request, request_len)) { SetLastError(eInvalidInput, "invalid input."); return false; } std::string proc; MsgRequestTopicReply out_msg; - if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { + if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, reply, reply_len)) { pproc.ReleaseTo(proc_id, proc_id_len); @@ -205,12 +255,12 @@ return false; } -bool BHReadRequest(void **proc_id, - int *proc_id_len, - void **request, - int *request_len, - void **src, - const int timeout_ms) +int BHReadRequest(void **proc_id, + int *proc_id_len, + void **request, + int *request_len, + void **src, + const int timeout_ms) { void *src_info = 0; std::string proc; @@ -228,9 +278,9 @@ return false; } -bool BHSendReply(void *src, - const void *reply, - const int reply_len) +int BHSendReply(void *src, + const void *reply, + const int reply_len) { MsgRequestTopicReply rep; if (!rep.ParseFromArray(reply, reply_len)) { @@ -240,31 +290,15 @@ return ProcNode().ServerSendReply(src, rep); } -int BHCleanUp() -{ - return 0; -} - -namespace -{ -typedef std::function<bool(const void *, const int)> ServerSender; -} // namespace - void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) { - TopicNode::ServerCB on_req; + TopicNode::ServerAsyncCB on_req; TopicNode::SubDataCB on_sub; TopicNode::RequestResultCB on_reply; if (server_cb) { - on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { + on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) { std::string sreq(request.SerializeAsString()); - bool r = false; - ServerSender sender = [&](const void *p, const int len) { - r = reply.ParseFromArray(p, len); - return r; - }; - server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); - return r; + server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src); }; } if (sub_cb) { @@ -284,19 +318,18 @@ ProcNode().Start(on_req, on_sub, on_reply); } -bool BHServerCallbackReply(const BHServerCallbackTag *tag, - const void *data, - const int data_len) -{ - auto &sender = *(const ServerSender *) (tag); - return sender(data, data_len); -} void BHFree(void *data, int size) { free(data); } +int BHCleanup() +{ + ProcNodePtr().reset(); + return 0; +} + int BHGetLastError(void **msg, int *msg_len) { int ec = 0; -- Gitblit v1.8.0