From 77a6c3512a44dfe6540dde71946e6484fe4f173f Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 10 五月 2021 16:05:28 +0800 Subject: [PATCH] test lock code. --- src/bh_api.cpp | 95 +++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 82 insertions(+), 13 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index f0ba26d..c9ceb20 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -1,6 +1,7 @@ #include "bh_api.h" #include "defs.h" #include "topic_node.h" +#include <cstdio> #include <memory> using namespace bhome_shm; @@ -8,11 +9,42 @@ namespace { +std::string GetProcExe() +{ + auto f = fopen("/proc/self/stat", "rb"); + if (f) { + DEFER1(fclose(f)); + char buf[100] = {0}; + int n = fread(buf, 1, sizeof(buf), f); + if (n > 0) { + std::string s(buf, n); + auto start = s.find('('); + if (start != std::string::npos) { + ++start; + auto end = s.find(')', start); + return s.substr(start, end - start); + } + } + } + return std::to_string(getpid()); +} +std::unique_ptr<TopicNode> &ProcNodePtr() +{ + static bool init = GlobalInit(BHomeShm()); + auto InitLog = []() { + auto id = GetProcExe(); + char path[200] = {0}; + sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); + ns_log::AddLog(path); + return true; + }; + static bool init_log = InitLog(); + static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm())); + return ptr; +} TopicNode &ProcNode() { - static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm()); - static TopicNode node(BHomeShm()); - return node; + return *ProcNodePtr(); } class TmpPtr : private boost::noncopyable @@ -71,12 +103,10 @@ } template <class MsgIn, class MsgOut = MsgCommonReply> -bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), - const void *request, - const int request_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), + const void *request, const int request_len, + void **reply, int *reply_len, + const int timeout_ms) { MsgIn input; if (!input.ParseFromArray(request, request_len)) { @@ -85,6 +115,25 @@ } MsgOut msg_reply; return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && + PackOutput(msg_reply, reply, reply_len); +} + +template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply> +bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int), + const void *in0, const int in0_len, + const void *in1, const int in1_len, + void **reply, int *reply_len, + const int timeout_ms) +{ + MsgIn0 input0; + MsgIn1 input1; + if (!input0.ParseFromArray(in0, in0_len) || + !input1.ParseFromArray(in1, in1_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgOut msg_reply; + return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) && PackOutput(msg_reply, reply, reply_len); } @@ -102,7 +151,11 @@ int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} +int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); } int BHHeartbeatEasy(const int timeout_ms) @@ -112,17 +165,27 @@ int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); } int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); +} + +int BHQueryTopicAddress(const void *remote, const int remote_len, + const void *topic, const int topic_len, + void **reply, int *reply_len, + const int timeout_ms) +{ + return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>( + &TopicNode::QueryTopicAddress, + remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); } int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } int BHPublish(const void *msgpub, @@ -289,6 +352,12 @@ free(data); } +int BHCleanup() +{ + ProcNodePtr().reset(); + return 0; +} + int BHGetLastError(void **msg, int *msg_len) { int ec = 0; -- Gitblit v1.8.0