lichao
2021-04-30 3b93dc0dc34008cf25b2b12f6b026b3d9e4ed623
guard shm alloc; rm OnWrite,dup lock in shm queue.
5个文件已修改
88 ■■■■ 已修改文件
src/shm.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.cpp
@@ -25,6 +25,7 @@
    mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
    name_(name)
{
    pmutex_ = FindOrCreate<Mutex>("ShmSelfControlMutex0");
}
SharedMemory::~SharedMemory()
src/shm.h
@@ -96,6 +96,7 @@
class SharedMemory : public mshm_t
{
    std::string name_;
    Mutex *pmutex_ = 0;
    static permissions AllowAll()
    {
@@ -122,19 +123,29 @@
    {
        return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
    }
    void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
    void *Alloc(const size_t size)
    {
        Guard lock(*pmutex_);
        return allocate(size, std::nothrow);
    }
    void Dealloc(void *p)
    {
        Guard lock(*pmutex_);
        if (p) { deallocate(p); }
    }
    template <class T>
    void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
    template <class T, class... Params>
    T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); }
    T *New(Params &&...params)
    {
        Guard lock(*pmutex_);
        return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...);
    }
    template <class T>
    void Delete(T *p)
    {
        Guard lock(*pmutex_);
        if (p) { destroy_ptr<T>(p); };
    }
    template <class T>
src/shm_msg_queue.cpp
@@ -84,19 +84,18 @@
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
{
    Queue *remote = Find(shm, remote_id);
    bool r = false;
    if (remote) {
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
        msg.AddRef();
        r = remote->TryWrite(msg);
        if (!r) {
            msg.Release();
        }
    } else {
        // SetLestError(eNotFound);
        return false;
    }
    return r;
}
// Test shows that in the 2 cases:
src/shm_msg_queue.h
@@ -46,27 +46,8 @@
    bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = Find(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
            } else {
                return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
            }
        } else {
            // SetLestError(eNotFound);
            return 0;
        }
    }
    template <class... Rest>
    bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
    static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
    bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
private:
    MQId id_;
src/shm_queue.h
@@ -29,32 +29,12 @@
template <class D>
using Circular = robust::CircularBuffer<D, Allocator<D>>;
template <class D>
class SharedQueue
{
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        queue_(len, alloc) {}
    template <class OnWrite>
    bool TryWrite(const D &d, const OnWrite &onWrite)
    {
        Guard lock(mutex());
        if (!queue_.full()) {
            onWrite(d);
            queue_.push_back(d);
            return true;
        } else {
            return false;
        }
    }
    bool TryWrite(const D &d)
    {
        Guard lock(mutex());
        return !queue_.full() ? (queue_.push_back(d), true) : false;
    }
    bool Read(D &d, const int timeout_ms)
    {
@@ -69,22 +49,12 @@
        } while (steady_clock::now() < end_time);
        return false;
    }
    bool TryRead(D &d)
    {
        Guard lock(mutex());
        if (!queue_.empty()) {
            queue_.pop_front(d);
            return true;
        } else {
            return false;
        }
    }
    bool TryRead(D &d) { return queue_.pop_front(d); }
    bool TryWrite(const D &d) { return queue_.push_back(d); }
private:
    typedef Circular<D> Queue;
    Queue queue_;
    Mutex mutex_;
    Mutex &mutex() { return mutex_; }
};
} // namespace bhome_shm