From 95bd9a67f9f6c90f627784e3f8fbf5c203784e51 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 15:36:17 +0800 Subject: [PATCH] change shm socket msg queue to atomic queue. --- utest/speed_test.cpp | 17 ++++---- utest/api_test.cpp | 2 utest/util.h | 2 - src/msg.h | 27 ++++++++----- src/shm_msg_queue.h | 9 ++-- src/shm_queue.h | 28 +++++++++++++- src/shm_msg_queue.cpp | 21 ++-------- 7 files changed, 61 insertions(+), 45 deletions(-) diff --git a/src/msg.h b/src/msg.h index e332a5d..1f5b0f1 100644 --- a/src/msg.h +++ b/src/msg.h @@ -49,26 +49,29 @@ int Dec() { return --num_; } int Get() { return num_.load(); } }; - typedef int64_t Offset; - static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } - static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } - static inline Offset BaseAddr() + typedef int64_t OffsetType; + static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } + static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } + static inline OffsetType BaseAddr() { - static const Offset base = Addr(shm().get_address()); // cache value. + static const OffsetType base = Addr(shm().get_address()); // cache value. return base; } static const uint32_t kMsgTag = 0xf1e2d3c4; - typedef struct { + struct Meta { RefCount count_; const uint32_t tag_ = kMsgTag; - } Meta; - Offset offset_; + const uint32_t size_ = 0; + Meta(uint32_t size) : + size_(size) {} + }; + OffsetType offset_; void *Alloc(const size_t size) { void *p = shm().Alloc(sizeof(Meta) + size); if (p) { - auto pmeta = new (p) Meta; + auto pmeta = new (p) Meta(size); p = pmeta + 1; } return p; @@ -136,8 +139,10 @@ static bool BindShm(SharedMemory &shm) { return SetData(shm); } ShmMsg() : ShmMsg(nullptr) {} - explicit ShmMsg(const size_t size) : - ShmMsg(Alloc(size)) {} + explicit ShmMsg(const OffsetType offset) : + offset_(offset) {} + OffsetType Offset() const { return offset_; } + OffsetType &OffsetRef() { return offset_; } void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; } diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index 03a6cfb..cd8cd66 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -29,19 +29,6 @@ return std::string(buf, n + 4); } -const int AdjustMQLength(const int len) -{ - const int kMaxLength = 10000; - const int kDefaultLen = 12; - if (len <= 0) { - return kDefaultLen; - } else if (len < kMaxLength) { - return len; - } else { - return kMaxLength; - } -} - } // namespace ShmMsgQueue::MQId ShmMsgQueue::NewId() @@ -52,13 +39,13 @@ // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : id_(id), - queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) + queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) { } ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : id_(NewId()), - queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) + queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) { if (!queue_.IsOk()) { throw("error create msgq " + std::to_string(id_)); @@ -72,7 +59,7 @@ Queue *q = Find(shm, id); if (q) { MsgI msg; - while (q->TryRead(msg)) { + while (q->TryRead(msg.OffsetRef())) { msg.Release(); } } @@ -90,7 +77,7 @@ bool r = false; if (remote) { msg.AddRef(); - r = remote->TryWrite(msg); + r = remote->TryWrite(msg.Offset()); if (!r) { msg.Release(); } diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index c56784c..aff931c 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -26,7 +26,8 @@ class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> { - typedef ShmObject<SharedQueue<MsgI>> Shmq; + typedef ShmObject<SharedQ63<4>> Shmq; + // typedef ShmObject<SharedQueue<int64_t>> Shmq; typedef Shmq::ShmType ShmType; typedef Shmq::Data Queue; typedef std::function<void()> OnSend; @@ -43,15 +44,15 @@ MQId Id() const { return id_; } ShmType &shm() const { return queue_.shm(); } - bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } - bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } + bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); } + bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); } static Queue *Find(SharedMemory &shm, const MQId remote_id); static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } private: MQId id_; - Shmq &queue() { return queue_; } + Queue &queue() { return *queue_.data(); } Shmq queue_; }; diff --git a/src/shm_queue.h b/src/shm_queue.h index 7e4ec31..5d5c0e9 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -53,8 +53,32 @@ bool TryWrite(const D &d) { return queue_.push_back(d); } private: - typedef Circular<D> Queue; - Queue queue_; + Circular<D> queue_; +}; + +template <int Power = 4> +class SharedQ63 +{ +public: + typedef int64_t Data; + bool Read(Data &d, const int timeout_ms) + { + using namespace std::chrono; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + do { + if (TryRead(d)) { + return true; + } else { + robust::QuickSleep(); + } + } while (steady_clock::now() < end_time); + return false; + } + bool TryRead(Data &d, const bool try_more = true) { return queue_.pop_front(d, try_more); } + bool TryWrite(const Data d, const bool try_more = true) { return queue_.push_back(d, try_more); } + +private: + robust::AtomicQueue<Power, Data> queue_; }; } // namespace bhome_shm diff --git a/utest/api_test.cpp b/utest/api_test.cpp index c6165e8..cf7baf9 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -149,7 +149,7 @@ bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); BHFree(reply, reply_len); // printf("register topic : %s\n", r ? "ok" : "failed"); - Sleep(1s); + // Sleep(1s); } { // Subscribe diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index bd455ec..c512569 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -16,9 +16,6 @@ * ===================================================================================== */ #include "util.h" -#include <boost/date_time/posix_time/posix_time.hpp> - -using namespace boost::posix_time; BOOST_AUTO_TEST_CASE(SpeedTest) { @@ -49,14 +46,18 @@ }; auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { ShmMsgQueue mq(id, shm, 1000); + auto now = []() { return steady_clock::now(); }; + auto tm = now(); while (*run) { MsgI msg; BHMsgHead head; - if (mq.Recv(msg, timeout)) { + if (mq.TryRecv(msg)) { DEFER1(msg.Release()); - // ok + tm = now(); } else if (isfork) { - exit(0); // for forked quit after 1s. + if (now() > tm + 1s) { + exit(0); // for forked quit after 1s. + } } } }; @@ -70,8 +71,8 @@ } }; - int nwriters[] = {1, 2, 4}; - int nreaders[] = {1, 2}; + int nwriters[] = {1, 4, 16}; + int nreaders[] = {1, 4}; auto Test = [&](auto &www, auto &rrr, bool isfork) { for (auto nreader : nreaders) { diff --git a/utest/util.h b/utest/util.h index a4cbbaa..23463e2 100644 --- a/utest/util.h +++ b/utest/util.h @@ -22,7 +22,6 @@ #include "bh_util.h" #include "shm.h" #include "topic_node.h" -#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/noncopyable.hpp> #include <boost/test/unit_test.hpp> #include <boost/timer/timer.hpp> @@ -34,7 +33,6 @@ #include <thread> #include <vector> -using namespace boost::posix_time; using namespace std::chrono_literals; using namespace std::chrono; -- Gitblit v1.8.0