From 708ff9e8af731e2799767ed8bfca7df3b74fc26a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 16 四月 2021 19:20:21 +0800
Subject: [PATCH] sendq use less shm, copy data.

---
 src/socket.h       |   25 +++----
 utest/api_test.cpp |   19 +++++-
 src/msg.h          |   33 ++++++++++
 src/sendq.cpp      |   40 +++++++++----
 src/sendq.h        |   40 ++++++-------
 5 files changed, 104 insertions(+), 53 deletions(-)

diff --git a/src/msg.h b/src/msg.h
index 10ad0d2..c239956 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -82,6 +82,15 @@
 		    uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
 	}
 
+	void *Pack(SharedMemory &shm, const std::string &content)
+	{
+		void *addr = shm.Alloc(content.size());
+		if (addr) {
+			memcpy(addr, content.data(), content.size());
+		}
+		return addr;
+	}
+
 	bool MakeRC(SharedMemory &shm, void *addr);
 	bool Make(SharedMemory &shm, void *addr);
 
@@ -111,7 +120,6 @@
 	}
 
 	bool EnableRefCount(SharedMemory &shm);
-
 	template <class Body>
 	inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
 	{
@@ -119,6 +127,29 @@
 		auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
 		return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
 	}
+	template <class Body>
+	static inline std::string Serialize(const BHMsgHead &head, const Body &body)
+	{
+		uint32_t head_len = head.ByteSizeLong();
+		uint32_t body_len = body.ByteSizeLong();
+		std::string s(4 + head_len + 4 + body_len, '\0');
+		size_t pos = 0;
+		auto add1 = [&](auto &&msg, auto &&size) {
+			Put32(&s[pos], size);
+			pos += 4;
+			msg.SerializeToArray(&s[pos], size);
+			pos += size;
+		};
+		add1(head, head_len);
+		add1(body, body_len);
+		assert(pos == s.size());
+		return s;
+	}
+	inline bool Make(SharedMemory &shm, const std::string &content)
+	{
+		void *p = Pack(shm, content);
+		return Make(shm, p);
+	}
 
 	bool ParseHead(BHMsgHead &head) const;
 	template <class Body>
diff --git a/src/sendq.cpp b/src/sendq.cpp
index ad293c3..4be24f1 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,11 +19,6 @@
 #include "shm_queue.h"
 #include <chrono>
 
-//TODO change to save head, body, instead of MsgI.
-// as MsgI which is in shm, but head, body are in current process.
-// Then if node crashes, shm will not be affected by msgs in sendq.
-// but pulishing ref-counted msg need some work.
-
 int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
@@ -35,22 +30,41 @@
 	for (auto it = arr.begin(); it != pos; ++it) {
 		auto &info = it->data();
 		if (info.on_expire_) {
-			info.on_expire_(info.msg_);
+			info.on_expire_(info.data_);
 		}
-		info.msg_.Release(mq.shm());
+		if (info.data_.index() == 0) {
+			boost::variant2::get<0>(info.data_).Release(mq.shm());
+		}
 	}
 
