From 28f06bc49a4d8d69f1ea2f767863b7921d12f155 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期六, 08 五月 2021 18:30:48 +0800 Subject: [PATCH] add robust FMutex, works fine; use boost circular. --- src/robust.h | 221 ++++++++++-------------- src/shm.h | 7 src/shm_msg_queue.h | 19 + src/shm_queue.h | 26 ++ utest/robust_test.cpp | 128 +++++++++++--- src/robust.cpp | 90 ++------- src/shm_msg_queue.cpp | 29 ++ 7 files changed, 273 insertions(+), 247 deletions(-) diff --git a/src/robust.cpp b/src/robust.cpp index 006ea5f..08d2073 100644 --- a/src/robust.cpp +++ b/src/robust.cpp @@ -25,87 +25,35 @@ namespace { static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); -static_assert(sizeof(RobustReqRep) == 24); -static_assert(sizeof(Mutex) == 8); -static_assert(sizeof(CircularBuffer<int>) == 48); auto Now() { return steady_clock::now().time_since_epoch(); } -const steady_clock::duration kIoTimeout = 10ms; -const steady_clock::duration kIoExpire = 100ms; - void Yield() { std::this_thread::sleep_for(10us); } + } // namespace -void QuickSleep() -{ - Yield(); -} -bool RobustReqRep::StateCas(State exp, State val) -{ - bool r = state_.compare_exchange_strong(exp, val); - return r ? (timestamp_.store(Now()), true) : false; -} +void QuickSleep() { Yield(); } -int RobustReqRep::ClientReadReply(Msg &reply) +bool FMutex::try_lock() { - auto end_time = Now() + kIoTimeout; - int done = false; - do { - if (StateCas(eServerWriteEnd, eClientReadBegin)) { - Read(reply); - done = StateCas(eClientReadBegin, eClientReadEnd); - if (done) { break; } - } - Yield(); - } while (Now() < end_time); - return done ? eSuccess : eTimeout; -} - -int RobustReqRep::ClientWriteRequest(const Msg &request) -{ - if (request.size() > capacity_) { - return eSizeError; - } - auto end_time = Now() + kIoTimeout; - bool done = false; - do { - if (StateCas(eStateReady, eClientWriteBegin)) { - Write(request); - done = StateCas(eClientWriteBegin, eClientWriteEnd); - if (done) { break; } - } - Yield(); - } while (Now() < end_time); - return done ? eSuccess : eTimeout; -} - -int RobustReqRep::ServerReadRequest(Msg &request) -{ - bool done = false; - if (StateCas(eClientWriteEnd, eServerReadBegin)) { - Read(request); - done = StateCas(eServerReadBegin, eServerReadEnd); - } else { - auto old = state_.load(); - if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) { - StateCas(old, eStateReady); + if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { + if (mtx_.try_lock()) { + return true; + } else { + flock(fd_, LOCK_UN); } } - return done ? eSuccess : eTimeout; + return false; } - -int RobustReqRep::ServerWriteReply(const Msg &reply) +void FMutex::lock() { - if (reply.size() > capacity_) { - return eSizeError; - } - // no need to loop write, either success or timeout. - bool done = false; - if (StateCas(eServerReadEnd, eServerWriteBegin)) { - Write(reply); - done = StateCas(eServerWriteBegin, eServerWriteEnd); - } - return done ? eSuccess : eTimeout; + //Note: the lock order affects performance a lot, + // locking fd_ first is about 100 times faster than locking mtx_ first. + flock(fd_, LOCK_EX); + mtx_.lock(); } - +void FMutex::unlock() +{ + mtx_.unlock(); + flock(fd_, LOCK_UN); +} } // namespace robust \ No newline at end of file diff --git a/src/robust.h b/src/robust.h index 3334bc0..d2d94e9 100644 --- a/src/robust.h +++ b/src/robust.h @@ -23,8 +23,12 @@ #include <atomic> #include <chrono> #include <memory> -#include <string.h> +#include <mutex> #include <string> +#include <sys/file.h> +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/stat.h> #include <sys/types.h> #include <unistd.h> @@ -37,143 +41,21 @@ void QuickSleep(); -class RobustReqRep -{ - typedef uint32_t State; - typedef std::string Msg; - typedef std::chrono::steady_clock::duration Duration; - -public: - enum ErrorCode { - eSuccess = 0, - eTimeout = EAGAIN, - eSizeError = EINVAL, - }; - - explicit RobustReqRep(const uint32_t max_len) : - capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {} - - void PutReady() { state_.store(eStateReady); } - bool Ready() const { return state_.load() == eStateReady; } - uint32_t capacity() const { return capacity_; } - - int ClientRequest(const Msg &request, Msg &reply) - { - int r = ClientWriteRequest(request); - if (r == eSuccess) { - r = ClientReadReply(reply); - } - return r; - } - int ClientReadReply(Msg &reply); - int ClientWriteRequest(const Msg &request); - int ServerReadRequest(Msg &request); - int ServerWriteReply(const Msg &reply); - -private: - RobustReqRep(const RobustReqRep &); - RobustReqRep(RobustReqRep &&); - RobustReqRep &operator=(const RobustReqRep &) = delete; - RobustReqRep &operator=(RobustReqRep &&) = delete; - - enum { - eStateInit = 0, - eStateReady = 0x19833891, - eClientWriteBegin, - eClientWriteEnd, - eServerReadBegin, - eServerReadEnd, - eServerWriteBegin, - eServerWriteEnd, - eClientReadBegin, - eClientReadEnd = eStateReady, - }; - bool StateCas(State exp, State val); - void Write(const Msg &msg) - { - size_.store(msg.size()); - memcpy(buf, msg.data(), msg.size()); - } - void Read(Msg &msg) { msg.assign(buf, size_.load()); } - - const uint32_t capacity_; - std::atomic<State> state_; - static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data."); - std::atomic<Duration> timestamp_; - std::atomic<int32_t> size_; - char buf[4]; -}; - -class PidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) { return true; } -}; - -class RobustPidLocker -{ -public: - typedef int locker_t; - enum { eLockerBits = sizeof(locker_t) * 8 }; - static locker_t this_locker() - { - static locker_t val = getpid(); - return val; - } - static bool is_alive(locker_t locker) - { - char buf[64] = {0}; - snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker); - return access(buf, F_OK) == 0; - } -}; - -class ExpiredLocker -{ -public: - typedef int64_t locker_t; - enum { eLockerBits = 63 }; - static locker_t this_locker() { return Now(); } - static bool is_alive(locker_t locker) - { - return Now() < locker + steady_clock::duration(10s).count(); - } - -private: - static locker_t Now() { return steady_clock::now().time_since_epoch().count(); } -}; - -template <class LockerT> class CasMutex { - typedef typename LockerT::locker_t locker_t; - static inline locker_t this_locker() { return LockerT::this_locker(); } - static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } - static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); - static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); + typedef uint64_t locker_t; + static inline locker_t this_locker() { return pthread_self(); } + static const uint64_t kLockerMask = MaskBits(63); public: CasMutex() : meta_(0) {} int try_lock() { - const auto t = steady_clock::now().time_since_epoch().count(); auto old = meta_.load(); int r = 0; if (!Locked(old)) { r = MetaCas(old, Meta(1, this_locker())); - } else if (!is_alive(Locker(old))) { - r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; - if (r) { - LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r; - } } return r; } @@ -201,7 +83,89 @@ bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } }; -typedef CasMutex<RobustPidLocker> Mutex; +class NullMutex +{ +public: + bool try_lock() { return true; } + void lock() {} + void unlock() {} +}; + +// flock + mutex +class FMutex +{ +public: + typedef uint64_t id_t; + FMutex(id_t id) : + id_(id), fd_(Open(id_)) + { + if (fd_ == -1) { throw "error create mutex!"; } + } + ~FMutex() { Close(fd_); } + bool try_lock(); + void lock(); + void unlock(); + +private: + static std::string GetPath(id_t id) + { + const std::string dir("/tmp/.bhome_mtx"); + mkdir(dir.c_str(), 0777); + return dir + "/fm_" + std::to_string(id); + } + static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); } + static int Close(int fd) { return close(fd); } + id_t id_; + int fd_; + std::mutex mtx_; +}; + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +class SemMutex +{ +public: + SemMutex(key_t key) : + key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT)) + { + if (sem_id_ == -1) { throw "error create semaphore."; } + union semun init_val; + init_val.val = 1; + semctl(sem_id_, 0, SETVAL, init_val); + } + ~SemMutex() + { + // semctl(sem_id_, 0, IPC_RMID, semun{}); + } + + bool try_lock() + { + sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; + return semop(sem_id_, &op, 1) == 0; + } + + void lock() + { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + + void unlock() + { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id_, &op, 1); + } + +private: + key_t key_; + int sem_id_; +}; template <class Lock> class Guard @@ -245,7 +209,6 @@ bool push_back(const Data d) { - Guard<Mutex> guard(mutex_); auto old = mtail(); auto pos = Pos(old); auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); @@ -257,7 +220,6 @@ } bool pop_front(Data &d) { - Guard<Mutex> guard(mutex_); auto old = mhead(); auto pos = Pos(old); if (!(pos == tail())) { @@ -281,7 +243,6 @@ meta_type mtail() const { return mtail_.load(); } // data const size_type capacity_; - Mutex mutex_; std::atomic<meta_type> mhead_; std::atomic<meta_type> mtail_; Alloc al_; diff --git a/src/shm.h b/src/shm.h index b168413..269df44 100644 --- a/src/shm.h +++ b/src/shm.h @@ -19,7 +19,7 @@ #ifndef SHM_6CHO6D6C #define SHM_6CHO6D6C -#include "robust.h" +#include "log.h" #include <atomic> #include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/sync/interprocess_mutex.hpp> @@ -90,8 +90,9 @@ } }; -typedef robust::Mutex Mutex; -typedef robust::Guard<Mutex> Guard; +typedef interprocess_mutex Mutex; +typedef scoped_lock<Mutex> Guard; +// typedef robust::Guard<Mutex> Guard; class SharedMemory : public mshm_t { diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index 17558de..bc5075f 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -39,13 +39,13 @@ ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : id_(id), - queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) + queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) { } ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : id_(id), - queue_(segment, create_or_else_find, MsgQIdToName(id_)) + queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager()) { if (!queue_.IsOk()) { throw("error create/find msgq " + std::to_string(id_)); @@ -56,6 +56,18 @@ ShmMsgQueue::~ShmMsgQueue() {} +ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) +{ + static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; + + static std::mutex mtx; + std::lock_guard<std::mutex> lock(mtx); + auto pos = imm.find(id); + if (pos == imm.end()) { + pos = imm.emplace(id, new Mutex(id)).first; + } + return *pos->second; +} bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) { Queue *q = Find(shm, id); @@ -75,14 +87,15 @@ bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) { - Queue *remote = Find(shm, remote_id); bool r = false; - if (remote) { + try { + ShmMsgQueue dest(remote_id, false, shm, 1); msg.AddRef(); - r = remote->TryWrite(msg.Offset()); - if (!r) { - msg.Release(); - } + DEFER1(if (!r) { msg.Release(); }); + + Guard lock(GetMutex(remote_id)); + r = dest.queue().TryWrite(msg.Offset()); + } catch (...) { } return r; } diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index f496f0f..f8888f3 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -26,11 +26,13 @@ class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> { - typedef ShmObject<SharedQ63<4>> Shmq; - // typedef ShmObject<SharedQueue<int64_t>> Shmq; + // typedef ShmObject<SharedQ63<4>> Shmq; + typedef ShmObject<SharedQueue<int64_t>> Shmq; typedef Shmq::ShmType ShmType; typedef Shmq::Data Queue; typedef std::function<void()> OnSend; + typedef robust::FMutex Mutex; + typedef robust::Guard<Mutex> Guard; public: typedef uint64_t MQId; @@ -45,13 +47,22 @@ MQId Id() const { return id_; } ShmType &shm() const { return queue_.shm(); } - bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); } - bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); } + bool Recv(MsgI &msg, const int timeout_ms) + { + Guard lock(GetMutex(Id())); + return queue().Read(msg.OffsetRef(), timeout_ms); + } + bool TryRecv(MsgI &msg) + { + Guard lock(GetMutex(Id())); + return queue().TryRead(msg.OffsetRef()); + } static Queue *Find(SharedMemory &shm, const MQId remote_id); static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } private: + static Mutex &GetMutex(const MQId id); MQId id_; Queue &queue() { return *queue_.data(); } Shmq queue_; diff --git a/src/shm_queue.h b/src/shm_queue.h index c7d3a23..0041f16 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -19,15 +19,18 @@ #ifndef SHM_QUEUE_JE0OEUP3 #define SHM_QUEUE_JE0OEUP3 +#include "robust.h" #include "shm.h" #include <atomic> +#include <boost/circular_buffer.hpp> #include <chrono> namespace bhome_shm { template <class D> -using Circular = robust::CircularBuffer<D, Allocator<D>>; +using Circular = boost::circular_buffer<D, Allocator<D>>; +// using Circular = robust::CircularBuffer<D, Allocator<D>>; template <class D> class SharedQueue @@ -49,8 +52,25 @@ } while (steady_clock::now() < end_time); return false; } - bool TryRead(D &d) { return queue_.pop_front(d); } - bool TryWrite(const D &d) { return queue_.push_back(d); } + bool TryRead(D &d) + { + if (!queue_.empty()) { + d = queue_.front(); + queue_.pop_front(); + return true; + } else { + return false; + } + } + bool TryWrite(const D &d) + { + if (!queue_.full()) { + queue_.push_back(d); + return true; + } else { + return false; + } + } private: Circular<D> queue_; diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index 2b4ba96..68c0e72 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -1,5 +1,6 @@ #include "robust.h" #include "util.h" +#include <boost/circular_buffer.hpp> using namespace robust; @@ -18,7 +19,7 @@ BOOST_AUTO_TEST_CASE(QueueTest) { const int nthread = 100; - const uint64_t nmsg = 1000 * 1000 * 100; + const uint64_t nmsg = 1000 * 1000 * 10; SharedMemory &shm = TestShm(); shm.Remove(); @@ -33,7 +34,12 @@ BOOST_CHECK_EQUAL((u64 & 255), i); } -#if 1 + uint64_t correct_total = nmsg * (nmsg - 1) / 2; + std::atomic<uint64_t> total(0); + std::atomic<uint64_t> nwrite(0); + std::atomic<uint64_t> writedone(0); + +#if 0 typedef AtomicQueue<4> Rcb; Rcb tmp; @@ -48,18 +54,8 @@ BOOST_CHECK(tmp.tail() == 1); ShmObject<Rcb> rcb(shm, "test_rcb"); -#else - typedef Circular<int64_t> Rcb; - ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager()); -#endif + bool try_more = true; - const int nsize = sizeof(Rcb); - - bool try_more = false; - uint64_t correct_total = nmsg * (nmsg - 1) / 2; - std::atomic<uint64_t> total(0); - std::atomic<uint64_t> nwrite(0); - std::atomic<uint64_t> writedone(0); auto Writer = [&]() { uint64_t n = 0; while ((n = nwrite++) < nmsg) { @@ -82,6 +78,58 @@ } }; +#else + typedef Circular<int64_t> Rcb; + ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager()); + + typedef FMutex Mutex; + // typedef SemMutex Mutex; + Mutex mtx(123); + auto Writer = [&]() { + uint64_t n = 0; + while ((n = nwrite++) < nmsg) { + auto Write = [&]() { + robust::Guard<Mutex> lk(mtx); + if (rcb->full()) { + return false; + } else { + rcb->push_back(n); + return true; + } + // return rcb->push_back(n); + }; + while (!Write()) { + // MySleep(); + } + ++writedone; + } + }; + std::atomic<uint64_t> nread(0); + auto Reader = [&]() { + while (nread.load() < nmsg) { + int64_t d; + auto Read = [&]() { + robust::Guard<Mutex> lk(mtx); + if (rcb->empty()) { + return false; + } else { + d = rcb->front(); + rcb->pop_front(); + return true; + } + // return rcb->pop_front(d); + }; + if (Read()) { + ++nread; + total += d; + } else { + // MySleep(); + } + } + }; + +#endif + auto status = [&]() { auto next = steady_clock::now(); uint32_t lw = 0; @@ -102,7 +150,8 @@ { ThreadManager threads; boost::timer::auto_cpu_timer timer; - printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); + // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); + printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread); for (int i = 0; i < nthread; ++i) { threads.Launch(Reader); threads.Launch(Writer); @@ -116,7 +165,8 @@ BOOST_AUTO_TEST_CASE(MutexTest) { - typedef robust::Mutex RobustMutex; + // typedef robust::MFMutex RobustMutex; + typedef robust::SemMutex RobustMutex; for (int i = 0; i < 20; ++i) { int size = i; @@ -131,7 +181,9 @@ const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); + // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345); + RobustMutex rmtx(12345); + auto mtx = &rmtx; auto pi = shm.FindOrCreate<int>(int_name, 100); std::mutex m; @@ -142,29 +194,48 @@ printf("int : %d, add1: %d\n", old, ++*pi); } - { - const int ntimes = 1000 * 1000; - RobustMutex mutex; + auto LockSpeed = [](auto &mutex, const std::string &name) { + const int ntimes = 1000 * 1; auto Lock = [&]() { for (int i = 0; i < ntimes; ++i) { mutex.lock(); mutex.unlock(); } }; - + printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes); { boost::timer::auto_cpu_timer timer; - printf("test lock/unlock %d times: ", ntimes); + printf("1 thread: "); Lock(); } - { + auto InThread = [&](int nthread) { boost::timer::auto_cpu_timer timer; - printf("test lock/unlock %d times, 2 thread: ", ntimes); - std::thread t1(Lock), t2(Lock); - t1.join(); - t2.join(); - } - } + printf("%d threads: ", nthread); + std::vector<std::thread> vt; + for (int i = 0; i < nthread; ++i) { + vt.emplace_back(Lock); + } + for (auto &t : vt) { + t.join(); + } + }; + InThread(4); + InThread(16); + InThread(100); + InThread(1000); + }; + NullMutex null_mtx; + std::mutex std_mtx; + CasMutex cas_mtx; + FMutex mfmtx(3); + boost::interprocess::interprocess_mutex ipc_mutex; + SemMutex sem_mtx(3); + LockSpeed(null_mtx, "null mutex"); + LockSpeed(std_mtx, "std::mutex"); + // LockSpeed(cas_mtx, "CAS mutex"); + LockSpeed(ipc_mutex, "boost ipc mutex"); + LockSpeed(mfmtx, "mutex+flock"); + LockSpeed(sem_mtx, "sem mutex"); auto TryLock = [&]() { if (mtx->try_lock()) { @@ -183,6 +254,7 @@ if (mtx) { printf("mtx exists\n"); if (TryLock()) { + // Sleep(10s); auto op = [&]() { if (TryLock()) { Unlock(); -- Gitblit v1.8.0