From 77a6c3512a44dfe6540dde71946e6484fe4f173f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 10 五月 2021 16:05:28 +0800
Subject: [PATCH] test lock code.

---
 src/bh_api.cpp |  217 ++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 139 insertions(+), 78 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 2abe66d..c9ceb20 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -1,6 +1,7 @@
 #include "bh_api.h"
 #include "defs.h"
 #include "topic_node.h"
+#include <cstdio>
 #include <memory>
 
 using namespace bhome_shm;
@@ -8,10 +9,42 @@
 
 namespace
 {
+std::string GetProcExe()
+{
+	auto f = fopen("/proc/self/stat", "rb");
+	if (f) {
+		DEFER1(fclose(f));
+		char buf[100] = {0};
+		int n = fread(buf, 1, sizeof(buf), f);
+		if (n > 0) {
+			std::string s(buf, n);
+			auto start = s.find('(');
+			if (start != std::string::npos) {
+				++start;
+				auto end = s.find(')', start);
+				return s.substr(start, end - start);
+			}
+		}
+	}
+	return std::to_string(getpid());
+}
+std::unique_ptr<TopicNode> &ProcNodePtr()
+{
+	static bool init = GlobalInit(BHomeShm());
+	auto InitLog = []() {
+		auto id = GetProcExe();
+		char path[200] = {0};
+		sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
+		ns_log::AddLog(path);
+		return true;
+	};
+	static bool init_log = InitLog();
+	static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm()));
+	return ptr;
+}
 TopicNode &ProcNode()
 {
-	static TopicNode node(BHomeShm());
-	return node;
+	return *ProcNodePtr();
 }
 
 class TmpPtr : private boost::noncopyable
@@ -70,12 +103,10 @@
 }
 
 template <class MsgIn, class MsgOut = MsgCommonReply>
-bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
-                  const void *request,
-                  const int request_len,
-                  void **reply,
-                  int *reply_len,
-                  const int timeout_ms)
+bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+                    const void *request, const int request_len,
+                    void **reply, int *reply_len,
+                    const int timeout_ms)
 {
 	MsgIn input;
 	if (!input.ParseFromArray(request, request_len)) {
@@ -83,44 +114,83 @@
 		return false;
 	}
 	MsgOut msg_reply;
-	if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
+	return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+	       PackOutput(msg_reply, reply, reply_len);
+}
 
-	} else {
+template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
+bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
+                    const void *in0, const int in0_len,
+                    const void *in1, const int in1_len,
+                    void **reply, int *reply_len,
+                    const int timeout_ms)
+{
+	MsgIn0 input0;
+	MsgIn1 input1;
+	if (!input0.ParseFromArray(in0, in0_len) ||
+	    !input1.ParseFromArray(in1, in1_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
+	MsgOut msg_reply;
+	return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
+	       PackOutput(msg_reply, reply, reply_len);
 }
 
 } // namespace
 
-bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+                      const void *request,
+                      const int request_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms)
 {
-	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+	return (*func)(request, request_len, reply, reply_len, timeout_ms);
 }
 
-bool BHHeartBeatEasy(const int timeout_ms)
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+
+int BHHeartbeatEasy(const int timeout_ms)
 {
 	return ProcNode().Heartbeat(timeout_ms);
 }
 
-bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
-bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
-bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHQueryTopicAddress(const void *remote, const int remote_len,
+                        const void *topic, const int topic_len,
+                        void **reply, int *reply_len,
+                        const int timeout_ms)
 {
-	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+	return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
+	    &TopicNode::QueryTopicAddress,
+	    remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
 }
 
-bool BHPublish(const void *msgpub,
-               const int msgpub_len,
-               const int timeout_ms)
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+int BHPublish(const void *msgpub,
+              const int msgpub_len,
+              const int timeout_ms)
 {
 	MsgPublish pub;
 	if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +200,11 @@
 	return ProcNode().Publish(pub, timeout_ms);
 }
 
