From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/robust.h | 185 ++++++++++++++++++++++++++--------------------
1 files changed, 105 insertions(+), 80 deletions(-)
diff --git a/src/robust.h b/src/robust.h
index d2d94e9..b3b99c4 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,6 +19,7 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
+#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
@@ -37,8 +38,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
-constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
-
void QuickSleep();
class CasMutex
@@ -86,6 +85,8 @@
class NullMutex
{
public:
+ template <class... T>
+ explicit NullMutex(T &&...t) {} // easy test.
bool try_lock() { return true; }
void lock() {}
void unlock() {}
@@ -97,7 +98,7 @@
public:
typedef uint64_t id_t;
FMutex(id_t id) :
- id_(id), fd_(Open(id_))
+ id_(id), fd_(Open(id_)), count_(0)
{
if (fd_ == -1) { throw "error create mutex!"; }
}
@@ -113,11 +114,12 @@
mkdir(dir.c_str(), 0777);
return dir + "/fm_" + std::to_string(id);
}
- static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
+ static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
static int Close(int fd) { return close(fd); }
id_t id_;
int fd_;
std::mutex mtx_;
+ std::atomic<int32_t> count_;
};
union semun {
@@ -132,17 +134,11 @@
{
public:
SemMutex(key_t key) :
- key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
+ key_(key), sem_id_(semget(key, 1, 0666))
{
if (sem_id_ == -1) { throw "error create semaphore."; }
- union semun init_val;
- init_val.val = 1;
- semctl(sem_id_, 0, SETVAL, init_val);
}
- ~SemMutex()
- {
- // semctl(sem_id_, 0, IPC_RMID, semun{});
- }
+ ~SemMutex() {}
bool try_lock()
{
@@ -179,74 +175,6 @@
Guard(const Guard &);
Guard(Guard &&);
Lock &l_;
-};
-
-template <class D, class Alloc = std::allocator<D>>
-class CircularBuffer
-{
- typedef uint32_t size_type;
- typedef uint32_t count_type;
- typedef uint64_t meta_type;
- static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
- static count_type Count(meta_type meta) { return meta >> 32; }
- static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
-
-public:
- typedef D Data;
-
- CircularBuffer(const size_type cap) :
- CircularBuffer(cap, Alloc()) {}
- CircularBuffer(const size_type cap, Alloc const &al) :
- capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
- {
- if (!buf) {
- throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
- } else {
- memset(&buf[0], 0, sizeof(D) * capacity_);
- }
- }
- ~CircularBuffer() { al_.deallocate(buf, capacity_); }
-
- bool push_back(const Data d)
- {
- auto old = mtail();
- auto pos = Pos(old);
- auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
- if (!full) {
- buf[pos] = d;
- return mtail_.compare_exchange_strong(old, next(old));
- }
- return false;
- }
- bool pop_front(Data &d)
- {
- auto old = mhead();
- auto pos = Pos(old);
- if (!(pos == tail())) {
- d = buf[pos];
- return mhead_.compare_exchange_strong(old, next(old));
- } else {
- return false;
- }
- }
-
-private:
- CircularBuffer(const CircularBuffer &);
- CircularBuffer(CircularBuffer &&);
- CircularBuffer &operator=(const CircularBuffer &) = delete;
- CircularBuffer &operator=(CircularBuffer &&) = delete;
-
- meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
- size_type head() const { return Pos(mhead()); }
- size_type tail() const { return Pos(mtail()); }
- meta_type mhead() const { return mhead_.load(); }
- meta_type mtail() const { return mtail_.load(); }
- // data
- const size_type capacity_;
- std::atomic<meta_type> mhead_;
- std::atomic<meta_type> mtail_;
- Alloc al_;
- typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
@@ -312,5 +240,102 @@
AData buf[capacity];
};
+template <class Int>
+class AtomicQueue<0, Int>
+{
+ typedef Int Data;
+ typedef std::atomic<Data> AData;
+ static_assert(sizeof(Data) == sizeof(AData));
+
+public:
+ AtomicQueue() { memset(this, 0, sizeof(*this)); }
+ bool push(const Data d, bool try_more = false)
+ {
+ auto cur = buf.load();
+ return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
+ }
+ bool pop(Data &d, bool try_more = false)
+ {
+ Data cur = buf.load();
+ bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
+ if (r) { d = Dec(cur); }
+ return r;
+ }
+ uint32_t head() const { return 0; }
+ uint32_t tail() const { return 0; }
+
+private:
+ static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+ static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
+ static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
+ AData buf;
+};
+
+class AtomicReqRep
+{
+public:
+ typedef int64_t Data;
+ typedef std::function<Data(const Data)> Handler;
+ bool ClientRequest(const Data request, Data &reply)
+ {
+ auto end_time = now() + 3s;
+ do {
+ Data cur = data_.load();
+ if (GetState(cur) == eStateFree &&
+ DataCas(cur, Encode(request, eStateRequest))) {
+ do {
+ yield();
+ cur = data_.load();
+ if (GetState(cur) == eStateReply) {
+ DataCas(cur, Encode(0, eStateFree));
+ reply = Decode(cur);
+ return true;
+ }
+ } while (now() < end_time);
+ }
+ yield();
+ } while (now() < end_time);
+ return false;
+ }
+
+ bool ServerProcess(Handler onReq)
+ {
+ Data cur = data_.load();
+ switch (GetState(cur)) {
+ case eStateRequest:
+ if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
+ timestamp_ = now();
+ return true;
+ }
+ break;
+ case eStateReply:
+ if (timestamp_.load() + 3s < now()) {
+ DataCas(cur, Encode(0, eStateFree));
+ }
+ break;
+ case eStateFree:
+ default: break;
+ }
+ return false;
+ }
+
+private:
+ enum State {
+ eStateFree,
+ eStateRequest,
+ eStateReply
+ };
+ static int GetState(Data d) { return d & MaskBits(3); }
+ static Data Encode(Data d, State st) { return (d << 3) | st; }
+ static Data Decode(Data d) { return d >> 3; }
+ static void yield() { QuickSleep(); }
+ typedef steady_clock::duration Duration;
+ Duration now() { return steady_clock::now().time_since_epoch(); }
+
+ bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
+ std::atomic<Data> data_;
+ std::atomic<Duration> timestamp_;
+};
+
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
--
Gitblit v1.8.0