lichao
2021-04-23 628c1c21ffb19d8c96ed9ce89531595f9870ab1a
add msg tag; recv all msgs before remove mq.
4个文件已修改
63 ■■■■■ 已修改文件
src/msg.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp
@@ -20,6 +20,5 @@
namespace bhome_msg
{
const uint32_t kMsgTag = 0xf1e2d3c4;
} // namespace bhome_msg
src/msg.h
@@ -70,9 +70,11 @@
        return pshm;
    }
    struct Meta {
    static const uint32_t kMsgTag = 0xf1e2d3c4;
    typedef struct {
        RefCount count_;
    };
        const uint32_t tag_ = kMsgTag;
    } Meta;
    Offset offset_;
    void *Alloc(const size_t size)
    {
@@ -155,9 +157,8 @@
    explicit ShmMsg(const size_t size) :
        ShmMsg(Alloc(size)) {}
    void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
    bool valid() const { return static_cast<bool>(offset_); }
    bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
    // AddRef and Release works for both counted and not counted msg.
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release()
    {
src/shm_queue.cpp
@@ -61,31 +61,24 @@
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
{
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg)) {
            msg.Release();
        }
    }
    return Super::Remove(shm, MsgQIdToName(id));
}
ShmMsgQueue::Queue *ShmMsgQueue::FindRemote(SharedMemory &shm, const MQId &remote_id)
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
{
    return Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
{
    Queue *remote = FindRemote(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
        }
    } else {
        // SetLestError(eNotFound);
        return false;
    }
    return Super::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = FindRemote(shm, remote_id);
    Queue *remote = Find(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
src/shm_queue.h
@@ -110,22 +110,6 @@
        Super(len, alloc) {}
    using Super::capacity;
    using Super::size;
    template <class Iter, class OnWrite>
    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
    {
        auto endtime = MSFromNow(timeout_ms);
        auto timedWritePred = [this, endtime](Guard &lock) {
            return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); }));
        };
        return WriteAllOnCond(begin, end, timedWritePred, onWrite);
    }
    template <class OnWrite>
    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); }
    bool Write(const D &buf, const int timeout_ms)
    {
        return Write(buf, timeout_ms, [](const D &buf) {});
    }
    template <class Iter, class OnWrite>
    int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
@@ -174,13 +158,13 @@
    bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
    template <class OnData>
    int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
    static Queue *FindRemote(SharedMemory &shm, const MQId &remote_id);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
    static Queue *Find(SharedMemory &shm, const MQId &remote_id);
    // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
    static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = FindRemote(shm, remote_id);
        Queue *remote = Find(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
@@ -193,8 +177,8 @@
        }
    }
    template <class... Rest>
    bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    // template <class... Rest>
    // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    template <class... Rest>
    bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>