Add robust FMutex (works fine); use boost::circular_buffer.
| | |
| | | namespace |
| | | { |
| | | static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); |
| | | static_assert(sizeof(RobustReqRep) == 24); |
| | | static_assert(sizeof(Mutex) == 8); |
| | | static_assert(sizeof(CircularBuffer<int>) == 48); |
| | | |
| | | auto Now() { return steady_clock::now().time_since_epoch(); } |
| | | const steady_clock::duration kIoTimeout = 10ms; |
| | | const steady_clock::duration kIoExpire = 100ms; |
| | | |
| | | void Yield() { std::this_thread::sleep_for(10us); } |
| | | |
| | | } // namespace |
| | | |
| | | void QuickSleep() |
| | | { |
| | | Yield(); |
| | | } |
| | | bool RobustReqRep::StateCas(State exp, State val) |
| | | { |
| | | bool r = state_.compare_exchange_strong(exp, val); |
| | | return r ? (timestamp_.store(Now()), true) : false; |
| | | } |
| | | void QuickSleep() { Yield(); } |
| | | |
| | | int RobustReqRep::ClientReadReply(Msg &reply) |
| | | bool FMutex::try_lock() |
| | | { |
| | | auto end_time = Now() + kIoTimeout; |
| | | int done = false; |
| | | do { |
| | | if (StateCas(eServerWriteEnd, eClientReadBegin)) { |
| | | Read(reply); |
| | | done = StateCas(eClientReadBegin, eClientReadEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ClientWriteRequest(const Msg &request) |
| | | { |
| | | if (request.size() > capacity_) { |
| | | return eSizeError; |
| | | } |
| | | auto end_time = Now() + kIoTimeout; |
| | | bool done = false; |
| | | do { |
| | | if (StateCas(eStateReady, eClientWriteBegin)) { |
| | | Write(request); |
| | | done = StateCas(eClientWriteBegin, eClientWriteEnd); |
| | | if (done) { break; } |
| | | } |
| | | Yield(); |
| | | } while (Now() < end_time); |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | int RobustReqRep::ServerReadRequest(Msg &request) |
| | | { |
| | | bool done = false; |
| | | if (StateCas(eClientWriteEnd, eServerReadBegin)) { |
| | | Read(request); |
| | | done = StateCas(eServerReadBegin, eServerReadEnd); |
| | | if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { |
| | | if (mtx_.try_lock()) { |
| | | return true; |
| | | } else { |
| | | auto old = state_.load(); |
| | | if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) { |
| | | StateCas(old, eStateReady); |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | return false; |
| | | } |
| | | |
| | | int RobustReqRep::ServerWriteReply(const Msg &reply) |
| | | void FMutex::lock() |
| | | { |
| | | if (reply.size() > capacity_) { |
| | | return eSizeError; |
| | | //Note: the lock order affects performance a lot, |
| | | // locking fd_ first is about 100 times faster than locking mtx_ first. |
| | | flock(fd_, LOCK_EX); |
| | | mtx_.lock(); |
| | | } |
| | | // no need to loop write, either success or timeout. |
| | | bool done = false; |
| | | if (StateCas(eServerReadEnd, eServerWriteBegin)) { |
| | | Write(reply); |
| | | done = StateCas(eServerWriteBegin, eServerWriteEnd); |
| | | void FMutex::unlock() |
| | | { |
| | | mtx_.unlock(); |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | return done ? eSuccess : eTimeout; |
| | | } |
| | | |
| | | } // namespace robust |
| | |
#include <string.h>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
| | | |
| | |
| | | |
| | | void QuickSleep(); |
| | | |
// Request/reply slot whose access is sequenced by an atomic state word.
//
// Every state change goes through StateCas(); the enum in the private section
// lists the states in the order a full round trip visits them. The
// Client*/Server* I/O methods and StateCas() are defined out of line.
//
// The object is neither copyable nor movable.
class RobustReqRep
{
	typedef uint32_t State;
	typedef std::string Msg;
	typedef std::chrono::steady_clock::duration Duration;

public:
	// Result codes returned by the I/O methods; the non-zero values reuse errno numbers.
	enum ErrorCode {
		eSuccess = 0,
		eTimeout = EAGAIN,
		eSizeError = EINVAL,
	};

	// Starts in eStateInit with an empty payload. Ready() stays false until
	// PutReady() is called.
	explicit RobustReqRep(const uint32_t max_len) :
	    capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}

	// Unconditionally marks the slot as ready.
	void PutReady() { state_.store(eStateReady); }
	// True while the slot is in the ready state.
	bool Ready() const { return state_.load() == eStateReady; }
	// Maximum payload size passed to the constructor.
	uint32_t capacity() const { return capacity_; }

	// Client round trip: write `request`, then (only if that succeeded) read the
	// answer into `reply`. Returns the ErrorCode of the first step that failed,
	// or eSuccess.
	int ClientRequest(const Msg &request, Msg &reply)
	{
		int r = ClientWriteRequest(request);
		if (r == eSuccess) {
			r = ClientReadReply(reply);
		}
		return r;
	}
	int ClientReadReply(Msg &reply);
	int ClientWriteRequest(const Msg &request);
	int ServerReadRequest(Msg &request);
	int ServerWriteReply(const Msg &reply);

private:
	// Explicitly deleted (previously declared private and left undefined) so
	// that any accidental copy/move is a compile-time error, matching the
	// deleted assignment operators below.
	RobustReqRep(const RobustReqRep &) = delete;
	RobustReqRep(RobustReqRep &&) = delete;
	RobustReqRep &operator=(const RobustReqRep &) = delete;
	RobustReqRep &operator=(RobustReqRep &&) = delete;

	enum {
		eStateInit = 0,
		eStateReady = 0x19833891,
		eClientWriteBegin,
		eClientWriteEnd,
		eServerReadBegin,
		eServerReadEnd,
		eServerWriteBegin,
		eServerWriteEnd,
		eClientReadBegin,
		// Finishing the client read puts the slot straight back into the ready state.
		eClientReadEnd = eStateReady,
	};
	bool StateCas(State exp, State val);

	// Stores the payload length, then copies the payload bytes into buf.
	// No bounds check is done here; the visible writer (ClientWriteRequest)
	// rejects messages larger than capacity_ before calling this.
	void Write(const Msg &msg)
	{
		size_.store(msg.size());
		memcpy(buf, msg.data(), msg.size());
	}
	// Copies size_ bytes out of buf into msg.
	void Read(Msg &msg) { msg.assign(buf, size_.load()); }

	const uint32_t capacity_;
	std::atomic<State> state_;
	static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
	std::atomic<Duration> timestamp_;
	std::atomic<int32_t> size_;
	// NOTE(review): declared as 4 bytes while Write() copies up to capacity_
	// bytes — presumably the object is placed in a larger allocation so that
	// the payload extends past the end of the struct; confirm at the
	// construction site.
	char buf[4];
};
| | | |
// Locker policy that identifies the lock holder by process id and never
// considers a holder dead.
class PidLocker
{
public:
	typedef int locker_t;
	// Number of bits needed to store a locker_t.
	enum { eLockerBits = sizeof(locker_t) * 8 };

	// Process id of the caller; getpid() is evaluated once and the result is
	// reused on every later call.
	static locker_t this_locker()
	{
		static locker_t cached_pid = getpid();
		return cached_pid;
	}

	// Always reports the holder as alive, whatever `locker` is.
	static bool is_alive(locker_t locker)
	{
		return true;
	}
};
| | | |
// Locker policy that identifies the lock holder by process id and checks
// liveness by looking for the holder's /proc/<pid>/stat entry.
class RobustPidLocker
{
public:
	typedef int locker_t;
	// Number of bits needed to store a locker_t.
	enum { eLockerBits = sizeof(locker_t) * 8 };

	// Process id of the caller; getpid() is evaluated once and the result is
	// reused on every later call.
	static locker_t this_locker()
	{
		static locker_t cached_pid = getpid();
		return cached_pid;
	}

	// True when /proc/<locker>/stat exists.
	static bool is_alive(locker_t locker)
	{
		char stat_path[64] = {0};
		snprintf(stat_path, sizeof(stat_path) - 1, "/proc/%d/stat", locker);
		const int rc = access(stat_path, F_OK);
		return rc == 0;
	}
};
| | | |
// Locker policy that identifies the lock holder by the steady-clock tick at
// which the lock was taken, and treats the holder as dead once 10 seconds
// have passed since that tick.
//
// All chrono names are fully qualified so the class does not depend on any
// using-directive or literal namespace being visible where it is compiled.
class ExpiredLocker
{
public:
	typedef int64_t locker_t;
	enum { eLockerBits = 63 };

	// Current steady-clock time, in clock ticks since the clock's epoch.
	static locker_t this_locker() { return Now(); }

	// True while fewer than Lifetime() ticks have elapsed since `locker`.
	static bool is_alive(locker_t locker)
	{
		return Now() < locker + Lifetime();
	}

private:
	typedef std::chrono::steady_clock Clock;

	static locker_t Now() { return Clock::now().time_since_epoch().count(); }
	// Ten seconds expressed in steady-clock ticks.
	static locker_t Lifetime() { return Clock::duration(std::chrono::seconds(10)).count(); }
};
| | | |
| | | template <class LockerT> |
| | | class CasMutex |
| | | { |
| | | typedef typename LockerT::locker_t locker_t; |
| | | static inline locker_t this_locker() { return LockerT::this_locker(); } |
| | | static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); } |
| | | static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits); |
| | | static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!"); |
| | | typedef uint64_t locker_t; |
| | | static inline locker_t this_locker() { return pthread_self(); } |
| | | static const uint64_t kLockerMask = MaskBits(63); |
| | | |
| | | public: |
| | | CasMutex() : |
| | | meta_(0) {} |
| | | int try_lock() |
| | | { |
| | | const auto t = steady_clock::now().time_since_epoch().count(); |
| | | auto old = meta_.load(); |
| | | int r = 0; |
| | | if (!Locked(old)) { |
| | | r = MetaCas(old, Meta(1, this_locker())); |
| | | } else if (!is_alive(Locker(old))) { |
| | | r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1; |
| | | if (r) { |
| | | LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r; |
| | | } |
| | | } |
| | | return r; |
| | | } |
| | |
| | | bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } |
| | | }; |
| | | |
| | | typedef CasMutex<RobustPidLocker> Mutex; |
// Mutex-shaped type that performs no locking at all: try_lock() always
// succeeds and lock()/unlock() do nothing.
class NullMutex
{
public:
	// Always reports success.
	bool try_lock()
	{
		return true;
	}

	// No-op.
	void lock()
	{
	}

	// No-op.
	void unlock()
	{
	}
};
| | | |
| | | // flock + mutex |
// Mutex built from a lock file plus an in-process std::mutex.
//
// Each id maps to the file /tmp/.bhome_mtx/fm_<id>, which is opened (and
// created if necessary) in the constructor and closed in the destructor.
// try_lock()/lock()/unlock() are defined out of line.
//
// NOTE(review): flock() locks belong to the open file description, so threads
// sharing this object's fd_ are not excluded from each other by the file lock
// alone — confirm the out-of-line lock/unlock ordering accounts for that.
class FMutex
{
public:
	typedef uint64_t id_t;

	// Opens the lock file for `id`; throws a string literal if open() fails.
	FMutex(id_t id) :
	    id_(id), fd_(Open(id_))
	{
		const bool opened = (fd_ != -1);
		if (!opened) { throw "error create mutex!"; }
	}
	~FMutex() { Close(fd_); }

	bool try_lock();
	void lock();
	void unlock();

private:
	// Builds the lock-file path for `id`, creating the parent directory on the
	// way (the mkdir() result is ignored, so an existing directory is fine).
	static std::string GetPath(id_t id)
	{
		const std::string lock_dir("/tmp/.bhome_mtx");
		mkdir(lock_dir.c_str(), 0777);
		std::string path(lock_dir);
		path += "/fm_";
		path += std::to_string(id);
		return path;
	}
	// Returns the descriptor from open(), i.e. -1 on failure.
	static int Open(id_t id)
	{
		const std::string path = GetPath(id);
		return open(path.c_str(), O_CREAT | O_RDWR, 0666);
	}
	static int Close(int fd) { return close(fd); }

	id_t id_;
	int fd_;
	std::mutex mtx_;
};
| | | |
// Argument type for semctl(); the caller has to define it on Linux.
union semun {
	int val;               /* Value for SETVAL */
	struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
	unsigned short *array; /* Array for GETALL, SETALL */
	struct seminfo *__buf; /* Buffer for IPC_INFO
	                          (Linux-specific) */
};

// Mutex backed by a single System V semaphore identified by `key`.
//
// All operations use SEM_UNDO, so the kernel reverts a holder's adjustment
// when that process exits. The semaphore set is intentionally not removed in
// the destructor.
class SemMutex
{
public:
	// Attaches to the semaphore for `key`, creating it in the unlocked state
	// (value 1) if it does not exist yet. An already existing semaphore is
	// left untouched, so constructing a second SemMutex for the same key does
	// not release a lock that somebody else is holding.
	// Throws a string literal if the semaphore cannot be obtained.
	SemMutex(key_t key) :
	    key_(key), sem_id_(OpenOrCreate(key))
	{
		if (sem_id_ == -1) { throw "error create semaphore."; }
	}
	~SemMutex()
	{
		// semctl(sem_id_, 0, IPC_RMID, semun{});
	}

	// Non-blocking acquire; true when the semaphore was decremented.
	bool try_lock()
	{
		sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
		return semop(sem_id_, &op, 1) == 0;
	}

	// Blocking acquire. A wait interrupted by a signal (EINTR) is restarted so
	// that lock() never returns without holding the semaphore for that reason.
	// Other semop() failures are still ignored, as before.
	void lock()
	{
		sembuf op = {0, -1, SEM_UNDO};
		while (semop(sem_id_, &op, 1) == -1 && errno == EINTR) {
		}
	}

	// Release: increments the semaphore.
	void unlock()
	{
		sembuf op = {0, 1, SEM_UNDO};
		semop(sem_id_, &op, 1);
	}

private:
	// Returns the semaphore id for `key`, or -1 on failure.
	// The set is created exclusively first; only the creator initialises the
	// value. If the set already exists it is opened as is.
	// Known limitation: a process attaching between the creator's semget() and
	// SETVAL briefly sees the value 0 and simply finds the mutex locked.
	static int OpenOrCreate(key_t key)
	{
		const int created = semget(key, 1, 0666 | IPC_CREAT | IPC_EXCL);
		if (created != -1) {
			union semun init_val;
			init_val.val = 1;
			if (semctl(created, 0, SETVAL, init_val) == -1) {
				// Do not leave a permanently locked semaphore behind.
				semctl(created, 0, IPC_RMID);
				return -1;
			}
			return created;
		}
		if (errno != EEXIST) {
			return -1;
		}
		return semget(key, 1, 0666);
	}

	key_t key_;
	int sem_id_;
};
| | | |
| | | template <class Lock> |
| | | class Guard |
| | |
| | | |
| | | bool push_back(const Data d) |
| | | { |
| | | Guard<Mutex> guard(mutex_); |
| | | auto old = mtail(); |
| | | auto pos = Pos(old); |
| | | auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); |
| | |
| | | } |
| | | bool pop_front(Data &d) |
| | | { |
| | | Guard<Mutex> guard(mutex_); |
| | | auto old = mhead(); |
| | | auto pos = Pos(old); |
| | | if (!(pos == tail())) { |
| | |
| | | meta_type mtail() const { return mtail_.load(); } |
| | | // data |
| | | const size_type capacity_; |
| | | Mutex mutex_; |
| | | std::atomic<meta_type> mhead_; |
| | | std::atomic<meta_type> mtail_; |
| | | Alloc al_; |
| | |
| | | #ifndef SHM_6CHO6D6C |
| | | #define SHM_6CHO6D6C |
| | | |
| | | #include "robust.h" |
| | | #include "log.h" |
| | | #include <atomic> |
| | | #include <boost/interprocess/managed_shared_memory.hpp> |
| | | #include <boost/interprocess/sync/interprocess_mutex.hpp> |
| | |
| | | } |
| | | }; |
| | | |
| | | typedef robust::Mutex Mutex; |
| | | typedef robust::Guard<Mutex> Guard; |
| | | typedef interprocess_mutex Mutex; |
| | | typedef scoped_lock<Mutex> Guard; |
| | | // typedef robust::Guard<Mutex> Guard; |
| | | |
| | | class SharedMemory : public mshm_t |
| | | { |
| | |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) |
| | | queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : |
| | | id_(id), |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_)) |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create/find msgq " + std::to_string(id_)); |
| | |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) |
| | | { |
| | | static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; |
| | | |
| | | static std::mutex mtx; |
| | | std::lock_guard<std::mutex> lock(mtx); |
| | | auto pos = imm.find(id); |
| | | if (pos == imm.end()) { |
| | | pos = imm.emplace(id, new Mutex(id)).first; |
| | | } |
| | | return *pos->second; |
| | | } |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | bool r = false; |
| | | if (remote) { |
| | | try { |
| | | ShmMsgQueue dest(remote_id, false, shm, 1); |
| | | msg.AddRef(); |
| | | r = remote->TryWrite(msg.Offset()); |
| | | if (!r) { |
| | | msg.Release(); |
| | | } |
| | | DEFER1(if (!r) { msg.Release(); }); |
| | | |
| | | Guard lock(GetMutex(remote_id)); |
| | | r = dest.queue().TryWrite(msg.Offset()); |
| | | } catch (...) { |
| | | } |
| | | return r; |
| | | } |
| | |
| | | |
| | | class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | { |
| | | typedef ShmObject<SharedQ63<4>> Shmq; |
| | | // typedef ShmObject<SharedQueue<int64_t>> Shmq; |
| | | // typedef ShmObject<SharedQ63<4>> Shmq; |
| | | typedef ShmObject<SharedQueue<int64_t>> Shmq; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef Shmq::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | typedef robust::FMutex Mutex; |
| | | typedef robust::Guard<Mutex> Guard; |
| | | |
| | | public: |
| | | typedef uint64_t MQId; |
| | |
| | | MQId Id() const { return id_; } |
| | | ShmType &shm() const { return queue_.shm(); } |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); } |
| | | bool Recv(MsgI &msg, const int timeout_ms) |
| | | { |
| | | Guard lock(GetMutex(Id())); |
| | | return queue().Read(msg.OffsetRef(), timeout_ms); |
| | | } |
| | | bool TryRecv(MsgI &msg) |
| | | { |
| | | Guard lock(GetMutex(Id())); |
| | | return queue().TryRead(msg.OffsetRef()); |
| | | } |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); |
| | | bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } |
| | | |
| | | private: |
| | | static Mutex &GetMutex(const MQId id); |
| | | MQId id_; |
| | | Queue &queue() { return *queue_.data(); } |
| | | Shmq queue_; |
| | |
| | | #ifndef SHM_QUEUE_JE0OEUP3 |
| | | #define SHM_QUEUE_JE0OEUP3 |
| | | |
| | | #include "robust.h" |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <boost/circular_buffer.hpp> |
| | | #include <chrono> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | |
| | | template <class D> |
| | | using Circular = robust::CircularBuffer<D, Allocator<D>>; |
| | | using Circular = boost::circular_buffer<D, Allocator<D>>; |
| | | // using Circular = robust::CircularBuffer<D, Allocator<D>>; |
| | | |
| | | template <class D> |
| | | class SharedQueue |
| | |
| | | } while (steady_clock::now() < end_time); |
| | | return false; |
| | | } |
| | | bool TryRead(D &d) { return queue_.pop_front(d); } |
| | | bool TryWrite(const D &d) { return queue_.push_back(d); } |
| | | bool TryRead(D &d) |
| | | { |
| | | if (!queue_.empty()) { |
| | | d = queue_.front(); |
| | | queue_.pop_front(); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | bool TryWrite(const D &d) |
| | | { |
| | | if (!queue_.full()) { |
| | | queue_.push_back(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | private: |
| | | Circular<D> queue_; |
| | |
| | | #include "robust.h" |
| | | #include "util.h" |
| | | #include <boost/circular_buffer.hpp> |
| | | |
| | | using namespace robust; |
| | | |
| | |
| | | BOOST_AUTO_TEST_CASE(QueueTest) |
| | | { |
| | | const int nthread = 100; |
| | | const uint64_t nmsg = 1000 * 1000 * 100; |
| | | const uint64_t nmsg = 1000 * 1000 * 10; |
| | | |
| | | SharedMemory &shm = TestShm(); |
| | | shm.Remove(); |
| | |
| | | BOOST_CHECK_EQUAL((u64 & 255), i); |
| | | } |
| | | |
| | | #if 1 |
| | | uint64_t correct_total = nmsg * (nmsg - 1) / 2; |
| | | std::atomic<uint64_t> total(0); |
| | | std::atomic<uint64_t> nwrite(0); |
| | | std::atomic<uint64_t> writedone(0); |
| | | |
| | | #if 0 |
| | | typedef AtomicQueue<4> Rcb; |
| | | |
| | | Rcb tmp; |
| | |
| | | BOOST_CHECK(tmp.tail() == 1); |
| | | |
| | | ShmObject<Rcb> rcb(shm, "test_rcb"); |
| | | #else |
| | | typedef Circular<int64_t> Rcb; |
| | | ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager()); |
| | | #endif |
| | | bool try_more = true; |
| | | |
| | | const int nsize = sizeof(Rcb); |
| | | |
| | | bool try_more = false; |
| | | uint64_t correct_total = nmsg * (nmsg - 1) / 2; |
| | | std::atomic<uint64_t> total(0); |
| | | std::atomic<uint64_t> nwrite(0); |
| | | std::atomic<uint64_t> writedone(0); |
| | | auto Writer = [&]() { |
| | | uint64_t n = 0; |
| | | while ((n = nwrite++) < nmsg) { |
| | |
| | | } |
| | | }; |
| | | |
| | | #else |
| | | typedef Circular<int64_t> Rcb; |
| | | ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager()); |
| | | |
| | | typedef FMutex Mutex; |
| | | // typedef SemMutex Mutex; |
| | | Mutex mtx(123); |
| | | auto Writer = [&]() { |
| | | uint64_t n = 0; |
| | | while ((n = nwrite++) < nmsg) { |
| | | auto Write = [&]() { |
| | | robust::Guard<Mutex> lk(mtx); |
| | | if (rcb->full()) { |
| | | return false; |
| | | } else { |
| | | rcb->push_back(n); |
| | | return true; |
| | | } |
| | | // return rcb->push_back(n); |
| | | }; |
| | | while (!Write()) { |
| | | // MySleep(); |
| | | } |
| | | ++writedone; |
| | | } |
| | | }; |
| | | std::atomic<uint64_t> nread(0); |
| | | auto Reader = [&]() { |
| | | while (nread.load() < nmsg) { |
| | | int64_t d; |
| | | auto Read = [&]() { |
| | | robust::Guard<Mutex> lk(mtx); |
| | | if (rcb->empty()) { |
| | | return false; |
| | | } else { |
| | | d = rcb->front(); |
| | | rcb->pop_front(); |
| | | return true; |
| | | } |
| | | // return rcb->pop_front(d); |
| | | }; |
| | | if (Read()) { |
| | | ++nread; |
| | | total += d; |
| | | } else { |
| | | // MySleep(); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | #endif |
| | | |
| | | auto status = [&]() { |
| | | auto next = steady_clock::now(); |
| | | uint32_t lw = 0; |
| | |
| | | { |
| | | ThreadManager threads; |
| | | boost::timer::auto_cpu_timer timer; |
| | | printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); |
| | | // printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread); |
| | | printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread); |
| | | for (int i = 0; i < nthread; ++i) { |
| | | threads.Launch(Reader); |
| | | threads.Launch(Writer); |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | typedef robust::Mutex RobustMutex; |
| | | // typedef robust::MFMutex RobustMutex; |
| | | typedef robust::SemMutex RobustMutex; |
| | | |
| | | for (int i = 0; i < 20; ++i) { |
| | | int size = i; |
| | |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name); |
| | | // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345); |
| | | RobustMutex rmtx(12345); |
| | | auto mtx = &rmtx; |
| | | auto pi = shm.FindOrCreate<int>(int_name, 100); |
| | | |
| | | std::mutex m; |
| | |
| | | printf("int : %d, add1: %d\n", old, ++*pi); |
| | | } |
| | | |
| | | { |
| | | const int ntimes = 1000 * 1000; |
| | | RobustMutex mutex; |
| | | auto LockSpeed = [](auto &mutex, const std::string &name) { |
| | | const int ntimes = 1000 * 1; |
| | | auto Lock = [&]() { |
| | | for (int i = 0; i < ntimes; ++i) { |
| | | mutex.lock(); |
| | | mutex.unlock(); |
| | | } |
| | | }; |
| | | |
| | | printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes); |
| | | { |
| | | boost::timer::auto_cpu_timer timer; |
| | | printf("test lock/unlock %d times: ", ntimes); |
| | | printf("1 thread: "); |
| | | Lock(); |
| | | } |
| | | { |
| | | auto InThread = [&](int nthread) { |
| | | boost::timer::auto_cpu_timer timer; |
| | | printf("test lock/unlock %d times, 2 thread: ", ntimes); |
| | | std::thread t1(Lock), t2(Lock); |
| | | t1.join(); |
| | | t2.join(); |
| | | printf("%d threads: ", nthread); |
| | | std::vector<std::thread> vt; |
| | | for (int i = 0; i < nthread; ++i) { |
| | | vt.emplace_back(Lock); |
| | | } |
| | | for (auto &t : vt) { |
| | | t.join(); |
| | | } |
| | | }; |
| | | InThread(4); |
| | | InThread(16); |
| | | InThread(100); |
| | | InThread(1000); |
| | | }; |
| | | NullMutex null_mtx; |
| | | std::mutex std_mtx; |
| | | CasMutex cas_mtx; |
| | | FMutex mfmtx(3); |
| | | boost::interprocess::interprocess_mutex ipc_mutex; |
| | | SemMutex sem_mtx(3); |
| | | LockSpeed(null_mtx, "null mutex"); |
| | | LockSpeed(std_mtx, "std::mutex"); |
| | | // LockSpeed(cas_mtx, "CAS mutex"); |
| | | LockSpeed(ipc_mutex, "boost ipc mutex"); |
| | | LockSpeed(mfmtx, "mutex+flock"); |
| | | LockSpeed(sem_mtx, "sem mutex"); |
| | | |
| | | auto TryLock = [&]() { |
| | | if (mtx->try_lock()) { |
| | |
| | | if (mtx) { |
| | | printf("mtx exists\n"); |
| | | if (TryLock()) { |
| | | // Sleep(10s); |
| | | auto op = [&]() { |
| | | if (TryLock()) { |
| | | Unlock(); |