liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/robust.h
@@ -21,75 +21,16 @@
#include "bh_util.h"
#include "log.h"
#include <string.h>
#include <atomic>
#include <chrono>
#include <unistd.h>
namespace robust
{
/*
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
public:
   typedef uint32_t size_type;
   typedef Int Data;
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
   enum {
      power = PowerSize,
      capacity = (1 << power),
      mask = capacity - 1,
   };
   AtomicQueue() { memset(this, 0, sizeof(*this)); }
   size_type head() const { return head_.load(); }
   size_type tail() const { return tail_.load(); }
   bool push(const Data d, bool try_more = false)
   {
      bool r = false;
      size_type i = 0;
      do {
         auto pos = tail();
         if (tail_.compare_exchange_strong(pos, Next(pos))) {
            auto cur = buf[pos].load();
            r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
         }
      } while (try_more && !r && ++i < capacity);
      return r;
   }
   bool pop(Data &d, bool try_more = false)
   {
      bool r = false;
      Data cur;
      size_type i = 0;
      do {
         auto pos = head();
         if (head_.compare_exchange_strong(pos, Next(pos))) {
            cur = buf[pos].load();
            r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
         }
      } while (try_more && !r && ++i < capacity);
      if (r) { d = Dec(cur); }
      return r;
   }
private:
   static_assert(std::is_integral<Data>::value, "Data must be integral type!");
   static_assert(std::is_signed<Data>::value, "Data must be signed type!");
   static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
   static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
   static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   static size_type Next(const size_type index) { return (index + 1) & mask; }
   std::atomic<size_type> head_;
   std::atomic<size_type> tail_;
   AData buf[capacity];
};
//*/
// atomic queue, length is 1.
// lowest bit is used for data flag, 63 bit for data.
class AtomicQ63
{
public:
@@ -114,11 +55,12 @@
   static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
   typedef std::atomic<Data> AData;
   static_assert(sizeof(Data) == sizeof(AData));
   // static_assert(sizeof(Data) == sizeof(AData));
   AData buf;
};
// atomic request-reply process, one cycle a time.
class AtomicReqRep
{
public:
@@ -126,6 +68,8 @@
   typedef std::function<Data(const Data)> Handler;
   bool ClientRequest(const Data request, Data &reply);
   bool ServerProcess(Handler onReq);
   AtomicReqRep() :
       data_(0), timestamp_(now()) {}
private:
   enum State {
@@ -138,7 +82,7 @@
   static Data Decode(Data d) { return d >> 3; }
   typedef std::chrono::steady_clock steady_clock;
   typedef steady_clock::duration Duration;
   Duration now() { return steady_clock::now().time_since_epoch(); }
   static Duration now() { return steady_clock::now().time_since_epoch(); }
   bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
   std::atomic<Data> data_;