-bool BHReadSub(void **proc_id,
-               int *proc_id_len,
-               void **msgpub,
-               int *msgpub_len,
-               const int timeout_ms)
+int BHReadSub(void **proc_id,
+              int *proc_id_len,
+              void **msgpub,
+              int *msgpub_len,
+              const int timeout_ms)
 {
 	std::string proc;
 	MsgPublish pub;
@@ -151,19 +221,23 @@
 	return false;
 }
 
-bool BHAsyncRequest(const void *request,
-                    const int request_len,
-                    void **msg_id,
-                    int *msg_id_len)
+int BHAsyncRequest(const void *remote,
+                   const int remote_len,
+                   const void *request,
+                   const int request_len,
+                   void **msg_id,
+                   int *msg_id_len)
 {
+	BHAddress dest;
 	MsgRequestTopic req;
-	if (!req.ParseFromArray(request, request_len)) {
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
 		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
 	std::string str_msg_id;
 	MsgRequestTopicReply out_msg;
-	if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+	if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
 		if (!msg_id || !msg_id_len) {
 			return true;
 		}
@@ -178,22 +252,26 @@
 	return false;
 }
 
-bool BHRequest(const void *request,
-               const int request_len,
-               void **proc_id,
-               int *proc_id_len,
-               void **reply,
-               int *reply_len,
-               const int timeout_ms)
+int BHRequest(const void *remote,
+              const int remote_len,
+              const void *request,
+              const int request_len,
+              void **proc_id,
+              int *proc_id_len,
+              void **reply,
+              int *reply_len,
+              const int timeout_ms)
 {
+	BHAddress dest;
 	MsgRequestTopic req;
-	if (!req.ParseFromArray(request, request_len)) {
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
 		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
 	std::string proc;
 	MsgRequestTopicReply out_msg;
-	if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
+	if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, reply, reply_len)) {
 			pproc.ReleaseTo(proc_id, proc_id_len);
@@ -205,12 +283,12 @@
 	return false;
 }
 
-bool BHReadRequest(void **proc_id,
-                   int *proc_id_len,
-                   void **request,
-                   int *request_len,
-                   void **src,
-                   const int timeout_ms)
+int BHReadRequest(void **proc_id,
+                  int *proc_id_len,
+                  void **request,
+                  int *request_len,
+                  void **src,
+                  const int timeout_ms)
 {
 	void *src_info = 0;
 	std::string proc;
@@ -228,9 +306,9 @@
 	return false;
 }
 
-bool BHSendReply(void *src,
-                 const void *reply,
-                 const int reply_len)
+int BHSendReply(void *src,
+                const void *reply,
+                const int reply_len)
 {
 	MsgRequestTopicReply rep;
 	if (!rep.ParseFromArray(reply, reply_len)) {
@@ -240,31 +318,15 @@
 	return ProcNode().ServerSendReply(src, rep);
 }
 
-int BHCleanUp()
-{
-	return 0;
-}
-
-namespace
-{
-typedef std::function<bool(const void *, const int)> ServerSender;
-} // namespace
-
 void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
 {
-	TopicNode::ServerCB on_req;
+	TopicNode::ServerAsyncCB on_req;
 	TopicNode::SubDataCB on_sub;
 	TopicNode::RequestResultCB on_reply;
 	if (server_cb) {
-		on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
+		on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
 			std::string sreq(request.SerializeAsString());
-			bool r = false;
-			ServerSender sender = [&](const void *p, const int len) {
-				r = reply.ParseFromArray(p, len);
-				return r;
-			};
-			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
-			return r;
+			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
 		};
 	}
 	if (sub_cb) {
@@ -284,19 +346,18 @@
 
 	ProcNode().Start(on_req, on_sub, on_reply);
 }
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
-                           const void *data,
-                           const int data_len)
-{
-	auto &sender = *(const ServerSender *) (tag);
-	return sender(data, data_len);
-}
 
 void BHFree(void *data, int size)
 {
 	free(data);
 }
 
+int BHCleanup()
+{
+	ProcNodePtr().reset();
+	return 0;
+}
+
 int BHGetLastError(void **msg, int *msg_len)
 {
 	int ec = 0;

--
Gitblit v1.8.0