From 02ba913dc7bb5d711471b27f2ea23a897d0f2e28 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 15:34:26 +0800
Subject: [PATCH] bind msgi to shm, change offset_ptr to abs offset.

---
 src/bh_api.cpp |  240 +++++++++++++++++++++++++++++++++++------------------------
 1 files changed, 141 insertions(+), 99 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 78b8a59..f0ba26d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -10,6 +10,7 @@
 {
 TopicNode &ProcNode()
 {
+	static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
 	static TopicNode node(BHomeShm());
 	return node;
 }
@@ -39,11 +40,25 @@
 	}
 	size_t size() const { return size_; }
 	operator bool() const { return ptr_; }
+	bool ReleaseTo(void **pdata, int *psize)
+	{
+		if (!ptr_) {
+			return false;
+		}
+		if (pdata && psize) {
+			*psize = size();
+			*pdata = release();
+		}
+		return true;
+	}
 };
 
 template <class Msg>
 bool PackOutput(const Msg &msg, void **out, int *out_len)
 {
+	if (!out || !out_len) {
+		return true; // not wanted.
+	}
 	auto size = msg.ByteSizeLong();
 	TmpPtr p(size);
 	if (!p) {
@@ -51,59 +66,68 @@
 		return false;
 	}
 	msg.SerializePartialToArray(p.get(), size);
-	*out = p.release();
-	*out_len = size;
+	p.ReleaseTo(out, out_len);
 	return true;
+}
+
+template <class MsgIn, class MsgOut = MsgCommonReply>
+bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+                  const void *request,
+                  const int request_len,
+                  void **reply,
+                  int *reply_len,
+                  const int timeout_ms)
+{
+	MsgIn input;
+	if (!input.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	MsgOut msg_reply;
+	return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+	       PackOutput(msg_reply, reply, reply_len);
 }
 
 } // namespace
 
-bool BHRegister(const void *proc_info,
-                const int proc_info_len,
-                void **reply,
-                int *reply_len,
-                const int timeout_ms)
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+                      const void *request,
+                      const int request_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return (*func)(request, request_len, reply, reply_len, timeout_ms);
 }
 
-bool BHHeartBeatEasy(const int timeout_ms)
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+
+int BHHeartbeatEasy(const int timeout_ms)
 {
 	return ProcNode().Heartbeat(timeout_ms);
 }
 
-bool BHHeartBeat(const void *proc_info,
-                 const int proc_info_len,
-                 void **reply,
-                 int *reply_len,
-                 const int timeout_ms)
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
-bool BHPublish(const void *msgpub,
-               const int msgpub_len,
-               const int timeout_ms)
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+int BHPublish(const void *msgpub,
+              const int msgpub_len,
+              const int timeout_ms)
 {
 	MsgPublish pub;
 	if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -113,11 +137,11 @@
 	return ProcNode().Publish(pub, timeout_ms);
 }
 
-bool BHReadSub(void **proc_id,
-               int *proc_id_len,
-               void **msgpub,
-               int *msgpub_len,
-               const int timeout_ms)
+int BHReadSub(void **proc_id,
+              int *proc_id_len,
+              void **msgpub,
+              int *msgpub_len,
+              const int timeout_ms)
 {
 	std::string proc;
 	MsgPublish pub;
@@ -125,8 +149,8 @@
 	if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -134,26 +158,61 @@
 	return false;
 }
 
-bool BHRequest(const void *request,
-               const int request_len,
-               void **proc_id,
-               int *proc_id_len,
-               void **reply,
-               int *reply_len,
-               const int timeout_ms)
+int BHAsyncRequest(const void *remote,
+                   const int remote_len,
+                   const void *request,
+                   const int request_len,
+                   void **msg_id,
+                   int *msg_id_len)
 {
+	BHAddress dest;
 	MsgRequestTopic req;
-	if (!req.ParseFromArray(request, request_len)) {
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	std::string str_msg_id;
+	MsgRequestTopicReply out_msg;
+	if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
+		if (!msg_id || !msg_id_len) {
+			return true;
+		}
+		TmpPtr ptr(str_msg_id);
+		if (ptr) {
+			ptr.ReleaseTo(msg_id, msg_id_len);
+			return true;
+		} else {
+			SetLastError(ENOMEM, "out of mem");
+		}
+	}
+	return false;
+}
+
+int BHRequest(const void *remote,
+              const int remote_len,
+              const void *request,
+              const int request_len,
+              void **proc_id,
+              int *proc_id_len,
+              void **reply,
+              int *reply_len,
+              const int timeout_ms)
+{
+	BHAddress dest;
+	MsgRequestTopic req;
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
 		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
 	std::string proc;
 	MsgRequestTopicReply out_msg;
-	if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
+	if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, reply, reply_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -161,12 +220,12 @@
 	return false;
 }
 
-bool BHReadRequest(void **proc_id,
-                   int *proc_id_len,
-                   void **request,
-                   int *request_len,
-                   void **src,
-                   const int timeout_ms)
+int BHReadRequest(void **proc_id,
+                  int *proc_id_len,
+                  void **request,
+                  int *request_len,
+                  void **src,
+                  const int timeout_ms)
 {
 	void *src_info = 0;
 	std::string proc;
@@ -174,9 +233,9 @@
 	if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, request, request_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
 			*src = src_info;
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -184,9 +243,9 @@
 	return false;
 }
 
-bool BHSendReply(void *src,
-                 const void *reply,
-                 const int reply_len)
+int BHSendReply(void *src,
+                const void *reply,
+                const int reply_len)
 {
 	MsgRequestTopicReply rep;
 	if (!rep.ParseFromArray(reply, reply_len)) {
@@ -196,30 +255,15 @@
 	return ProcNode().ServerSendReply(src, rep);
 }
 
-int BHCleanUp()
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
 {
-	return 0;
-}
-
-namespace
-{
-typedef std::function<bool(const void *, const int)> ServerSender;
-} // namespace
-
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
-{
-	TopicNode::ServerCB on_req;
+	TopicNode::ServerAsyncCB on_req;
 	TopicNode::SubDataCB on_sub;
+	TopicNode::RequestResultCB on_reply;
 	if (server_cb) {
-		on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
+		on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
 			std::string sreq(request.SerializeAsString());
-			bool r = false;
-			ServerSender sender = [&](const void *p, const int len) {
-				r = reply.ParseFromArray(p, len);
-				return r;
-			};
-			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
-			return r;
+			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
 		};
 	}
 	if (sub_cb) {
@@ -228,15 +272,16 @@
 			sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
 		};
 	}
+	if (client_cb) {
+		on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
+			std::string s(rep.SerializeAsString());
+			client_cb(head.proc_id().data(), head.proc_id().size(),
+			          head.msg_id().data(), head.msg_id().size(),
+			          s.data(), s.size());
+		};
+	}
 
-	ProcNode().Start(on_req, on_sub);
-}
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
-                           const void *data,
-                           const int data_len)
-{
-	auto &sender = *(const ServerSender *) (tag);
-	return sender(data, data_len);
+	ProcNode().Start(on_req, on_sub, on_reply);
 }
 
 void BHFree(void *data, int size)
@@ -251,10 +296,7 @@
 		std::string err_msg;
 		GetLastError(ec, err_msg);
 		TmpPtr p(err_msg);
-		if (p) {
-			*msg = p.release();
-			*msg_len = p.size();
-		}
+		p.ReleaseTo(msg, msg_len);
 	}
 	return ec;
 }

--
Gitblit v1.8.0