-	int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end()));
-	for (int i = 0; i < n; ++i) {
-		auto &msg = pos->data().msg_;
-		if (msg.IsCounted()) {
-			msg.Release(mq.shm());
+	auto SendData = [&](Data &d) {
+		bool r = false;
+		if (d.index() == 0) {
+			auto &msg = boost::variant2::get<0>(pos->data().data_);
+			r = mq.TrySend(*(MQId *) remote.data(), msg);
+			if (r && msg.IsCounted()) {
+				msg.Release(mq.shm());
+			}
+		} else {
+			auto &content = boost::variant2::get<1>(pos->data().data_);
+			MsgI msg;
+			if (msg.Make(mq.shm(), content)) {
+				r = mq.TrySend(*(MQId *) remote.data(), msg);
+				if (!r || msg.IsCounted()) {
+					msg.Release(mq.shm());
+				}
+			}
 		}
+		return r;
+	};
+
+	while (pos != arr.end() && SendData(pos->data().data_)) {
 		++pos;
 	}
 
+	int nprocessed = std::distance(arr.begin(), pos);
 	arr.erase(arr.begin(), pos);
-	return n;
+	return nprocessed;
 }
 
 int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
diff --git a/src/sendq.h b/src/sendq.h
index b4f3821..bba44af 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -21,6 +21,7 @@
 #include "defs.h"
 #include "msg.h"
 #include "timed_queue.h"
+#include <boost/variant2/variant.hpp>
 #include <deque>
 #include <functional>
 #include <list>
@@ -38,36 +39,43 @@
 public:
 	typedef std::string Remote;
 	typedef bhome_msg::MsgI MsgI;
-	typedef std::function<void(const MsgI &msg)> OnMsgEvent;
+	typedef std::string Content;
+	typedef boost::variant2::variant<MsgI, Content> Data;
+	typedef std::function<void(const Data &)> OnMsgEvent;
 	struct MsgInfo {
-		MsgI msg_;
+		Data data_;
 		OnMsgEvent on_expire_;
-		// OnMsgEvent on_send_;
 	};
 	typedef TimedData<MsgInfo> TimedMsg;
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
 
-	void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
+	template <class... Rest>
+	void Append(const MQId &id, Rest &&...rest)
 	{
-		Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
+		Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
 	}
+
 	void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
 	{
-		using namespace std::chrono_literals;
-		Append(addr, msg, Now() + 60s, onExpire);
+		msg.AddRef();
+		AppendData(addr, Data(msg), DefaultExpire(), onExpire);
+	}
+	void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+	{
+		AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
 	}
 	bool TrySend(bhome_shm::ShmMsgQueue &mq);
 	// bool empty() const { return store_.empty(); }
 
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
-	void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
+	static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
+	void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
 	{
 		//TODO simple queue, organize later ?
 
-		msg.AddRef();
-		TimedMsg tmp(expire, MsgInfo{msg, onExpire});
+		TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
 		std::unique_lock<std::mutex> lock(mutex_in_);
 		auto &al = in_[addr];
 		if (!al.empty()) {
@@ -82,18 +90,6 @@
 
 	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
 	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
-
-	class MsgIter
-	{
-		Array::iterator iter_;
-
-	public:
-		MsgIter(Array::iterator iter) :
-		    iter_(iter) {}
-		MsgIter &operator++() { return ++iter_, *this; }
-		bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
-		MsgI &operator*() { return iter_->data().msg_; }
-	};
 
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
diff --git a/src/socket.h b/src/socket.h
index dbb161c..db64b36 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,11 +36,10 @@
 
 class ShmSocket : private boost::noncopyable
 {
-	bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+	template <class... T>
+	bool SendImpl(const void *valid_remote, T &&...rest)
 	{
-		// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
-		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
-		// }
+		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
 		return true;
 	}
 
@@ -69,24 +68,22 @@
 	template <class Body>
 	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
 	{
-		MsgI msg;
-		if (msg.Make(shm(), head, body)) {
-			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
-			std::string msg_id(head.msg_id());
+		try {
 			if (!cb) {
-				return SendImpl(valid_remote, msg);
+				return SendImpl(valid_remote, MsgI::Serialize(head, body));
 			} else {
+				std::string msg_id(head.msg_id());
 				per_msg_cbs_->Add(msg_id, cb);
-				auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
+				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
 					RecvCB cb_no_use;
 					per_msg_cbs_->Find(msg_id, cb_no_use);
 				};
-				return SendImpl(valid_remote, msg, onExpireRemoveCB);
+				return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
 			}
-		} else {
-			SetLastError(ENOMEM, "Out of mem");
+		} catch (...) {
+			SetLastError(eError, "Send internal error.");
+			return false;
 		}
-		return false;
 	}
 
 	bool Send(const void *valid_remote, const MsgI &imsg)
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 58c73c6..da51044 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -138,6 +138,19 @@
 	}
 }
 
+namespace
+{
+struct CCC {
+};
+void F(CCC &&c) {}
+
+template <class... T>
+void Pass(T &&...t)
+{
+	F(std::forward<decltype(t)>(t)...);
+}
+
+} // namespace
 BOOST_AUTO_TEST_CASE(ApiTest)
 {
 	auto max_time = std::chrono::steady_clock::time_point::max();
@@ -241,7 +254,7 @@
 		MsgStatus last;
 		while (*run) {
 			auto &st = Status();
-			std::this_thread::sleep_for(1s);
+			Sleep(1s, false);
 			printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
 			       st.nrequest_.load(), st.nrequest_ - last.nrequest_,
 			       st.nfailed_.load(),
@@ -270,8 +283,8 @@
 
 	int same = 0;
 	int64_t last = 0;
-	while (last < nreq * ncli && same < 3) {
-		Sleep(1s);
+	while (last < nreq * ncli && same < 2) {
+		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {
 			++same;

--
Gitblit v1.8.0