From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 15 四月 2021 19:32:16 +0800
Subject: [PATCH] add api; fix send, socknode mem leak.

---
 src/bh_api.cpp |  142 ++++++++++++++++++++++++++++++++---------------
 1 files changed, 96 insertions(+), 46 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 78b8a59..2abe66d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -39,11 +39,25 @@
 	}
 	size_t size() const { return size_; }
 	operator bool() const { return ptr_; }
+	bool ReleaseTo(void **pdata, int *psize)
+	{
+		if (!ptr_) {
+			return false;
+		}
+		if (pdata && psize) {
+			*psize = size();
+			*pdata = release();
+		}
+		return true;
+	}
 };
 
 template <class Msg>
 bool PackOutput(const Msg &msg, void **out, int *out_len)
 {
+	if (!out || !out_len) {
+		return true; // not wanted.
+	}
 	auto size = msg.ByteSizeLong();
 	TmpPtr p(size);
 	if (!p) {
@@ -51,30 +65,37 @@
 		return false;
 	}
 	msg.SerializePartialToArray(p.get(), size);
-	*out = p.release();
-	*out_len = size;
+	p.ReleaseTo(out, out_len);
 	return true;
+}
+
+template <class MsgIn, class MsgOut = MsgCommonReply>
+bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+                  const void *request,
+                  const int request_len,
+                  void **reply,
+                  int *reply_len,
+                  const int timeout_ms)
+{
+	MsgIn input;
+	if (!input.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	MsgOut msg_reply;
+	if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
+		return PackOutput(msg_reply, reply, reply_len);
+
+	} else {
+		return false;
+	}
 }
 
 } // namespace
 
-bool BHRegister(const void *proc_info,
-                const int proc_info_len,
-                void **reply,
-                int *reply_len,
-                const int timeout_ms)
+bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
 bool BHHeartBeatEasy(const int timeout_ms)
@@ -82,23 +103,19 @@
 	return ProcNode().Heartbeat(timeout_ms);
 }
 
-bool BHHeartBeat(const void *proc_info,
-                 const int proc_info_len,
-                 void **reply,
-                 int *reply_len,
-                 const int timeout_ms)
+bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+
+bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
 bool BHPublish(const void *msgpub,
@@ -125,8 +142,35 @@
 	if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
+		} else {
+			SetLastError(ENOMEM, "out of mem");
+		}
+	}
+	return false;
+}
+
+bool BHAsyncRequest(const void *request,
+                    const int request_len,
+                    void **msg_id,
+                    int *msg_id_len)
+{
+	MsgRequestTopic req;
+	if (!req.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	std::string str_msg_id;
+	MsgRequestTopicReply out_msg;
+	if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+		if (!msg_id || !msg_id_len) {
+			return true;
+		}
+		TmpPtr ptr(str_msg_id);
+		if (ptr) {
+			ptr.ReleaseTo(msg_id, msg_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -152,8 +196,8 @@
 	if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, reply, reply_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -174,9 +218,9 @@
 	if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, request, request_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
 			*src = src_info;
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -206,10 +250,11 @@
 typedef std::function<bool(const void *, const int)> ServerSender;
 } // namespace
 
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
 {
 	TopicNode::ServerCB on_req;
 	TopicNode::SubDataCB on_sub;
+	TopicNode::RequestResultCB on_reply;
 	if (server_cb) {
 		on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
 			std::string sreq(request.SerializeAsString());
@@ -228,8 +273,16 @@
 			sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
 		};
 	}
+	if (client_cb) {
+		on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
+			std::string s(rep.SerializeAsString());
+			client_cb(head.proc_id().data(), head.proc_id().size(),
+			          head.msg_id().data(), head.msg_id().size(),
+			          s.data(), s.size());
+		};
+	}
 
-	ProcNode().Start(on_req, on_sub);
+	ProcNode().Start(on_req, on_sub, on_reply);
 }
 bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                            const void *data,
@@ -251,10 +304,7 @@
 		std::string err_msg;
 		GetLastError(ec, err_msg);
 		TmpPtr p(err_msg);
-		if (p) {
-			*msg = p.release();
-			*msg_len = p.size();
-		}
+		p.ReleaseTo(msg, msg_len);
 	}
 	return ec;
 }

--
Gitblit v1.8.0