From 95bd9a67f9f6c90f627784e3f8fbf5c203784e51 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 15:36:17 +0800
Subject: [PATCH] change shm socket msg queue to atomic queue.

---
 utest/speed_test.cpp  |   17 ++++----
 utest/api_test.cpp    |    2 
 utest/util.h          |    2 -
 src/msg.h             |   27 ++++++++-----
 src/shm_msg_queue.h   |    9 ++--
 src/shm_queue.h       |   28 +++++++++++++-
 src/shm_msg_queue.cpp |   21 ++--------
 7 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/src/msg.h b/src/msg.h
index e332a5d..1f5b0f1 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -49,26 +49,29 @@
 		int Dec() { return --num_; }
 		int Get() { return num_.load(); }
 	};
-	typedef int64_t Offset;
-	static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-	static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
-	static inline Offset BaseAddr()
+	typedef int64_t OffsetType;
+	static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
+	static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
+	static inline OffsetType BaseAddr()
 	{
-		static const Offset base = Addr(shm().get_address()); // cache value.
+		static const OffsetType base = Addr(shm().get_address()); // cache value.
 		return base;
 	}
 
 	static const uint32_t kMsgTag = 0xf1e2d3c4;
-	typedef struct {
+	struct Meta {
 		RefCount count_;
 		const uint32_t tag_ = kMsgTag;
-	} Meta;
-	Offset offset_;
+		const uint32_t size_ = 0;
+		Meta(uint32_t size) :
+		    size_(size) {}
+	};
+	OffsetType offset_;
 	void *Alloc(const size_t size)
 	{
 		void *p = shm().Alloc(sizeof(Meta) + size);
 		if (p) {
-			auto pmeta = new (p) Meta;
+			auto pmeta = new (p) Meta(size);
 			p = pmeta + 1;
 		}
 		return p;
@@ -136,8 +139,10 @@
 	static bool BindShm(SharedMemory &shm) { return SetData(shm); }
 	ShmMsg() :
 	    ShmMsg(nullptr) {}
-	explicit ShmMsg(const size_t size) :
-	    ShmMsg(Alloc(size)) {}
+	explicit ShmMsg(const OffsetType offset) :
+	    offset_(offset) {}
+	OffsetType Offset() const { return offset_; }
+	OffsetType &OffsetRef() { return offset_; }
 	void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
 	bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
 
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 03a6cfb..cd8cd66 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -29,19 +29,6 @@
 	return std::string(buf, n + 4);
 }
 
-const int AdjustMQLength(const int len)
-{
-	const int kMaxLength = 10000;
-	const int kDefaultLen = 12;
-	if (len <= 0) {
-		return kDefaultLen;
-	} else if (len < kMaxLength) {
-		return len;
-	} else {
-		return kMaxLength;
-	}
-}
-
 } // namespace
 
 ShmMsgQueue::MQId ShmMsgQueue::NewId()
@@ -52,13 +39,13 @@
 // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
     id_(id),
-    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+    queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
 {
 }
 
 ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
     id_(NewId()),
-    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+    queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
 {
 	if (!queue_.IsOk()) {
 		throw("error create msgq " + std::to_string(id_));
@@ -72,7 +59,7 @@
 	Queue *q = Find(shm, id);
 	if (q) {
 		MsgI msg;
-		while (q->TryRead(msg)) {
+		while (q->TryRead(msg.OffsetRef())) {
 			msg.Release();
 		}
 	}
@@ -90,7 +77,7 @@
 	bool r = false;
 	if (remote) {
 		msg.AddRef();
-		r = remote->TryWrite(msg);
+		r = remote->TryWrite(msg.Offset());
 		if (!r) {
 			msg.Release();
 		}
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index c56784c..aff931c 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -26,7 +26,8 @@
 
 class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
 {
-	typedef ShmObject<SharedQueue<MsgI>> Shmq;
+	typedef ShmObject<SharedQ63<4>> Shmq;
+	// typedef ShmObject<SharedQueue<int64_t>> Shmq;
 	typedef Shmq::ShmType ShmType;
 	typedef Shmq::Data Queue;
 	typedef std::function<void()> OnSend;
@@ -43,15 +44,15 @@
 	MQId Id() const { return id_; }
 	ShmType &shm() const { return queue_.shm(); }
 
-	bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
-	bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
+	bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); }
+	bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); }
 	static Queue *Find(SharedMemory &shm, const MQId remote_id);
 	static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
 	bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
 
 private:
 	MQId id_;
-	Shmq &queue() { return queue_; }
+	Queue &queue() { return *queue_.data(); }
 	Shmq queue_;
 };
 
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 7e4ec31..5d5c0e9 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -53,8 +53,32 @@
 	bool TryWrite(const D &d) { return queue_.push_back(d); }
 
 private:
-	typedef Circular<D> Queue;
-	Queue queue_;
+	Circular<D> queue_;
+};
+
+template <int Power = 4>
+class SharedQ63
+{
+public:
+	typedef int64_t Data;
+	bool Read(Data &d, const int timeout_ms)
+	{
+		using namespace std::chrono;
+		auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+		do {
+			if (TryRead(d)) {
+				return true;
+			} else {
+				robust::QuickSleep();
+			}
+		} while (steady_clock::now() < end_time);
+		return false;
+	}
+	bool TryRead(Data &d, const bool try_more = true) { return queue_.pop_front(d, try_more); }
+	bool TryWrite(const Data d, const bool try_more = true) { return queue_.push_back(d, try_more); }
+
+private:
+	robust::AtomicQueue<Power, Data> queue_;
 };
 
 } // namespace bhome_shm
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index c6165e8..cf7baf9 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -149,7 +149,7 @@
 		bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
 		BHFree(reply, reply_len);
 		// printf("register topic : %s\n", r ? "ok" : "failed");
-		Sleep(1s);
+		// Sleep(1s);
 	}
 
 	{ // Subscribe
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index bd455ec..c512569 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -16,9 +16,6 @@
  * =====================================================================================
  */
 #include "util.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-using namespace boost::posix_time;
 
 BOOST_AUTO_TEST_CASE(SpeedTest)
 {
@@ -49,14 +46,18 @@
 	};
 	auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
 		ShmMsgQueue mq(id, shm, 1000);
+		auto now = []() { return steady_clock::now(); };
+		auto tm = now();
 		while (*run) {
 			MsgI msg;
 			BHMsgHead head;
-			if (mq.Recv(msg, timeout)) {
+			if (mq.TryRecv(msg)) {
 				DEFER1(msg.Release());
-				// ok
+				tm = now();
 			} else if (isfork) {
-				exit(0); // for forked quit after 1s.
+				if (now() > tm + 1s) {
+					exit(0); // for forked quit after 1s.
+				}
 			}
 		}
 	};
@@ -70,8 +71,8 @@
 		}
 	};
 
-	int nwriters[] = {1, 2, 4};
-	int nreaders[] = {1, 2};
+	int nwriters[] = {1, 4, 16};
+	int nreaders[] = {1, 4};
 
 	auto Test = [&](auto &www, auto &rrr, bool isfork) {
 		for (auto nreader : nreaders) {
diff --git a/utest/util.h b/utest/util.h
index a4cbbaa..23463e2 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -22,7 +22,6 @@
 #include "bh_util.h"
 #include "shm.h"
 #include "topic_node.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/test/unit_test.hpp>
 #include <boost/timer/timer.hpp>
@@ -34,7 +33,6 @@
 #include <thread>
 #include <vector>
 
-using namespace boost::posix_time;
 using namespace std::chrono_literals;
 using namespace std::chrono;
 

--
Gitblit v1.8.0