From 28f06bc49a4d8d69f1ea2f767863b7921d12f155 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 18:30:48 +0800
Subject: [PATCH] add robust FMutex, works fine; use boost circular.
---
src/bh_api.cpp | 143 +++++++++++++++++++++++++++++++++--------------
1 files changed, 99 insertions(+), 44 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 3844000..c9ceb20 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -1,6 +1,7 @@
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
+#include <cstdio>
#include <memory>
using namespace bhome_shm;
@@ -8,10 +9,42 @@
namespace
{
+std::string GetProcExe()
+{
+ auto f = fopen("/proc/self/stat", "rb");
+ if (f) {
+ DEFER1(fclose(f));
+ char buf[100] = {0};
+ int n = fread(buf, 1, sizeof(buf), f);
+ if (n > 0) {
+ std::string s(buf, n);
+ auto start = s.find('(');
+ if (start != std::string::npos) {
+ ++start;
+ auto end = s.find(')', start);
+ return s.substr(start, end - start);
+ }
+ }
+ }
+ return std::to_string(getpid());
+}
+std::unique_ptr<TopicNode> &ProcNodePtr()
+{
+ static bool init = GlobalInit(BHomeShm());
+ auto InitLog = []() {
+ auto id = GetProcExe();
+ char path[200] = {0};
+ sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
+ ns_log::AddLog(path);
+ return true;
+ };
+ static bool init_log = InitLog();
+ static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm()));
+ return ptr;
+}
TopicNode &ProcNode()
{
- static TopicNode node(BHomeShm());
- return node;
+ return *ProcNodePtr();
}
class TmpPtr : private boost::noncopyable
@@ -70,12 +103,10 @@
}
template <class MsgIn, class MsgOut = MsgCommonReply>
-bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
- const void *request,
- const int request_len,
- void **reply,
- int *reply_len,
- const int timeout_ms)
+bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+ const void *request, const int request_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
{
MsgIn input;
if (!input.ParseFromArray(request, request_len)) {
@@ -84,6 +115,25 @@
}
MsgOut msg_reply;
return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+ PackOutput(msg_reply, reply, reply_len);
+}
+
+template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
+bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
+ const void *in0, const int in0_len,
+ const void *in1, const int in1_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
+{
+ MsgIn0 input0;
+ MsgIn1 input1;
+ if (!input0.ParseFromArray(in0, in0_len) ||
+ !input1.ParseFromArray(in1, in1_len)) {
+ SetLastError(eInvalidInput, "invalid input.");
+ return false;
+ }
+ MsgOut msg_reply;
+ return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
PackOutput(msg_reply, reply, reply_len);
}
@@ -101,7 +151,11 @@
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHHeartbeatEasy(const int timeout_ms)
@@ -111,17 +165,27 @@
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+int BHQueryTopicAddress(const void *remote, const int remote_len,
+ const void *topic, const int topic_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
+{
+ return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
+ &TopicNode::QueryTopicAddress,
+ remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHPublish(const void *msgpub,
@@ -157,19 +221,23 @@
return false;
}
-int BHAsyncRequest(const void *request,
+int BHAsyncRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **msg_id,
int *msg_id_len)
{
+ BHAddress dest;
MsgRequestTopic req;
- if (!req.ParseFromArray(request, request_len)) {
+ if (!dest.ParseFromArray(remote, remote_len) ||
+ !req.ParseFromArray(request, request_len)) {
SetLastError(eInvalidInput, "invalid input.");
return false;
}
std::string str_msg_id;
MsgRequestTopicReply out_msg;
- if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+ if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
if (!msg_id || !msg_id_len) {
return true;
}
@@ -184,7 +252,9 @@
return false;
}
-int BHRequest(const void *request,
+int BHRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **proc_id,
int *proc_id_len,
@@ -192,14 +262,16 @@
int *reply_len,
const int timeout_ms)
{
+ BHAddress dest;
MsgRequestTopic req;
- if (!req.ParseFromArray(request, request_len)) {
+ if (!dest.ParseFromArray(remote, remote_len) ||
+ !req.ParseFromArray(request, request_len)) {
SetLastError(eInvalidInput, "invalid input.");
return false;
}
std::string proc;
MsgRequestTopicReply out_msg;
- if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
+ if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
TmpPtr pproc(proc);
if (pproc && PackOutput(out_msg, reply, reply_len)) {
pproc.ReleaseTo(proc_id, proc_id_len);
@@ -246,31 +318,15 @@
return ProcNode().ServerSendReply(src, rep);
}
-int BHCleanUp()
-{
- return 0;
-}
-
-namespace
-{
-typedef std::function<bool(const void *, const int)> ServerSender;
-} // namespace
-
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
- TopicNode::ServerCB on_req;
+ TopicNode::ServerAsyncCB on_req;
TopicNode::SubDataCB on_sub;
TopicNode::RequestResultCB on_reply;
if (server_cb) {
- on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
+ on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
std::string sreq(request.SerializeAsString());
- bool r = false;
- ServerSender sender = [&](const void *p, const int len) {
- r = reply.ParseFromArray(p, len);
- return r;
- };
- server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
- return r;
+ server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
};
}
if (sub_cb) {
@@ -290,19 +346,18 @@
ProcNode().Start(on_req, on_sub, on_reply);
}
-int BHServerCallbackReply(const void *tag,
- const void *data,
- const int data_len)
-{
- auto &sender = *(const ServerSender *) (tag);
- return sender(data, data_len);
-}
void BHFree(void *data, int size)
{
free(data);
}
+int BHCleanup()
+{
+ ProcNodePtr().reset();
+ return 0;
+}
+
int BHGetLastError(void **msg, int *msg_len)
{
int ec = 0;
--
Gitblit v1.8.0