From dc86ace85e437ecb8a2e728e4dce36d02bbb8a6e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 12:59:50 +0800
Subject: [PATCH] move ref count into msg meta, only 1 poinetr now.

---
 utest/speed_test.cpp   |    6 +-
 src/failed_msg.h       |    2 
 box/status_main.cc     |   16 +++++
 utest/simple_tests.cpp |    8 +-
 box/center.cpp         |    1 
 src/msg.h              |   45 ++++++--------
 src/failed_msg.cpp     |   13 +---
 src/sendq.cpp          |    6 -
 src/msg.cpp            |   64 +++++++++------------
 9 files changed, 75 insertions(+), 86 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 3059e90..0f547e9 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -454,7 +454,6 @@
 				replyer(reply);
 			} else {
 				replyer(MakeReply(eSuccess));
-				if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
 				if (clients.empty()) { return; }
 
 				auto it = clients.begin();
diff --git a/box/status_main.cc b/box/status_main.cc
index 3a0288b..a435c2f 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -31,8 +31,22 @@
 
 int status_main(int argc, char const *argv[])
 {
-	auto &shm = BHomeShm();
+	AppArg args(argc, argv);
+	auto shm_name = args.Get("shm", BHomeShm().name());
+	auto shm_size = std::atol(args.Get("size", "").c_str());
+	if (shm_size <= 0 || shm_size > 512) {
+		shm_size = 50;
+	}
+	auto DisplayName = [&]() -> std::string {
+		if (shm_name == BHomeShm().name()) {
+			return "[bhome shm]";
+		} else {
+			return shm_name;
+		}
+	};
+	printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
 
+	SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
 	std::atomic<bool> run(true);
 
 	auto Now = []() { return steady_clock::now(); };
diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp
index f128499..0b3ee42 100644
--- a/src/failed_msg.cpp
+++ b/src/failed_msg.cpp
@@ -17,18 +17,13 @@
  */
 #include "failed_msg.h"
 
-FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
+FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg msg)
 {
 	msg.AddRef();
-	return [remote, msg](void *valid_sock) {
+	return [remote, msg](void *valid_sock) mutable {
 		assert(valid_sock);
 		ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
-		bool r = sock.Send(remote.data(), msg);
-		//TODO check remote removed.
-		if (r && msg.IsCounted()) {
-			auto tmp = msg; // Release() is not const, but it's safe to release.
-			tmp.Release(sock.shm());
-		}
-		return r;
+		DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release.
+		return sock.Send(remote.data(), msg);
 	};
 }
\ No newline at end of file
diff --git a/src/failed_msg.h b/src/failed_msg.h
index 73030ba..8a810c7 100644
--- a/src/failed_msg.h
+++ b/src/failed_msg.h
@@ -40,7 +40,7 @@
 	}
 
 private:
-	Func PrepareSender(const std::string &remote, Msg const &msg);
+	Func PrepareSender(const std::string &remote, Msg msg);
 
 	TimedFuncQ queue_;
 };
diff --git a/src/msg.cpp b/src/msg.cpp
index 06b817e..ba844da 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -26,11 +26,28 @@
 //*/
 const uint32_t kMsgTag = 0xf1e2d3c4;
 
