From a6f67b4249525089fb97eb9418c7014f66c2a000 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 28 四月 2021 19:29:17 +0800
Subject: [PATCH] use new robust mutex, circurar; rm timeout mutex.
---
utest/speed_test.cpp | 4
src/robust.h | 255 +++++++++++++++
src/shm.h | 88 +++--
utest/api_test.cpp | 163 +--------
utest/simple_tests.cpp | 4
utest/util.h | 1
src/shm_queue.h | 131 +++-----
utest/robust_test.cpp | 115 +++++++
utest/utest.cpp | 4
src/shm.cpp | 36 --
src/robust.cpp | 111 ++++++
11 files changed, 609 insertions(+), 303 deletions(-)
diff --git a/src/robust.cpp b/src/robust.cpp
new file mode 100644
index 0000000..38d5d28
--- /dev/null
+++ b/src/robust.cpp
@@ -0,0 +1,111 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: robust.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�27鏃� 10鏃�04鍒�19绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "robust.h"
+#include <chrono>
+#include <thread>
+
+namespace robust
+{
+
+namespace
+{
+static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
+static_assert(sizeof(RobustReqRep) == 24);
+static_assert(sizeof(CasMutex<false>) == 8);
+static_assert(sizeof(CircularBuffer<int>) == 48);
+
+auto Now() { return steady_clock::now().time_since_epoch(); }
+const steady_clock::duration kIoTimeout = 10ms;
+const steady_clock::duration kIoExpire = 100ms;
+
+void Yield() { std::this_thread::sleep_for(10us); }
+} // namespace
+
+void QuickSleep()
+{
+ Yield();
+}
+bool RobustReqRep::StateCas(State exp, State val)
+{
+ bool r = state_.compare_exchange_strong(exp, val);
+ return r ? (timestamp_.store(Now()), true) : false;
+}
+
+int RobustReqRep::ClientReadReply(Msg &reply)
+{
+ auto end_time = Now() + kIoTimeout;
+ int done = false;
+ do {
+ if (StateCas(eServerWriteEnd, eClientReadBegin)) {
+ Read(reply);
+ done = StateCas(eClientReadBegin, eClientReadEnd);
+ if (done) { break; }
+ }
+ Yield();
+ } while (Now() < end_time);
+ return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ClientWriteRequest(const Msg &request)
+{
+ if (request.size() > capacity_) {
+ return eSizeError;
+ }
+ auto end_time = Now() + kIoTimeout;
+ bool done = false;
+ do {
+ if (StateCas(eStateReady, eClientWriteBegin)) {
+ Write(request);
+ done = StateCas(eClientWriteBegin, eClientWriteEnd);
+ if (done) { break; }
+ }
+ Yield();
+ } while (Now() < end_time);
+ return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ServerReadRequest(Msg &request)
+{
+ bool done = false;
+ if (StateCas(eClientWriteEnd, eServerReadBegin)) {
+ Read(request);
+ done = StateCas(eServerReadBegin, eServerReadEnd);
+ } else {
+ auto old = state_.load();
+ if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
+ StateCas(old, eStateReady);
+ }
+ }
+ return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ServerWriteReply(const Msg &reply)
+{
+ if (reply.size() > capacity_) {
+ return eSizeError;
+ }
+ // no need to loop write, either success or timeout.
+ bool done = false;
+ if (StateCas(eServerReadEnd, eServerWriteBegin)) {
+ Write(reply);
+ done = StateCas(eServerWriteBegin, eServerWriteEnd);
+ }
+ return done ? eSuccess : eTimeout;
+}
+
+} // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
new file mode 100644
index 0000000..19e9bda
--- /dev/null
+++ b/src/robust.h
@@ -0,0 +1,255 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: robust.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�27鏃� 10鏃�04鍒�29绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#ifndef ROBUST_Q31RCWYU
+#define ROBUST_Q31RCWYU
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <string.h>
+#include <string>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace robust
+{
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+void QuickSleep();
+
+class RobustReqRep
+{
+ typedef uint32_t State;
+ typedef std::string Msg;
+ typedef std::chrono::steady_clock::duration Duration;
+
+public:
+ enum ErrorCode {
+ eSuccess = 0,
+ eTimeout = EAGAIN,
+ eSizeError = EINVAL,
+ };
+
+ explicit RobustReqRep(const uint32_t max_len) :
+ capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
+
+ void PutReady() { state_.store(eStateReady); }
+ bool Ready() const { return state_.load() == eStateReady; }
+ uint32_t capacity() const { return capacity_; }
+
+ int ClientRequest(const Msg &request, Msg &reply)
+ {
+ int r = ClientWriteRequest(request);
+ if (r == eSuccess) {
+ r = ClientReadReply(reply);
+ }
+ return r;
+ }
+ int ClientReadReply(Msg &reply);
+ int ClientWriteRequest(const Msg &request);
+ int ServerReadRequest(Msg &request);
+ int ServerWriteReply(const Msg &reply);
+
+private:
+ RobustReqRep(const RobustReqRep &);
+ RobustReqRep(RobustReqRep &&);
+ RobustReqRep &operator=(const RobustReqRep &) = delete;
+ RobustReqRep &operator=(RobustReqRep &&) = delete;
+
+ enum {
+ eStateInit = 0,
+ eStateReady = 0x19833891,
+ eClientWriteBegin,
+ eClientWriteEnd,
+ eServerReadBegin,
+ eServerReadEnd,
+ eServerWriteBegin,
+ eServerWriteEnd,
+ eClientReadBegin,
+ eClientReadEnd = eStateReady,
+ };
+ bool StateCas(State exp, State val);
+ void Write(const Msg &msg)
+ {
+ size_.store(msg.size());
+ memcpy(buf, msg.data(), msg.size());
+ }
+ void Read(Msg &msg) { msg.assign(buf, size_.load()); }
+
+ const uint32_t capacity_;
+ std::atomic<State> state_;
+ static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
+ std::atomic<Duration> timestamp_;
+ std::atomic<int32_t> size_;
+ char buf[4];
+};
+
+template <bool isRobust = false>
+class CasMutex
+{
+ static pid_t pid()
+ {
+ static pid_t val = getpid();
+ return val;
+ }
+ static bool Killed(pid_t pid)
+ {
+ char buf[64] = {0};
+ snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
+ return access(buf, F_OK) != 0;
+ }
+
+public:
+ CasMutex() :
+ meta_(0) {}
+ int try_lock()
+ {
+ const auto t = steady_clock::now().time_since_epoch().count();
+ auto old = meta_.load();
+ int r = 0;
+ if (!Locked(old)) {
+ r = MetaCas(old, Meta(1, pid()));
+ } else if (isRobust && Killed(Pid(old))) {
+ r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
+ if (r) {
+ printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
+ }
+ }
+ return r;
+ }
+ int lock()
+ {
+ int r = 0;
+ do {
+ r = try_lock();
+ } while (r == 0);
+ return r;
+ }
+ void unlock()
+ {
+ auto old = meta_.load();
+ if (Locked(old) && Pid(old) == pid()) {
+ MetaCas(old, Meta(0, pid()));
+ }
+ }
+
+private:
+ std::atomic<uint64_t> meta_;
+ bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
+ pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
+ uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
+ bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
+ static_assert(sizeof(pid_t) < sizeof(uint64_t));
+};
+
+template <class Lock>
+class Guard
+{
+public:
+ Guard(Lock &l) :
+ l_(l) { l_.lock(); }
+ ~Guard() { l_.unlock(); }
+
+private:
+ Guard(const Guard &);
+ Guard(Guard &&);
+ Lock &l_;
+};
+
+template <class D, class Alloc = std::allocator<D>>
+class CircularBuffer
+{
+ typedef uint32_t size_type;
+ typedef uint32_t count_type;
+ typedef uint64_t meta_type;
+ static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
+ static count_type Count(meta_type meta) { return meta >> 32; }
+ static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
+
+public:
+ typedef D Data;
+
+ CircularBuffer(const size_type cap) :
+ CircularBuffer(cap, Alloc()) {}
+ CircularBuffer(const size_type cap, Alloc const &al) :
+ state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
+ {
+ if (!buf) {
+ throw("error allocate buffer: out of mem!");
+ }
+ }
+ ~CircularBuffer()
+ {
+ al_.deallocate(buf, capacity_);
+ }
+ size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
+ bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
+ bool empty() const { return head() == tail(); }
+ bool push_back(Data d)
+ {
+ Guard<MutexT> guard(mutex_);
+ if (!full()) {
+ auto old = mtail();
+ buf[Pos(old)] = d;
+ return mtail_.compare_exchange_strong(old, next(old));
+ } else {
+ return false;
+ }
+ }
+ bool pop_front(Data &d)
+ {
+ Guard<MutexT> guard(mutex_);
+ if (!empty()) {
+ auto old = mhead();
+ d = buf[Pos(old)];
+ return mhead_.compare_exchange_strong(old, next(old));
+ } else {
+ return false;
+ }
+ }
+ bool Ready() const { return state_.load() == eStateReady; }
+ void PutReady() { state_.store(eStateReady); }
+
+private:
+ CircularBuffer(const CircularBuffer &);
+ CircularBuffer(CircularBuffer &&);
+ CircularBuffer &operator=(const CircularBuffer &) = delete;
+ CircularBuffer &operator=(CircularBuffer &&) = delete;
+ typedef CasMutex<true> MutexT;
+ // static_assert(sizeof(MutexT) == 16);
+ meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
+ size_type head() const { return Pos(mhead()); }
+ size_type tail() const { return Pos(mtail()); }
+ meta_type mhead() const { return mhead_.load(); }
+ meta_type mtail() const { return mtail_.load(); }
+ // data
+ enum { eStateReady = 0x19833891 };
+ std::atomic<uint32_t> state_;
+ const size_type capacity_;
+ MutexT mutex_;
+ std::atomic<meta_type> mhead_;
+ std::atomic<meta_type> mtail_;
+ Alloc al_;
+ typename Alloc::pointer buf = nullptr;
+};
+
+} // namespace robust
+#endif // end of include guard: ROBUST_Q31RCWYU
diff --git a/src/shm.cpp b/src/shm.cpp
index 6d7dccd..1658900 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -21,42 +21,6 @@
namespace bhome_shm
{
-bool MutexWithTimeLimit::try_lock()
-{
- if (mutex_.try_lock()) {
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
- } else {
- last_lock_time_.store(Now());
- return true;
- }
- } else {
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
- } else {
- return false;
- }
- }
-}
-void MutexWithTimeLimit::lock()
-{
- while (!try_lock()) {
- std::this_thread::yield();
- }
-}
-void MutexWithTimeLimit::unlock()
-{
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- } else {
- if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
- mutex_.unlock();
- }
- }
-}
-
SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
name_(name)
diff --git a/src/shm.h b/src/shm.h
index 7773ceb..17352fe 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,13 +19,11 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
+#include "robust.h"
#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
-#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
-#include <chrono>
#include <thread>
namespace bhome_shm
@@ -35,53 +33,65 @@
typedef managed_shared_memory mshm_t;
-class CasMutex
-{
- std::atomic<bool> flag_;
- bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); }
-
-public:
- CasMutex() :
- flag_(false) {}
- bool try_lock() { return cas(false, true); }
- void lock()
- {
- while (!try_lock()) { std::this_thread::yield(); }
- }
- void unlock() { cas(true, false); }
-};
-
-class MutexWithTimeLimit
+class MutexWithPidCheck
{
typedef boost::interprocess::interprocess_mutex MutexT;
- // typedef CasMutex MutexT;
- typedef std::chrono::steady_clock Clock;
- typedef Clock::duration Duration;
- static Duration Now() { return Clock::now().time_since_epoch(); }
-
- const Duration limit_;
- std::atomic<Duration> last_lock_time_;
+ static pid_t pid()
+ {
+ static pid_t val = getpid();
+ return val;
+ }
+ static bool Killed(pid_t pid)
+ {
+ char buf[64] = {0};
+ snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
+ return access(buf, F_OK) != 0;
+ }
+ bool PidCas(pid_t exp, pid_t val) { return pid_.compare_exchange_strong(exp, val); }
MutexT mutex_;
+ std::atomic<pid_t> pid_;
public:
typedef MutexT::internal_mutex_type internal_mutex_type;
const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
+ MutexWithPidCheck() :
+ pid_(0) {}
+ bool try_lock()
+ {
+ bool r = false;
+ if (mutex_.try_lock()) {
+ auto old = pid_.load();
+ r = PidCas(old, pid());
+ } else {
+ auto old = pid_.load();
+ if (Killed(old)) {
+ r = PidCas(old, pid());
+ if (r) {
+ printf("PidCheck captured pid %d -> %d\n", old, pid());
+ }
+ }
+ }
+ return r;
+ }
- explicit MutexWithTimeLimit(Duration limit) :
- limit_(limit) {}
- MutexWithTimeLimit() :
- MutexWithTimeLimit(std::chrono::seconds(1)) {}
- ~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); }
- bool try_lock();
- void lock();
- void unlock();
+ void lock()
+ {
+ while (!try_lock()) {
+ std::this_thread::yield();
+ }
+ }
+ void unlock()
+ {
+ auto old = pid_.load();
+ if (old == pid()) {
+ mutex_.unlock();
+ }
+ }
};
-// typedef boost::interprocess::interprocess_mutex Mutex;
-typedef MutexWithTimeLimit Mutex;
-typedef scoped_lock<Mutex> Guard;
-typedef interprocess_condition Cond;
+typedef robust::CasMutex<true> Mutex;
+typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
{
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 5dbda96..11f9893 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -21,105 +21,70 @@
#include "shm.h"
#include <atomic>
-#include <boost/circular_buffer.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
namespace bhome_shm
{
template <class D>
-using Circular = boost::circular_buffer<D, Allocator<D>>;
+using Circular = robust::CircularBuffer<D, Allocator<D>>;
+
template <class D>
-class SharedQueue : private Circular<D>
+class SharedQueue
{
- typedef Circular<D> Super;
- Mutex mutex_;
- Cond cond_read_;
- Cond cond_write_;
- Mutex &mutex() { return mutex_; }
-
- static boost::posix_time::ptime MSFromNow(const int ms)
- {
- using namespace boost::posix_time;
- ptime cur = boost::posix_time::microsec_clock::universal_time();
- return cur + millisec(ms);
- }
-
- auto TimedReadPred(const int timeout_ms)
- {
- auto endtime = MSFromNow(timeout_ms);
- return [this, endtime](Guard &lock) {
- return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); }));
- };
- }
- auto TryReadPred()
- {
- return [this](Guard &lock) { return !this->empty(); };
- }
-
- template <class Pred>
- bool ReadOnCond(D &buf, Pred const &pred)
- {
- auto Read = [&]() {
- Guard lock(this->mutex());
- if (pred(lock)) {
- using std::swap;
- swap(buf, Super::front());
- Super::pop_front();
- return true;
- } else {
- return false;
- }
- };
- return Read() ? (this->cond_write_.notify_one(), true) : false;
- }
-
- template <class Iter, class Pred, class OnWrite>
- int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite)
- {
- if (begin == end) { return 0; }
-
- int n = 0;
- Guard lock(mutex());
- while (pred(lock)) {
- onWrite(*begin);
- Super::push_back(*begin);
- ++n;
- cond_read_.notify_one();
- if (++begin == end) {
- break;
- }
- }
- return n;
- }
-
public:
SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
- Super(len, alloc) {}
-
- template <class Iter, class OnWrite>
- int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
- {
- auto tryWritePred = [this](Guard &lock) { return !this->full(); };
- return WriteAllOnCond(begin, end, tryWritePred, onWrite);
- }
+ queue_(len, alloc) {}
template <class OnWrite>
- bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); }
-
- bool TryWrite(const D &buf)
+ bool TryWrite(const D &d, const OnWrite &onWrite)
{
- return TryWrite(buf, [](const D &buf) {});
+ Guard lock(mutex());
+ if (!queue_.full()) {
+ onWrite(d);
+ queue_.push_back(d);
+ return true;
+ } else {
+ return false;
+ }
}
- template <class OnData>
- int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); }
- template <class OnData>
- int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); }
+ bool TryWrite(const D &d)
+ {
+ Guard lock(mutex());
+ return !queue_.full() ? (queue_.push_back(d), true) : false;
+ }
- bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
- bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
+ bool Read(D &d, const int timeout_ms)
+ {
+ using namespace std::chrono;
+ auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+ do {
+ if (TryRead(d)) {
+ return true;
+ } else {
+ robust::QuickSleep();
+ }
+ } while (steady_clock::now() < end_time);
+ return false;
+ }
+ bool TryRead(D &d)
+ {
+ Guard lock(mutex());
+ if (!queue_.empty()) {
+ queue_.pop_front(d);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+private:
+ typedef Circular<D> Queue;
+ Queue queue_;
+ Mutex mutex_;
+ Mutex &mutex() { return mutex_; }
};
} // namespace bhome_shm
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 6682aaf..6577b51 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,6 +16,7 @@
* =====================================================================================
*/
#include "bh_api.h"
+#include "robust.h"
#include "util.h"
#include <atomic>
#include <boost/lockfree/queue.hpp>
@@ -96,138 +97,22 @@
// printf("client Recv reply : %s\n", reply.data().c_str());
}
-class TLMutex
-{
- typedef boost::interprocess::interprocess_mutex MutexT;
- // typedef CasMutex MutexT;
- // typedef std::mutex MutexT;
- typedef std::chrono::steady_clock Clock;
- typedef Clock::duration Duration;
- static Duration Now() { return Clock::now().time_since_epoch(); }
-
- const Duration limit_;
- std::atomic<Duration> last_lock_time_;
- MutexT mutex_;
- bool Expired(const Duration diff) { return diff > limit_; }
-
-public:
- struct Status {
- int64_t nlock_ = 0;
- int64_t nupdate_time_fail = 0;
- int64_t nfail = 0;
- int64_t nexcept = 0;
- };
- Status st_;
-
- explicit TLMutex(Duration limit) :
- limit_(limit) {}
- TLMutex() :
- TLMutex(std::chrono::seconds(1)) {}
- ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
- bool try_lock()
- {
- if (mutex_.try_lock()) {
- auto old_time = last_lock_time_.load();
- auto cur = Now();
- if (Expired(cur - old_time)) {
- return last_lock_time_.compare_exchange_strong(old_time, cur);
- } else {
- last_lock_time_.store(Now());
- return true;
- }
- } else {
- auto old_time = last_lock_time_.load();
- auto cur = Now();
- if (Expired(cur - old_time)) {
- return last_lock_time_.compare_exchange_strong(old_time, cur);
- } else {
- return false;
- }
- }
- }
- void lock()
- {
- int n = 0;
- while (!try_lock()) {
- n++;
- std::this_thread::yield();
- }
- st_.nlock_ += n;
- }
- void unlock()
- {
- auto old_time = last_lock_time_.load();
- auto cur = Now();
- if (!Expired(cur - old_time)) {
- if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
- mutex_.unlock();
- }
- }
- }
-};
-
-//robust attr does NOT work, maybe os does not support it.
-class RobustMutex
-{
-public:
- RobustMutex()
- {
- pthread_mutexattr_t mutex_attr;
- auto attr = [&]() { return &mutex_attr; };
- int r = pthread_mutexattr_init(attr());
- r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
- r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
- r |= pthread_mutex_init(mtx(), attr());
- int rob = 0;
- pthread_mutexattr_getrobust_np(attr(), &rob);
- int shared = 0;
- pthread_mutexattr_getpshared(attr(), &shared);
- printf("robust : %d, shared : %d\n", rob, shared);
- r |= pthread_mutexattr_destroy(attr());
- if (r) {
- throw("init mutex error.");
- }
- }
- ~RobustMutex()
- {
- pthread_mutex_destroy(mtx());
- }
-
-public:
- void lock() { Lock(); }
- bool try_lock()
- {
- int r = TryLock();
- printf("TryLock ret: %d\n", r);
- return r == 0;
- }
-
- void unlock() { Unlock(); }
-
- // private:
- int TryLock() { return pthread_mutex_trylock(mtx()); }
- int Lock() { return pthread_mutex_lock(mtx()); }
- int Unlock() { return pthread_mutex_unlock(mtx()); }
-
-private:
- pthread_mutex_t *mtx() { return &mutex_; }
- pthread_mutex_t mutex_;
-};
-
-class LockFreeQueue
-{
- typedef int64_t Data;
- typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
- void push_back(Data d) { queue_.push(d); }
-
-private:
- LFQueue queue_;
-};
-
} // namespace
+#include <chrono>
+using namespace std::chrono;
+// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(MutexTest)
{
+ // typedef robust::CasMutex<true> RobustMutex;
+ typedef MutexWithPidCheck RobustMutex;
+
+ for (int i = 0; i < 20; ++i) {
+ int size = i;
+ int left = size & 7;
+ int rsize = size + ((8 - left) & 7);
+ printf("size: %3d, rsize: %3d\n", size, rsize);
+ }
SharedMemory &shm = TestShm();
// shm.Remove();
// return;
@@ -235,7 +120,7 @@
const std::string mtx_name("test_mutex");
const std::string int_name("test_int");
- auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
+ auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
auto pi = shm.FindOrCreate<int>(int_name, 100);
std::mutex m;
@@ -248,11 +133,11 @@
{
boost::timer::auto_cpu_timer timer;
- printf("test time: ");
- TLMutex mutex;
- // CasMutex mutex;
+ const int ntimes = 1000 * 1000;
+ printf("test lock/unlock %d times: ", ntimes);
+ RobustMutex mutex;
auto Lock = [&]() {
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < ntimes; ++i) {
mutex.lock();
mutex.unlock();
}
@@ -260,11 +145,6 @@
std::thread t1(Lock), t2(Lock);
t1.join();
t2.join();
- printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
- mutex.st_.nlock_,
- mutex.st_.nupdate_time_fail,
- mutex.st_.nfail,
- mutex.st_.nexcept);
}
auto MSFromNow = [](const int ms) {
@@ -487,10 +367,13 @@
threads.Launch(hb, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const uint64_t nreq = 1000 * 10;
+ const uint64_t nreq = 1000 * 100;
for (int i = 0; i < ncli; ++i) {
threads.Launch(asyncRequest, nreq);
}
+ // for (int i = 0; i < 100; ++i) {
+ // SyncRequest(0);
+ // }
int same = 0;
int64_t last = 0;
@@ -509,4 +392,6 @@
threads.WaitAll();
auto &st = Status();
printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
+ BHCleanup();
+ printf("after cleanup\n");
}
\ No newline at end of file
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
new file mode 100644
index 0000000..0d54b46
--- /dev/null
+++ b/utest/robust_test.cpp
@@ -0,0 +1,115 @@
+#include "robust.h"
+#include "util.h"
+
+using namespace robust;
+
+typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
+Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
+{
+ int cap = nelem + 1;
+ typedef uint64_t Data;
+ auto size = sizeof(Rcb) + sizeof(Data) * cap;
+ void *p = shm.Alloc(size);
+ if (p) {
+ return new (p) Rcb(cap, shm.get_segment_manager());
+ }
+ return nullptr;
+}
+Rcb *GetRCB(SharedMemory &shm, const int nelem)
+{
+ void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
+ if (pStore) {
+ if (!*pStore) {
+ *pStore = GetRCBImpl(shm, nelem);
+ }
+ return (Rcb *) *pStore;
+ }
+ return nullptr;
+}
+
+void MySleep()
+{
+ std::this_thread::sleep_for(2us);
+}
+
+BOOST_AUTO_TEST_CASE(RobustTest)
+{
+ SharedMemory &shm = TestShm();
+ shm.Remove();
+ pid_t pid = getpid();
+ printf("pid : %d\n", pid);
+ auto Access = [](pid_t pid) {
+ char buf[100] = {0};
+ sprintf(buf, "/proc/%d/stat", pid);
+ int r = access(buf, F_OK);
+ printf("access %d\n", r);
+ };
+ Access(pid);
+ Access(pid + 1);
+ // Sleep(10s);
+ // return;
+
+ int nelement = 640;
+ auto rcb = GetRCB(shm, nelement);
+ BOOST_CHECK(rcb != nullptr);
+ BOOST_CHECK(rcb->empty());
+ BOOST_CHECK(rcb->push_back(1));
+ BOOST_CHECK(rcb->size() == 1);
+ int64_t d;
+ BOOST_CHECK(rcb->pop_front(d));
+ BOOST_CHECK(rcb->empty());
+
+ const uint64_t nmsg = 1000 * 1000 * 1;
+ uint64_t correct_total = nmsg * (nmsg - 1) / 2;
+ std::atomic<uint64_t> total(0);
+ std::atomic<uint64_t> nwrite(0);
+ std::atomic<uint64_t> writedone(0);
+ auto Writer = [&]() {
+ uint64_t n = 0;
+ while ((n = nwrite++) < nmsg) {
+ while (!rcb->push_back(n)) {
+ // MySleep();
+ }
+ ++writedone;
+ }
+ };
+ std::atomic<uint64_t> nread(0);
+ auto Reader = [&]() {
+ while (nread.load() < nmsg) {
+ int64_t d;
+ if (rcb->pop_front(d)) {
+ ++nread;
+ total += d;
+ } else {
+ MySleep();
+ }
+ }
+ };
+
+ auto status = [&]() {
+ auto next = steady_clock::now();
+ uint32_t lw = 0;
+ uint32_t lr = 0;
+ do {
+ std::this_thread::sleep_until(next);
+ next += 1s;
+ auto w = writedone.load();
+ auto r = nread.load();
+ printf("write: %6ld, spd: %6ld, read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
+ lw = w;
+ lr = r;
+ } while (nread.load() < nmsg);
+ };
+
+ ThreadManager threads;
+ boost::timer::auto_cpu_timer timer;
+ printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
+ threads.Launch(status);
+ for (int i = 0; i < 10; ++i) {
+ threads.Launch(Reader);
+ threads.Launch(Writer);
+ }
+ threads.WaitAll();
+ printf("total: %ld, expected: %ld\n", total.load(), correct_total);
+ BOOST_CHECK_EQUAL(total.load(), correct_total);
+}
\ No newline at end of file
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index 33c78f5..e14a1cd 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -107,7 +107,7 @@
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
ShmMsgQueue q(shm, 64);
for (int i = 0; i < 2; ++i) {
int ms = i * 100;
@@ -123,7 +123,7 @@
{
SharedMemory &shm = TestShm();
typedef MsgI Msg;
- Msg::BindShm(shm);
+ GlobalInit(shm);
Msg m0(1000);
BOOST_CHECK(m0.valid());
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 302d4bd..bd455ec 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,7 +24,7 @@
{
const int mem_size = 1024 * 1024 * 50;
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
MQId id = ShmMsgQueue::NewId();
const int timeout = 1000;
@@ -122,7 +122,7 @@
const std::string server_proc_id = "server_proc";
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
diff --git a/utest/utest.cpp b/utest/utest.cpp
index d058471..d8dae45 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -90,7 +90,7 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
@@ -201,7 +201,7 @@
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
SharedMemory &shm = TestShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
diff --git a/utest/util.h b/utest/util.h
index 61e5b11..a4cbbaa 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -36,6 +36,7 @@
using namespace boost::posix_time;
using namespace std::chrono_literals;
+using namespace std::chrono;
template <class D>
inline void Sleep(D d, bool print = true)
--
Gitblit v1.8.0