From 95bd9a67f9f6c90f627784e3f8fbf5c203784e51 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 15:36:17 +0800
Subject: [PATCH] change shm socket msg queue to atomic queue.
---
utest/speed_test.cpp | 17 ++++----
utest/api_test.cpp | 2
utest/util.h | 2 -
src/msg.h | 27 ++++++++-----
src/shm_msg_queue.h | 9 ++--
src/shm_queue.h | 28 +++++++++++++-
src/shm_msg_queue.cpp | 21 ++--------
7 files changed, 61 insertions(+), 45 deletions(-)
diff --git a/src/msg.h b/src/msg.h
index e332a5d..1f5b0f1 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -49,26 +49,29 @@
int Dec() { return --num_; }
int Get() { return num_.load(); }
};
- typedef int64_t Offset;
- static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
- static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
- static inline Offset BaseAddr()
+ typedef int64_t OffsetType;
+ static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
+ static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
+ static inline OffsetType BaseAddr()
{
- static const Offset base = Addr(shm().get_address()); // cache value.
+ static const OffsetType base = Addr(shm().get_address()); // cache value.
return base;
}
static const uint32_t kMsgTag = 0xf1e2d3c4;
- typedef struct {
+ struct Meta {
RefCount count_;
const uint32_t tag_ = kMsgTag;
- } Meta;
- Offset offset_;
+ const uint32_t size_ = 0;
+ Meta(uint32_t size) :
+ size_(size) {}
+ };
+ OffsetType offset_;
void *Alloc(const size_t size)
{
void *p = shm().Alloc(sizeof(Meta) + size);
if (p) {
- auto pmeta = new (p) Meta;
+ auto pmeta = new (p) Meta(size);
p = pmeta + 1;
}
return p;
@@ -136,8 +139,10 @@
static bool BindShm(SharedMemory &shm) { return SetData(shm); }
ShmMsg() :
ShmMsg(nullptr) {}
- explicit ShmMsg(const size_t size) :
- ShmMsg(Alloc(size)) {}
+ explicit ShmMsg(const OffsetType offset) :
+ offset_(offset) {}
+ OffsetType Offset() const { return offset_; }
+ OffsetType &OffsetRef() { return offset_; }
void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 03a6cfb..cd8cd66 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -29,19 +29,6 @@
return std::string(buf, n + 4);
}
-const int AdjustMQLength(const int len)
-{
- const int kMaxLength = 10000;
- const int kDefaultLen = 12;
- if (len <= 0) {
- return kDefaultLen;
- } else if (len < kMaxLength) {
- return len;
- } else {
- return kMaxLength;
- }
-}
-
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
@@ -52,13 +39,13 @@
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
id_(id),
- queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+ queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
id_(NewId()),
- queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+ queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
if (!queue_.IsOk()) {
throw("error create msgq " + std::to_string(id_));
@@ -72,7 +59,7 @@
Queue *q = Find(shm, id);
if (q) {
MsgI msg;
- while (q->TryRead(msg)) {
+ while (q->TryRead(msg.OffsetRef())) {
msg.Release();
}
}
@@ -90,7 +77,7 @@
bool r = false;
if (remote) {
msg.AddRef();
- r = remote->TryWrite(msg);
+ r = remote->TryWrite(msg.Offset());
if (!r) {
msg.Release();
}
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index c56784c..aff931c 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -26,7 +26,8 @@
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
- typedef ShmObject<SharedQueue<MsgI>> Shmq;
+ typedef ShmObject<SharedQ63<4>> Shmq;
+ // typedef ShmObject<SharedQueue<int64_t>> Shmq;
typedef Shmq::ShmType ShmType;
typedef Shmq::Data Queue;
typedef std::function<void()> OnSend;
@@ -43,15 +44,15 @@
MQId Id() const { return id_; }
ShmType &shm() const { return queue_.shm(); }
- bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
- bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
+ bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); }
+ bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); }
static Queue *Find(SharedMemory &shm, const MQId remote_id);
static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
private:
MQId id_;
- Shmq &queue() { return queue_; }
+ Queue &queue() { return *queue_.data(); }
Shmq queue_;
};
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 7e4ec31..5d5c0e9 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -53,8 +53,32 @@
bool TryWrite(const D &d) { return queue_.push_back(d); }
private:
- typedef Circular<D> Queue;
- Queue queue_;
+ Circular<D> queue_;
+};
+
+template <int Power = 4>
+class SharedQ63
+{
+public:
+ typedef int64_t Data;
+ bool Read(Data &d, const int timeout_ms)
+ {
+ using namespace std::chrono;
+ auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+ do {
+ if (TryRead(d)) {
+ return true;
+ } else {
+ robust::QuickSleep();
+ }
+ } while (steady_clock::now() < end_time);
+ return false;
+ }
+ bool TryRead(Data &d, const bool try_more = true) { return queue_.pop_front(d, try_more); }
+ bool TryWrite(const Data d, const bool try_more = true) { return queue_.push_back(d, try_more); }
+
+private:
+ robust::AtomicQueue<Power, Data> queue_;
};
} // namespace bhome_shm
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index c6165e8..cf7baf9 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -149,7 +149,7 @@
bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
BHFree(reply, reply_len);
// printf("register topic : %s\n", r ? "ok" : "failed");
- Sleep(1s);
+ // Sleep(1s);
}
{ // Subscribe
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index bd455ec..c512569 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -16,9 +16,6 @@
* =====================================================================================
*/
#include "util.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-using namespace boost::posix_time;
BOOST_AUTO_TEST_CASE(SpeedTest)
{
@@ -49,14 +46,18 @@
};
auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
ShmMsgQueue mq(id, shm, 1000);
+ auto now = []() { return steady_clock::now(); };
+ auto tm = now();
while (*run) {
MsgI msg;
BHMsgHead head;
- if (mq.Recv(msg, timeout)) {
+ if (mq.TryRecv(msg)) {
DEFER1(msg.Release());
- // ok
+ tm = now();
} else if (isfork) {
- exit(0); // for forked quit after 1s.
+ if (now() > tm + 1s) {
+ exit(0); // for forked quit after 1s.
+ }
}
}
};
@@ -70,8 +71,8 @@
}
};
- int nwriters[] = {1, 2, 4};
- int nreaders[] = {1, 2};
+ int nwriters[] = {1, 4, 16};
+ int nreaders[] = {1, 4};
auto Test = [&](auto &www, auto &rrr, bool isfork) {
for (auto nreader : nreaders) {
diff --git a/utest/util.h b/utest/util.h
index a4cbbaa..23463e2 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -22,7 +22,6 @@
#include "bh_util.h"
#include "shm.h"
#include "topic_node.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/noncopyable.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/timer/timer.hpp>
@@ -34,7 +33,6 @@
#include <thread>
#include <vector>
-using namespace boost::posix_time;
using namespace std::chrono_literals;
using namespace std::chrono;
--
Gitblit v1.8.0