From 3b93dc0dc34008cf25b2b12f6b026b3d9e4ed623 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 12:48:14 +0800 Subject: [PATCH] guard shm alloc; rm OnWrite,dup lock in shm queue. --- src/shm.h | 15 ++++++- src/shm_msg_queue.h | 23 +---------- src/shm_queue.h | 34 +---------------- src/shm.cpp | 1 src/shm_msg_queue.cpp | 15 +++---- 5 files changed, 25 insertions(+), 63 deletions(-) diff --git a/src/shm.cpp b/src/shm.cpp index 1658900..479b94f 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -25,6 +25,7 @@ mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), name_(name) { + pmutex_ = FindOrCreate<Mutex>("ShmSelfControlMutex0"); } SharedMemory::~SharedMemory() diff --git a/src/shm.h b/src/shm.h index 515d856..cb487df 100644 --- a/src/shm.h +++ b/src/shm.h @@ -96,6 +96,7 @@ class SharedMemory : public mshm_t { std::string name_; + Mutex *pmutex_ = 0; static permissions AllowAll() { @@ -122,19 +123,29 @@ { return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); } - void *Alloc(const size_t size) { return allocate(size, std::nothrow); } + void *Alloc(const size_t size) + { + Guard lock(*pmutex_); + return allocate(size, std::nothrow); + } void Dealloc(void *p) { + Guard lock(*pmutex_); if (p) { deallocate(p); } } template <class T> void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } template <class T, class... Params> - T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); } + T *New(Params &&...params) + { + Guard lock(*pmutex_); + return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); + } template <class T> void Delete(T *p) { + Guard lock(*pmutex_); if (p) { destroy_ptr<T>(p); }; } template <class T> diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index ae019bf..03a6cfb 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -84,19 +84,18 @@ return Shmq::Find(shm, MsgQIdToName(remote_id)); } -bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) { Queue *remote = Find(shm, remote_id); + bool r = false; if (remote) { - if (onsend) { - return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); - } else { - return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); + msg.AddRef(); + r = remote->TryWrite(msg); + if (!r) { + msg.Release(); } - } else { - // SetLestError(eNotFound); - return false; } + return r; } // Test shows that in the 2 cases: diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index d7b33af..c56784c 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -46,27 +46,8 @@ bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } static Queue *Find(SharedMemory &shm, const MQId remote_id); - static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); - template <class Iter> - static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) - { - Queue *remote = Find(shm, remote_id); - if (remote) { - if (onsend) { - return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); - } else { - return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); - } - } else { - // SetLestError(eNotFound); - return 0; - } - } - - template <class... Rest> - bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } - template <class... Rest> - int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } + static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); + bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } private: MQId id_; diff --git a/src/shm_queue.h b/src/shm_queue.h index 11f9893..7e4ec31 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -29,32 +29,12 @@ template <class D> using Circular = robust::CircularBuffer<D, Allocator<D>>; - template <class D> class SharedQueue { public: SharedQueue(const uint32_t len, Allocator<D> const &alloc) : queue_(len, alloc) {} - - template <class OnWrite> - bool TryWrite(const D &d, const OnWrite &onWrite) - { - Guard lock(mutex()); - if (!queue_.full()) { - onWrite(d); - queue_.push_back(d); - return true; - } else { - return false; - } - } - - bool TryWrite(const D &d) - { - Guard lock(mutex()); - return !queue_.full() ? (queue_.push_back(d), true) : false; - } bool Read(D &d, const int timeout_ms) { @@ -69,22 +49,12 @@ } while (steady_clock::now() < end_time); return false; } - bool TryRead(D &d) - { - Guard lock(mutex()); - if (!queue_.empty()) { - queue_.pop_front(d); - return true; - } else { - return false; - } - } + bool TryRead(D &d) { return queue_.pop_front(d); } + bool TryWrite(const D &d) { return queue_.push_back(d); } private: typedef Circular<D> Queue; Queue queue_; - Mutex mutex_; - Mutex &mutex() { return mutex_; } }; } // namespace bhome_shm -- Gitblit v1.8.0