From 3b93dc0dc34008cf25b2b12f6b026b3d9e4ed623 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 12:48:14 +0800
Subject: [PATCH] guard shm alloc; rm OnWrite,dup lock in shm queue.
---
src/shm.h | 15 ++++++-
src/shm_msg_queue.h | 23 +----------
src/shm_queue.h | 34 +----------------
src/shm.cpp | 1
src/shm_msg_queue.cpp | 15 +++----
5 files changed, 25 insertions(+), 63 deletions(-)
diff --git a/src/shm.cpp b/src/shm.cpp
index 1658900..479b94f 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -25,6 +25,7 @@
mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
name_(name)
{
+ pmutex_ = FindOrCreate<Mutex>("ShmSelfControlMutex0");
}
SharedMemory::~SharedMemory()
diff --git a/src/shm.h b/src/shm.h
index 515d856..cb487df 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -96,6 +96,7 @@
class SharedMemory : public mshm_t
{
std::string name_;
+ Mutex *pmutex_ = 0;
static permissions AllowAll()
{
@@ -122,19 +123,29 @@
{
return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
}
- void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
+ void *Alloc(const size_t size)
+ {
+ Guard lock(*pmutex_);
+ return allocate(size, std::nothrow);
+ }
void Dealloc(void *p)
{
+ Guard lock(*pmutex_);
if (p) { deallocate(p); }
}
template <class T>
void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
template <class T, class... Params>
- T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); }
+ T *New(Params &&...params)
+ {
+ Guard lock(*pmutex_);
+ return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
template <class T>
void Delete(T *p)
{
+ Guard lock(*pmutex_);
if (p) { destroy_ptr<T>(p); };
}
template <class T>
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index ae019bf..03a6cfb 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -84,19 +84,18 @@
return Shmq::Find(shm, MsgQIdToName(remote_id));
}
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
{
Queue *remote = Find(shm, remote_id);
+ bool r = false;
if (remote) {
- if (onsend) {
- return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
- } else {
- return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
+ msg.AddRef();
+ r = remote->TryWrite(msg);
+ if (!r) {
+ msg.Release();
}
- } else {
- // SetLestError(eNotFound);
- return false;
}
+ return r;
}
// Test shows that in the 2 cases:
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index d7b33af..c56784c 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -46,27 +46,8 @@
bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
static Queue *Find(SharedMemory &shm, const MQId remote_id);
- static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
- template <class Iter>
- static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
- {
- Queue *remote = Find(shm, remote_id);
- if (remote) {
- if (onsend) {
- return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
- } else {
- return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
- }
- } else {
- // SetLestError(eNotFound);
- return 0;
- }
- }
-
- template <class... Rest>
- bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
- template <class... Rest>
- int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
+ static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
+ bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
private:
MQId id_;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 11f9893..7e4ec31 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -29,32 +29,12 @@
template <class D>
using Circular = robust::CircularBuffer<D, Allocator<D>>;
-
template <class D>
class SharedQueue
{
public:
SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
queue_(len, alloc) {}
-
- template <class OnWrite>
- bool TryWrite(const D &d, const OnWrite &onWrite)
- {
- Guard lock(mutex());
- if (!queue_.full()) {
- onWrite(d);
- queue_.push_back(d);
- return true;
- } else {
- return false;
- }
- }
-
- bool TryWrite(const D &d)
- {
- Guard lock(mutex());
- return !queue_.full() ? (queue_.push_back(d), true) : false;
- }
bool Read(D &d, const int timeout_ms)
{
@@ -69,22 +49,12 @@
} while (steady_clock::now() < end_time);
return false;
}
- bool TryRead(D &d)
- {
- Guard lock(mutex());
- if (!queue_.empty()) {
- queue_.pop_front(d);
- return true;
- } else {
- return false;
- }
- }
+ bool TryRead(D &d) { return queue_.pop_front(d); }
+ bool TryWrite(const D &d) { return queue_.push_back(d); }
private:
typedef Circular<D> Queue;
Queue queue_;
- Mutex mutex_;
- Mutex &mutex() { return mutex_; }
};
} // namespace bhome_shm
--
Gitblit v1.8.0