/*
|
* =====================================================================================
|
*
|
* Filename: robust.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-04-27 10:04:29
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
|
#ifndef ROBUST_Q31RCWYU
|
#define ROBUST_Q31RCWYU
|
|
#include <atomic>
#include <chrono>
#include <errno.h>
#include <memory>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <unistd.h>
|
|
namespace robust
|
{
|
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
|
// Declaration only; the definition is not part of this header.
// NOTE(review): presumably a short back-off pause intended for polling/spin
// loops -- confirm against the definition before relying on its duration.
void QuickSleep();
|
|
// Request/reply exchange slot coordinated through one atomic state word.
//
// The object records a capacity, a state, a timestamp and the size of the
// message currently held in buf. Only PutReady(), Ready(), capacity() and
// ClientRequest() are defined here; the four read/write transfer functions
// and StateCas() are declared here and defined elsewhere.
//
// NOTE(review): buf is declared with 4 bytes while capacity_ is set
// independently from the constructor argument, so the object is presumably
// constructed at the start of a larger allocation whose tail backs buf --
// confirm against the code that creates instances.
class RobustReqRep
{
    typedef uint32_t State;
    typedef std::string Msg;
    typedef std::chrono::steady_clock::duration Duration;

public:
    // Result codes returned by the request/reply functions. The non-zero
    // values reuse errno constants.
    enum ErrorCode {
        eSuccess = 0,
        eTimeout = EAGAIN,
        eSizeError = EINVAL,
    };

    // Records max_len as the capacity and starts in eStateInit with a zero
    // timestamp and an empty message.
    explicit RobustReqRep(const uint32_t max_len) :
        capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}

    // Marks the slot as ready by storing eStateReady.
    void PutReady() { state_.store(eStateReady); }
    // True when the current state equals eStateReady.
    bool Ready() const { return state_.load() == eStateReady; }
    // The capacity given at construction.
    uint32_t capacity() const { return capacity_; }

    // Writes the request and, only if that succeeded, reads the reply.
    // Returns eSuccess or the first non-success code encountered.
    int ClientRequest(const Msg &request, Msg &reply)
    {
        const int written = ClientWriteRequest(request);
        if (written != eSuccess) {
            return written;
        }
        return ClientReadReply(reply);
    }
    int ClientReadReply(Msg &reply);
    int ClientWriteRequest(const Msg &request);
    int ServerReadRequest(Msg &request);
    int ServerWriteReply(const Msg &reply);

private:
    // Non-copyable and non-movable. Deleted (rather than declared and left
    // undefined) so misuse is rejected at compile time, consistent with the
    // assignment operators.
    RobustReqRep(const RobustReqRep &) = delete;
    RobustReqRep(RobustReqRep &&) = delete;
    RobustReqRep &operator=(const RobustReqRep &) = delete;
    RobustReqRep &operator=(RobustReqRep &&) = delete;

    // State values. After eStateReady the enumerators increase by one each;
    // eClientReadEnd aliases eStateReady, so finishing a client read puts the
    // slot back into the ready state.
    enum {
        eStateInit = 0,
        eStateReady = 0x19833891,
        eClientWriteBegin,
        eClientWriteEnd,
        eServerReadBegin,
        eServerReadEnd,
        eServerWriteBegin,
        eServerWriteEnd,
        eClientReadBegin,
        eClientReadEnd = eStateReady,
    };
    bool StateCas(State exp, State val);

    // Records msg.size() and copies that many bytes into buf.
    // No bound check against capacity_ happens here.
    // NOTE(review): callers presumably validate the size first (eSizeError
    // exists for that purpose) -- confirm in the out-of-line definitions.
    void Write(const Msg &msg)
    {
        size_.store(static_cast<int32_t>(msg.size()));
        memcpy(buf, msg.data(), msg.size());
    }
    // Replaces msg with the first size_ bytes of buf.
    void Read(Msg &msg) { msg.assign(buf, size_.load()); }

    const uint32_t capacity_;
    std::atomic<State> state_;
    static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
    std::atomic<Duration> timestamp_;
    std::atomic<int32_t> size_;
    char buf[4];
};
|
|
// Spin lock built on a single 64-bit atomic word: bit 63 is the "locked"
// flag and the remaining bits hold the pid of the process that last took or
// released the lock.
//
// With isRobust == true, try_lock() may take over a lock whose recorded
// owner no longer has an accessible /proc/<pid>/stat entry.
template <bool isRobust = false>
class CasMutex
{
    // pid of the calling process, fetched with getpid() on first use and
    // cached in a function-local static.
    // NOTE(review): a child created by fork() after the first call would keep
    // the cached parent value -- confirm that no user forks after locking.
    static pid_t pid()
    {
        static pid_t val = getpid();
        return val;
    }
    // True when /proc/<pid>/stat cannot be accessed, which is taken to mean
    // that the process is gone.
    static bool Killed(pid_t pid)
    {
        char buf[64] = {0};
        snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
        return access(buf, F_OK) != 0;
    }

public:
    CasMutex() :
        meta_(0) {}

    // Single acquisition attempt.
    // Returns 0 when the lock was not obtained, 1 when a free lock was taken,
    // and 2 when (robust mode only) the lock was captured from an owner that
    // Killed() reports as gone. A capture is also reported on stdout.
    int try_lock()
    {
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, pid()));
        } else if (isRobust && Killed(Pid(old))) {
            r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
            if (r) {
                printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
            }
        }
        return r;
    }

    // Spins on try_lock() until it succeeds; returns its non-zero result
    // (1 or 2, see try_lock()).
    int lock()
    {
        int r = 0;
        do {
            r = try_lock();
        } while (r == 0);
        return r;
    }

    // Releases the lock only if it is currently locked and the recorded
    // owner is the calling process; otherwise does nothing.
    void unlock()
    {
        auto old = meta_.load();
        if (Locked(old) && Pid(old) == pid()) {
            MetaCas(old, Meta(0, pid()));
        }
    }

private:
    std::atomic<uint64_t> meta_;

    // Helpers for packing/unpacking the lock word; none of them touch *this.
    static bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
    static pid_t Pid(uint64_t meta) { return static_cast<pid_t>(meta & ~(uint64_t(1) << 63)); }
    static uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | static_cast<uint64_t>(pid); }
    // Strong compare-and-swap of the lock word from exp to val.
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
    static_assert(sizeof(pid_t) < sizeof(uint64_t), "pid must leave room for the lock bit.");
};
|
|
// Scoped lock holder: calls lock() on the given object in the constructor and
// unlock() in the destructor. Any value returned by lock() is ignored.
// The guard stores a reference, so the lock object must outlive it.
template <class Lock>
class Guard
{
public:
    Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }

    // Non-copyable and non-movable: a duplicate would call unlock() twice.
    // Deleted explicitly instead of being declared private and left undefined.
    Guard(const Guard &) = delete;
    Guard(Guard &&) = delete;
    Guard &operator=(const Guard &) = delete;
    Guard &operator=(Guard &&) = delete;

private:
    Lock &l_;
};
|
|
template <class D, class Alloc = std::allocator<D>>
|
class CircularBuffer
|
{
|
typedef uint32_t size_type;
|
typedef uint32_t count_type;
|
typedef uint64_t meta_type;
|
static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
|
static count_type Count(meta_type meta) { return meta >> 32; }
|
static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
|
|
public:
|
typedef D Data;
|
|
CircularBuffer(const size_type cap) :
|
CircularBuffer(cap, Alloc()) {}
|
CircularBuffer(const size_type cap, Alloc const &al) :
|
state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
|
{
|
if (!buf) {
|
throw("error allocate buffer: out of mem!");
|
}
|
}
|
~CircularBuffer()
|
{
|
al_.deallocate(buf, capacity_);
|
}
|
size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
|
bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
|
bool empty() const { return head() == tail(); }
|
bool push_back(Data d)
|
{
|
Guard<MutexT> guard(mutex_);
|
if (!full()) {
|
auto old = mtail();
|
buf[Pos(old)] = d;
|
return mtail_.compare_exchange_strong(old, next(old));
|
} else {
|
return false;
|
}
|
}
|
bool pop_front(Data &d)
|
{
|
Guard<MutexT> guard(mutex_);
|
if (!empty()) {
|
auto old = mhead();
|
d = buf[Pos(old)];
|
return mhead_.compare_exchange_strong(old, next(old));
|
} else {
|
return false;
|
}
|
}
|
bool Ready() const { return state_.load() == eStateReady; }
|
void PutReady() { state_.store(eStateReady); }
|
|
private:
|
CircularBuffer(const CircularBuffer &);
|
CircularBuffer(CircularBuffer &&);
|
CircularBuffer &operator=(const CircularBuffer &) = delete;
|
CircularBuffer &operator=(CircularBuffer &&) = delete;
|
typedef CasMutex<true> MutexT;
|
// static_assert(sizeof(MutexT) == 16);
|
meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
|
size_type head() const { return Pos(mhead()); }
|
size_type tail() const { return Pos(mtail()); }
|
meta_type mhead() const { return mhead_.load(); }
|
meta_type mtail() const { return mtail_.load(); }
|
// data
|
enum { eStateReady = 0x19833891 };
|
std::atomic<uint32_t> state_;
|
const size_type capacity_;
|
MutexT mutex_;
|
std::atomic<meta_type> mhead_;
|
std::atomic<meta_type> mtail_;
|
Alloc al_;
|
typename Alloc::pointer buf = nullptr;
|
};
|
|
} // namespace robust
|
#endif // end of include guard: ROBUST_Q31RCWYU
|