+void *MsgI::Alloc(SharedMemory &shm, const size_t size)
+{
+	void *p = shm.Alloc(sizeof(Meta) + size);
+	if (p) {
+		auto pmeta = new (p) Meta;
+		p = pmeta + 1;
+	}
+	return p;
+}
+void MsgI::Free(SharedMemory &shm)
+{
+	assert(valid());
+	shm.Dealloc(meta());
+	ptr_ = nullptr;
+	assert(!valid());
+}
+
 void *MsgI::Pack(SharedMemory &shm,
                  const uint32_t head_len, const ToArray &headToArray,
                  const uint32_t body_len, const ToArray &bodyToArray)
 {
-	void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+	void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len);
 	if (addr) {
 		auto p = static_cast<char *>(addr);
 		auto Pack1 = [&p](auto len, auto &writer) {
@@ -47,26 +64,11 @@
 
 bool MsgI::ParseHead(BHMsgHead &head) const
 {
-	auto p = static_cast<char *>(ptr_.get());
+	auto p = get<char>();
 	assert(p);
 	uint32_t msg_size = Get32(p);
 	p += 4;
 	return head.ParseFromArray(p, msg_size);
-}
-
-// with ref count;
-bool MsgI::MakeRC(SharedMemory &shm, void *p)
-{
-	if (!p) {
-		return false;
-	}
-	RefCount *rc = shm.New<RefCount>();
-	if (!rc) {
-		shm.Dealloc(p);
-		return false;
-	}
-	MsgI(p, rc).swap(*this);
-	return true;
 }
 
 bool MsgI::Make(SharedMemory &shm, void *p)
@@ -74,32 +76,20 @@
 	if (!p) {
 		return false;
 	}
-	MsgI(p, 0).swap(*this);
+	MsgI(p).swap(*this);
 	return true;
-}
-
-bool MsgI::EnableRefCount(SharedMemory &shm)
-{
-	if (!IsCounted()) {
-		count_ = shm.New<RefCount>();
-	}
-	return IsCounted();
 }
 
 int MsgI::Release(SharedMemory &shm)
 {
-	if (IsCounted()) {
-		const int n = count_->Dec();
-		if (n != 0) {
-			return n;
-		}
+	if (!valid()) {
+		return 0;
 	}
-	// free data
-	shm.Dealloc(ptr_);
-	ptr_ = 0;
-	shm.Delete(count_);
-	count_ = 0;
-	return 0;
+	auto n = meta()->count_.Dec();
+	if (n == 0) {
+		Free(shm);
+	}
+	return n;
 }
 
 } // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index feab5ec..452567e 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -53,8 +53,13 @@
 class MsgI
 {
 private:
+	struct Meta {
+		RefCount count_;
+	};
 	offset_ptr<void> ptr_;
-	offset_ptr<RefCount> count_;
+	void *Alloc(SharedMemory &shm, const size_t size);
+	void Free(SharedMemory &shm);
+	Meta *meta() const { return get<Meta>() - 1; }
 
 	typedef std::function<void(void *p, int len)> ToArray;
 	void *Pack(SharedMemory &shm,
@@ -72,48 +77,36 @@
 
 	void *Pack(SharedMemory &shm, const std::string &content)
 	{
-		void *addr = shm.Alloc(content.size());
+		void *addr = Alloc(shm, content.size());
 		if (addr) {
 			memcpy(addr, content.data(), content.size());
 		}
 		return addr;
 	}
 
-	bool MakeRC(SharedMemory &shm, void *addr);
 	bool Make(SharedMemory &shm, void *addr);
+	MsgI(void *p) :
+	    ptr_(p) {}
 
 public:
-	MsgI(void *p = 0, RefCount *c = 0) :
-	    ptr_(p), count_(c) {}
-
-	void swap(MsgI &a)
-	{
-		std::swap(ptr_, a.ptr_);
-		std::swap(count_, a.count_);
-	}
+	MsgI() :
+	    MsgI(nullptr) {}
+	MsgI(SharedMemory &shm, const size_t size) :
+	    MsgI(Alloc(shm, size)) {}
+	void swap(MsgI &a) { std::swap(ptr_, a.ptr_); }
+	bool valid() const { return static_cast<bool>(ptr_); }
 	template <class T = void>
-	T *get() { return static_cast<T *>(ptr_.get()); }
+	T *get() const { return static_cast<T *>(ptr_.get()); }
 
 	// AddRef and Release works for both counted and not counted msg.
-	int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
+	int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
 	int Release(SharedMemory &shm);
+	int Count() const { return valid() ? meta()->count_.Get() : 1; }
 
-	int Count() const { return IsCounted() ? count_->Get() : 1; }
-	bool IsCounted() const { return static_cast<bool>(count_); }
-
-	template <class Body>
-	inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
-	{
-		return MakeRC(shm, Pack(shm, head, body));
-	}
-
-	bool EnableRefCount(SharedMemory &shm);
 	template <class Body>
 	inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
 	{
-		void *p = Pack(shm, head, body);
-		auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
-		return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
+		return Make(shm, Pack(shm, head, body));
 	}
 	template <class Body>
 	static inline std::string Serialize(const BHMsgHead &head, const Body &body)
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 4be24f1..8aa7214 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -42,17 +42,15 @@
 		if (d.index() == 0) {
 			auto &msg = boost::variant2::get<0>(pos->data().data_);
 			r = mq.TrySend(*(MQId *) remote.data(), msg);
-			if (r && msg.IsCounted()) {
+			if (r) {
 				msg.Release(mq.shm());
 			}
 		} else {
 			auto &content = boost::variant2::get<1>(pos->data().data_);
 			MsgI msg;
 			if (msg.Make(mq.shm(), content)) {
+				DEFER1(msg.Release(mq.shm()););
 				r = mq.TrySend(*(MQId *) remote.data(), msg);
-				if (!r || msg.IsCounted()) {
-					msg.Release(mq.shm());
-				}
 			}
 		}
 		return r;
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index cbbcc2a..817bdac 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -126,15 +126,15 @@
 	ShmRemover auto_remove(shm_name);
 	SharedMemory shm(shm_name, 1024 * 1024);
 
-	MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
-	BOOST_CHECK(m0.IsCounted());
+	MsgI m0(shm, 1000);
+	BOOST_CHECK(m0.valid());
 	BOOST_CHECK_EQUAL(m0.Count(), 1);
 	MsgI m1 = m0;
-	BOOST_CHECK(m1.IsCounted());
+	BOOST_CHECK(m1.valid());
 	BOOST_CHECK_EQUAL(m1.AddRef(), 2);
 	BOOST_CHECK_EQUAL(m0.AddRef(), 3);
 	BOOST_CHECK_EQUAL(m0.Release(shm), 2);
 	BOOST_CHECK_EQUAL(m0.Release(shm), 1);
 	BOOST_CHECK_EQUAL(m1.Release(shm), 0);
-	BOOST_CHECK(!m1.IsCounted());
+	BOOST_CHECK(!m1.valid());
 }
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 86367b9..5de3c93 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -39,12 +39,12 @@
 		body.set_topic("topic");
 		body.set_data(str);
 		auto head(InitMsgHead(GetType(body), proc_id));
-		msg.MakeRC(shm, head, body);
-		assert(msg.IsCounted());
+		msg.Make(shm, head, body);
+		assert(msg.valid());
 		DEFER1(msg.Release(shm););
 
 		for (uint64_t i = 0; i < n; ++i) {
-			mq.Send(id, msg, timeout);
+			while (!mq.TrySend(id, msg)) {}
 		}
 	};
 	auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {

--
Gitblit v1.8.0