From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/robust.h | 405 ++++++++++++++++++++++++++++-----------------------------
1 files changed, 197 insertions(+), 208 deletions(-)
diff --git a/src/robust.h b/src/robust.h
index b7459ad..b3b99c4 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,11 +19,17 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
+#include "bh_util.h"
+#include "log.h"
#include <atomic>
#include <chrono>
#include <memory>
-#include <string.h>
+#include <mutex>
#include <string>
+#include <sys/file.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
@@ -32,147 +38,23 @@
using namespace std::chrono;
using namespace std::chrono_literals;
-constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
-
void QuickSleep();
-class RobustReqRep
-{
- typedef uint32_t State;
- typedef std::string Msg;
- typedef std::chrono::steady_clock::duration Duration;
-
-public:
- enum ErrorCode {
- eSuccess = 0,
- eTimeout = EAGAIN,
- eSizeError = EINVAL,
- };
-
- explicit RobustReqRep(const uint32_t max_len) :
- capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
-
- void PutReady() { state_.store(eStateReady); }
- bool Ready() const { return state_.load() == eStateReady; }
- uint32_t capacity() const { return capacity_; }
-
- int ClientRequest(const Msg &request, Msg &reply)
- {
- int r = ClientWriteRequest(request);
- if (r == eSuccess) {
- r = ClientReadReply(reply);
- }
- return r;
- }
- int ClientReadReply(Msg &reply);
- int ClientWriteRequest(const Msg &request);
- int ServerReadRequest(Msg &request);
- int ServerWriteReply(const Msg &reply);
-
-private:
- RobustReqRep(const RobustReqRep &);
- RobustReqRep(RobustReqRep &&);
- RobustReqRep &operator=(const RobustReqRep &) = delete;
- RobustReqRep &operator=(RobustReqRep &&) = delete;
-
- enum {
- eStateInit = 0,
- eStateReady = 0x19833891,
- eClientWriteBegin,
- eClientWriteEnd,
- eServerReadBegin,
- eServerReadEnd,
- eServerWriteBegin,
- eServerWriteEnd,
- eClientReadBegin,
- eClientReadEnd = eStateReady,
- };
- bool StateCas(State exp, State val);
- void Write(const Msg &msg)
- {
- size_.store(msg.size());
- memcpy(buf, msg.data(), msg.size());
- }
- void Read(Msg &msg) { msg.assign(buf, size_.load()); }
-
- const uint32_t capacity_;
- std::atomic<State> state_;
- static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
- std::atomic<Duration> timestamp_;
- std::atomic<int32_t> size_;
- char buf[4];
-};
-
-class PidLocker
-{
-public:
- typedef int locker_t;
- enum { eLockerBits = sizeof(locker_t) * 8 };
- static locker_t this_locker()
- {
- static locker_t val = getpid();
- return val;
- }
- static bool is_alive(locker_t locker) { return true; }
-};
-
-class RobustPidLocker
-{
-public:
- typedef int locker_t;
- enum { eLockerBits = sizeof(locker_t) * 8 };
- static locker_t this_locker()
- {
- static locker_t val = getpid();
- return val;
- }
- static bool is_alive(locker_t locker)
- {
- char buf[64] = {0};
- snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
- return access(buf, F_OK) == 0;
- }
-};
-
-class ExpiredLocker
-{
-public:
- typedef int64_t locker_t;
- enum { eLockerBits = 63 };
- static locker_t this_locker() { return Now(); }
- static bool is_alive(locker_t locker)
- {
- return Now() < locker + steady_clock::duration(10s).count();
- }
-
-private:
- static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
-};
-
-template <class LockerT>
class CasMutex
{
- typedef typename LockerT::locker_t locker_t;
- static inline locker_t this_locker() { return LockerT::this_locker(); }
- static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
- static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
- static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
+ typedef uint64_t locker_t;
+ static inline locker_t this_locker() { return pthread_self(); }
+ static const uint64_t kLockerMask = MaskBits(63);
public:
CasMutex() :
meta_(0) {}
int try_lock()
{
- const auto t = steady_clock::now().time_since_epoch().count();
auto old = meta_.load();
int r = 0;
if (!Locked(old)) {
r = MetaCas(old, Meta(1, this_locker()));
- } else if (!is_alive(Locker(old))) {
- r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
- if (r) {
- printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
- }
}
return r;
}
@@ -200,7 +82,86 @@
bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
-typedef CasMutex<RobustPidLocker> Mutex;
+class NullMutex
+{
+public:
+ template <class... T>
+ explicit NullMutex(T &&...t) {} // easy test.
+ bool try_lock() { return true; }
+ void lock() {}
+ void unlock() {}
+};
+
+// flock + mutex
+class FMutex
+{
+public:
+ typedef uint64_t id_t;
+ FMutex(id_t id) :
+ id_(id), fd_(Open(id_)), count_(0)
+ {
+ if (fd_ == -1) { throw "error create mutex!"; }
+ }
+ ~FMutex() { Close(fd_); }
+ bool try_lock();
+ void lock();
+ void unlock();
+
+private:
+ static std::string GetPath(id_t id)
+ {
+ const std::string dir("/tmp/.bhome_mtx");
+ mkdir(dir.c_str(), 0777);
+ return dir + "/fm_" + std::to_string(id);
+ }
+ static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
+ static int Close(int fd) { return close(fd); }
+ id_t id_;
+ int fd_;
+ std::mutex mtx_;
+ std::atomic<int32_t> count_;
+};
+
+union semun {
+ int val; /* Value for SETVAL */
+ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
+ unsigned short *array; /* Array for GETALL, SETALL */
+ struct seminfo *__buf; /* Buffer for IPC_INFO
+ (Linux-specific) */
+};
+
+class SemMutex
+{
+public:
+ SemMutex(key_t key) :
+ key_(key), sem_id_(semget(key, 1, 0666))
+ {
+ if (sem_id_ == -1) { throw "error create semaphore."; }
+ }
+ ~SemMutex() {}
+
+ bool try_lock()
+ {
+ sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
+ return semop(sem_id_, &op, 1) == 0;
+ }
+
+ void lock()
+ {
+ sembuf op = {0, -1, SEM_UNDO};
+ semop(sem_id_, &op, 1);
+ }
+
+ void unlock()
+ {
+ sembuf op = {0, 1, SEM_UNDO};
+ semop(sem_id_, &op, 1);
+ }
+
+private:
+ key_t key_;
+ int sem_id_;
+};
template <class Lock>
class Guard
@@ -214,77 +175,6 @@
Guard(const Guard &);
Guard(Guard &&);
Lock &l_;
-};
-
-template <class D, class Alloc = std::allocator<D>>
-class CircularBuffer
-{
- typedef uint32_t size_type;
- typedef uint32_t count_type;
- typedef uint64_t meta_type;
- static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
- static count_type Count(meta_type meta) { return meta >> 32; }
- static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
-
-public:
- typedef D Data;
-
- CircularBuffer(const size_type cap) :
- CircularBuffer(cap, Alloc()) {}
- CircularBuffer(const size_type cap, Alloc const &al) :
- capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
- {
- if (!buf) {
- throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
- } else {
- memset(&buf[0], 0, sizeof(D) * capacity_);
- }
- }
- ~CircularBuffer() { al_.deallocate(buf, capacity_); }
-
- bool push_back(const Data d)
- {
- Guard<Mutex> guard(mutex_);
- auto old = mtail();
- auto pos = Pos(old);
- auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
- if (!full) {
- buf[pos] = d;
- return mtail_.compare_exchange_strong(old, next(old));
- }
- return false;
- }
- bool pop_front(Data &d)
- {
- Guard<Mutex> guard(mutex_);
- auto old = mhead();
- auto pos = Pos(old);
- if (!(pos == tail())) {
- d = buf[pos];
- return mhead_.compare_exchange_strong(old, next(old));
- } else {
- return false;
- }
- }
-
-private:
- CircularBuffer(const CircularBuffer &);
- CircularBuffer(CircularBuffer &&);
- CircularBuffer &operator=(const CircularBuffer &) = delete;
- CircularBuffer &operator=(CircularBuffer &&) = delete;
-
- meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
- size_type head() const { return Pos(mhead()); }
- size_type tail() const { return Pos(mtail()); }
- meta_type mhead() const { return mhead_.load(); }
- meta_type mtail() const { return mtail_.load(); }
- // data
- const size_type capacity_;
- Mutex mutex_;
- std::atomic<meta_type> mhead_;
- std::atomic<meta_type> mtail_;
- Alloc al_;
- typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
@@ -306,28 +196,30 @@
size_type tail() const { return tail_.load(); }
bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
- bool push_back(const Data d, bool try_more = false)
+ bool push(const Data d, bool try_more = false)
{
bool r = false;
size_type i = 0;
do {
auto pos = tail();
- auto cur = buf[pos].load();
- r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
- tail_.compare_exchange_strong(pos, Next(pos));
+ if (tail_.compare_exchange_strong(pos, Next(pos))) {
+ auto cur = buf[pos].load();
+ r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
+ }
} while (try_more && !r && ++i < capacity);
return r;
}
- bool pop_front(Data &d, bool try_more = false)
+ bool pop(Data &d, bool try_more = false)
{
bool r = false;
Data cur;
size_type i = 0;
do {
auto pos = head();
- cur = buf[pos].load();
- r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
- head_.compare_exchange_strong(pos, Next(pos));
+ if (head_.compare_exchange_strong(pos, Next(pos))) {
+ cur = buf[pos].load();
+ r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
+ }
} while (try_more && !r && ++i < capacity);
if (r) { d = Dec(cur); }
return r;
@@ -348,5 +240,102 @@
AData buf[capacity];
};
+template <class Int>
+class AtomicQueue<0, Int>
+{
+ typedef Int Data;
+ typedef std::atomic<Data> AData;
+ static_assert(sizeof(Data) == sizeof(AData));
+
+public:
+ AtomicQueue() { memset(this, 0, sizeof(*this)); }
+ bool push(const Data d, bool try_more = false)
+ {
+ auto cur = buf.load();
+ return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
+ }
+ bool pop(Data &d, bool try_more = false)
+ {
+ Data cur = buf.load();
+ bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
+ if (r) { d = Dec(cur); }
+ return r;
+ }
+ uint32_t head() const { return 0; }
+ uint32_t tail() const { return 0; }
+
+private:
+ static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+ static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
+ static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
+ AData buf;
+};
+
+class AtomicReqRep
+{
+public:
+ typedef int64_t Data;
+ typedef std::function<Data(const Data)> Handler;
+ bool ClientRequest(const Data request, Data &reply)
+ {
+ auto end_time = now() + 3s;
+ do {
+ Data cur = data_.load();
+ if (GetState(cur) == eStateFree &&
+ DataCas(cur, Encode(request, eStateRequest))) {
+ do {
+ yield();
+ cur = data_.load();
+ if (GetState(cur) == eStateReply) {
+ DataCas(cur, Encode(0, eStateFree));
+ reply = Decode(cur);
+ return true;
+ }
+ } while (now() < end_time);
+ }
+ yield();
+ } while (now() < end_time);
+ return false;
+ }
+
+ bool ServerProcess(Handler onReq)
+ {
+ Data cur = data_.load();
+ switch (GetState(cur)) {
+ case eStateRequest:
+ if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
+ timestamp_ = now();
+ return true;
+ }
+ break;
+ case eStateReply:
+ if (timestamp_.load() + 3s < now()) {
+ DataCas(cur, Encode(0, eStateFree));
+ }
+ break;
+ case eStateFree:
+ default: break;
+ }
+ return false;
+ }
+
+private:
+ enum State {
+ eStateFree,
+ eStateRequest,
+ eStateReply
+ };
+ static int GetState(Data d) { return d & MaskBits(3); }
+ static Data Encode(Data d, State st) { return (d << 3) | st; }
+ static Data Decode(Data d) { return d >> 3; }
+ static void yield() { QuickSleep(); }
+ typedef steady_clock::duration Duration;
+ Duration now() { return steady_clock::now().time_since_epoch(); }
+
+ bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
+ std::atomic<Data> data_;
+ std::atomic<Duration> timestamp_;
+};
+
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
--
Gitblit v1.8.0