/*
|
* =====================================================================================
|
*
|
* Filename: robust.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月27日 10时04分29秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
|
#ifndef ROBUST_Q31RCWYU
|
#define ROBUST_Q31RCWYU
|
|
#include "bh_util.h"
|
#include "log.h"
|
#include <atomic>
|
#include <chrono>
|
#include <memory>
|
#include <mutex>
|
#include <string>
|
#include <sys/file.h>
|
#include <sys/ipc.h>
|
#include <sys/sem.h>
|
#include <sys/stat.h>
|
#include <sys/types.h>
|
#include <unistd.h>
|
|
namespace robust
|
{
|
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
void QuickSleep();
|
|
class CasMutex
|
{
|
typedef uint64_t locker_t;
|
static inline locker_t this_locker() { return pthread_self(); }
|
static const uint64_t kLockerMask = MaskBits(63);
|
|
public:
|
CasMutex() :
|
meta_(0) {}
|
int try_lock()
|
{
|
auto old = meta_.load();
|
int r = 0;
|
if (!Locked(old)) {
|
r = MetaCas(old, Meta(1, this_locker()));
|
}
|
return r;
|
}
|
int lock()
|
{
|
int r = 0;
|
do {
|
r = try_lock();
|
} while (r == 0);
|
return r;
|
}
|
void unlock()
|
{
|
auto old = meta_.load();
|
if (Locked(old) && Locker(old) == this_locker()) {
|
MetaCas(old, Meta(0, this_locker()));
|
}
|
}
|
|
private:
|
std::atomic<uint64_t> meta_;
|
bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
|
locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
|
uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
|
bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
|
};
|
|
class NullMutex
|
{
|
public:
|
template <class... T>
|
explicit NullMutex(T &&...t) {} // easy test.
|
bool try_lock() { return true; }
|
void lock() {}
|
void unlock() {}
|
};
|
|
// flock + mutex
|
class FMutex
|
{
|
public:
|
typedef uint64_t id_t;
|
FMutex(id_t id) :
|
id_(id), fd_(Open(id_)), count_(0)
|
{
|
if (fd_ == -1) { throw "error create mutex!"; }
|
}
|
~FMutex() { Close(fd_); }
|
bool try_lock();
|
void lock();
|
void unlock();
|
|
private:
|
static std::string GetPath(id_t id)
|
{
|
const std::string dir("/tmp/.bhome_mtx");
|
mkdir(dir.c_str(), 0777);
|
return dir + "/fm_" + std::to_string(id);
|
}
|
static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
|
static int Close(int fd) { return close(fd); }
|
id_t id_;
|
int fd_;
|
std::mutex mtx_;
|
std::atomic<int32_t> count_;
|
};
|
|
union semun {
|
int val; /* Value for SETVAL */
|
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
|
unsigned short *array; /* Array for GETALL, SETALL */
|
struct seminfo *__buf; /* Buffer for IPC_INFO
|
(Linux-specific) */
|
};
|
|
class SemMutex
|
{
|
public:
|
SemMutex(key_t key) :
|
key_(key), sem_id_(semget(key, 1, 0666))
|
{
|
if (sem_id_ == -1) { throw "error create semaphore."; }
|
}
|
~SemMutex() {}
|
|
bool try_lock()
|
{
|
sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
|
return semop(sem_id_, &op, 1) == 0;
|
}
|
|
void lock()
|
{
|
sembuf op = {0, -1, SEM_UNDO};
|
semop(sem_id_, &op, 1);
|
}
|
|
void unlock()
|
{
|
sembuf op = {0, 1, SEM_UNDO};
|
semop(sem_id_, &op, 1);
|
}
|
|
private:
|
key_t key_;
|
int sem_id_;
|
};
|
|
template <class Lock>
|
class Guard
|
{
|
public:
|
Guard(Lock &l) :
|
l_(l) { l_.lock(); }
|
~Guard() { l_.unlock(); }
|
|
private:
|
Guard(const Guard &);
|
Guard(Guard &&);
|
Lock &l_;
|
};
|
|
template <unsigned PowerSize = 4, class Int = int64_t>
|
class AtomicQueue
|
{
|
public:
|
typedef uint32_t size_type;
|
typedef Int Data;
|
typedef std::atomic<Data> AData;
|
static_assert(sizeof(Data) == sizeof(AData));
|
enum {
|
power = PowerSize,
|
capacity = (1 << power),
|
mask = capacity - 1,
|
};
|
|
AtomicQueue() { memset(this, 0, sizeof(*this)); }
|
size_type head() const { return head_.load(); }
|
size_type tail() const { return tail_.load(); }
|
bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
|
bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
|
bool push(const Data d, bool try_more = false)
|
{
|
bool r = false;
|
size_type i = 0;
|
do {
|
auto pos = tail();
|
if (tail_.compare_exchange_strong(pos, Next(pos))) {
|
auto cur = buf[pos].load();
|
r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
|
}
|
} while (try_more && !r && ++i < capacity);
|
return r;
|
}
|
bool pop(Data &d, bool try_more = false)
|
{
|
bool r = false;
|
Data cur;
|
size_type i = 0;
|
do {
|
auto pos = head();
|
if (head_.compare_exchange_strong(pos, Next(pos))) {
|
cur = buf[pos].load();
|
r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
|
}
|
} while (try_more && !r && ++i < capacity);
|
if (r) { d = Dec(cur); }
|
return r;
|
}
|
|
private:
|
static_assert(std::is_integral<Data>::value, "Data must be integral type!");
|
static_assert(std::is_signed<Data>::value, "Data must be signed type!");
|
static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
|
|
static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
|
static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
|
static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
|
static size_type Next(const size_type index) { return (index + 1) & mask; }
|
|
std::atomic<size_type> head_;
|
std::atomic<size_type> tail_;
|
AData buf[capacity];
|
};
|
|
template <class Int>
|
class AtomicQueue<0, Int>
|
{
|
typedef Int Data;
|
typedef std::atomic<Data> AData;
|
static_assert(sizeof(Data) == sizeof(AData));
|
|
public:
|
AtomicQueue() { memset(this, 0, sizeof(*this)); }
|
bool push(const Data d, bool try_more = false)
|
{
|
auto cur = buf.load();
|
return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
|
}
|
bool pop(Data &d, bool try_more = false)
|
{
|
Data cur = buf.load();
|
bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
|
if (r) { d = Dec(cur); }
|
return r;
|
}
|
uint32_t head() const { return 0; }
|
uint32_t tail() const { return 0; }
|
|
private:
|
static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
|
static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
|
static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
|
AData buf;
|
};
|
|
class AtomicReqRep
|
{
|
public:
|
typedef int64_t Data;
|
typedef std::function<Data(const Data)> Handler;
|
bool ClientRequest(const Data request, Data &reply)
|
{
|
auto end_time = now() + 3s;
|
do {
|
Data cur = data_.load();
|
if (GetState(cur) == eStateFree &&
|
DataCas(cur, Encode(request, eStateRequest))) {
|
do {
|
yield();
|
cur = data_.load();
|
if (GetState(cur) == eStateReply) {
|
DataCas(cur, Encode(0, eStateFree));
|
reply = Decode(cur);
|
return true;
|
}
|
} while (now() < end_time);
|
}
|
yield();
|
} while (now() < end_time);
|
return false;
|
}
|
|
bool ServerProcess(Handler onReq)
|
{
|
Data cur = data_.load();
|
switch (GetState(cur)) {
|
case eStateRequest:
|
if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
|
timestamp_ = now();
|
return true;
|
}
|
break;
|
case eStateReply:
|
if (timestamp_.load() + 3s < now()) {
|
DataCas(cur, Encode(0, eStateFree));
|
}
|
break;
|
case eStateFree:
|
default: break;
|
}
|
return false;
|
}
|
|
private:
|
enum State {
|
eStateFree,
|
eStateRequest,
|
eStateReply
|
};
|
static int GetState(Data d) { return d & MaskBits(3); }
|
static Data Encode(Data d, State st) { return (d << 3) | st; }
|
static Data Decode(Data d) { return d >> 3; }
|
static void yield() { QuickSleep(); }
|
typedef steady_clock::duration Duration;
|
Duration now() { return steady_clock::now().time_since_epoch(); }
|
|
bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
|
std::atomic<Data> data_;
|
std::atomic<Duration> timestamp_;
|
};
|
|
} // namespace robust
|
#endif // end of include guard: ROBUST_Q31RCWYU
|