guard shm alloc; rm OnWrite,dup lock in shm queue.
| | |
| | | mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), |
| | | name_(name) |
| | | { |
| | | pmutex_ = FindOrCreate<Mutex>("ShmSelfControlMutex0"); |
| | | } |
| | | |
| | | SharedMemory::~SharedMemory() |
| | |
| | | class SharedMemory : public mshm_t |
| | | { |
| | | std::string name_; |
| | | Mutex *pmutex_ = 0; |
| | | |
| | | static permissions AllowAll() |
| | | { |
| | |
| | | { |
| | | return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); |
| | | } |
| | | void *Alloc(const size_t size) { return allocate(size, std::nothrow); } |
| | | void *Alloc(const size_t size) |
| | | { |
| | | Guard lock(*pmutex_); |
| | | return allocate(size, std::nothrow); |
| | | } |
| | | void Dealloc(void *p) |
| | | { |
| | | Guard lock(*pmutex_); |
| | | if (p) { deallocate(p); } |
| | | } |
| | | template <class T> |
| | | void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } |
| | | |
| | | template <class T, class... Params> |
| | | T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); } |
| | | T *New(Params &&...params) |
| | | { |
| | | Guard lock(*pmutex_); |
| | | return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); |
| | | } |
| | | template <class T> |
| | | void Delete(T *p) |
| | | { |
| | | Guard lock(*pmutex_); |
| | | if (p) { destroy_ptr<T>(p); }; |
| | | } |
| | | template <class T> |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | bool r = false; |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); |
| | | msg.AddRef(); |
| | | r = remote->TryWrite(msg); |
| | | if (!r) { |
| | | msg.Release(); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return false; |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |
| | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | template <class Iter> |
| | | static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | template <class... Rest> |
| | | bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); |
| | | bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } |
| | | |
| | | private: |
| | | MQId id_; |
| | |
| | | template <class D> |
| | | using Circular = robust::CircularBuffer<D, Allocator<D>>; |
| | | |
| | | |
| | | template <class D> |
| | | class SharedQueue |
| | | { |
| | | public: |
| | | SharedQueue(const uint32_t len, Allocator<D> const &alloc) : |
| | | queue_(len, alloc) {} |
| | | |
| | | template <class OnWrite> |
| | | bool TryWrite(const D &d, const OnWrite &onWrite) |
| | | { |
| | | Guard lock(mutex()); |
| | | if (!queue_.full()) { |
| | | onWrite(d); |
| | | queue_.push_back(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool TryWrite(const D &d) |
| | | { |
| | | Guard lock(mutex()); |
| | | return !queue_.full() ? (queue_.push_back(d), true) : false; |
| | | } |
| | | |
| | | bool Read(D &d, const int timeout_ms) |
| | | { |
| | |
| | | } while (steady_clock::now() < end_time); |
| | | return false; |
| | | } |
| | | bool TryRead(D &d) |
| | | { |
| | | Guard lock(mutex()); |
| | | if (!queue_.empty()) { |
| | | queue_.pop_front(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | bool TryRead(D &d) { return queue_.pop_front(d); } |
| | | bool TryWrite(const D &d) { return queue_.push_back(d); } |
| | | |
| | | private: |
| | | typedef Circular<D> Queue; |
| | | Queue queue_; |
| | | Mutex mutex_; |
| | | Mutex &mutex() { return mutex_; } |
| | | }; |
| | | |
| | | } // namespace bhome_shm |