lichao
2021-04-30 95bd9a67f9f6c90f627784e3f8fbf5c203784e51
change shm socket msg queue to atomic queue.
7个文件已修改
106 ■■■■■ 已修改文件
src/msg.h 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h
@@ -49,26 +49,29 @@
        int Dec() { return --num_; }
        int Get() { return num_.load(); }
    };
    typedef int64_t Offset;
    static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
    static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
    static inline Offset BaseAddr()
    typedef int64_t OffsetType;
    static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
    static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
    static inline OffsetType BaseAddr()
    {
        static const Offset base = Addr(shm().get_address()); // cache value.
        static const OffsetType base = Addr(shm().get_address()); // cache value.
        return base;
    }
    static const uint32_t kMsgTag = 0xf1e2d3c4;
    typedef struct {
    struct Meta {
        RefCount count_;
        const uint32_t tag_ = kMsgTag;
    } Meta;
    Offset offset_;
        const uint32_t size_ = 0;
        Meta(uint32_t size) :
            size_(size) {}
    };
    OffsetType offset_;
    void *Alloc(const size_t size)
    {
        void *p = shm().Alloc(sizeof(Meta) + size);
        if (p) {
            auto pmeta = new (p) Meta;
            auto pmeta = new (p) Meta(size);
            p = pmeta + 1;
        }
        return p;
@@ -136,8 +139,10 @@
    static bool BindShm(SharedMemory &shm) { return SetData(shm); }
    ShmMsg() :
        ShmMsg(nullptr) {}
    explicit ShmMsg(const size_t size) :
        ShmMsg(Alloc(size)) {}
    explicit ShmMsg(const OffsetType offset) :
        offset_(offset) {}
    OffsetType Offset() const { return offset_; }
    OffsetType &OffsetRef() { return offset_; }
    void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
    bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
src/shm_msg_queue.cpp
@@ -29,19 +29,6 @@
    return std::string(buf, n + 4);
}
const int AdjustMQLength(const int len)
{
    const int kMaxLength = 10000;
    const int kDefaultLen = 12;
    if (len <= 0) {
        return kDefaultLen;
    } else if (len < kMaxLength) {
        return len;
    } else {
        return kMaxLength;
    }
}
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
@@ -52,13 +39,13 @@
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
    queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    id_(NewId()),
    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
    queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
    if (!queue_.IsOk()) {
        throw("error create msgq " + std::to_string(id_));
@@ -72,7 +59,7 @@
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg)) {
        while (q->TryRead(msg.OffsetRef())) {
            msg.Release();
        }
    }
@@ -90,7 +77,7 @@
    bool r = false;
    if (remote) {
        msg.AddRef();
        r = remote->TryWrite(msg);
        r = remote->TryWrite(msg.Offset());
        if (!r) {
            msg.Release();
        }
src/shm_msg_queue.h
@@ -26,7 +26,8 @@
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQueue<MsgI>> Shmq;
    typedef ShmObject<SharedQ63<4>> Shmq;
    // typedef ShmObject<SharedQueue<int64_t>> Shmq;
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
@@ -43,15 +44,15 @@
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
    bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
    bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
private:
    MQId id_;
    Shmq &queue() { return queue_; }
    Queue &queue() { return *queue_.data(); }
    Shmq queue_;
};
src/shm_queue.h
@@ -53,8 +53,32 @@
    bool TryWrite(const D &d) { return queue_.push_back(d); }
private:
    typedef Circular<D> Queue;
    Queue queue_;
    Circular<D> queue_;
};
template <int Power = 4>
class SharedQ63
{
public:
    typedef int64_t Data;
    bool Read(Data &d, const int timeout_ms)
    {
        using namespace std::chrono;
        auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        do {
            if (TryRead(d)) {
                return true;
            } else {
                robust::QuickSleep();
            }
        } while (steady_clock::now() < end_time);
        return false;
    }
    bool TryRead(Data &d, const bool try_more = true) { return queue_.pop_front(d, try_more); }
    bool TryWrite(const Data d, const bool try_more = true) { return queue_.push_back(d, try_more); }
private:
    robust::AtomicQueue<Power, Data> queue_;
};
} // namespace bhome_shm
utest/api_test.cpp
@@ -149,7 +149,7 @@
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        BHFree(reply, reply_len);
        // printf("register topic : %s\n", r ? "ok" : "failed");
        Sleep(1s);
        // Sleep(1s);
    }
    { // Subscribe
utest/speed_test.cpp
@@ -16,9 +16,6 @@
 * =====================================================================================
 */
#include "util.h"
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost::posix_time;
BOOST_AUTO_TEST_CASE(SpeedTest)
{
@@ -49,14 +46,18 @@
    };
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
        ShmMsgQueue mq(id, shm, 1000);
        auto now = []() { return steady_clock::now(); };
        auto tm = now();
        while (*run) {
            MsgI msg;
            BHMsgHead head;
            if (mq.Recv(msg, timeout)) {
            if (mq.TryRecv(msg)) {
                DEFER1(msg.Release());
                // ok
                tm = now();
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
                if (now() > tm + 1s) {
                    exit(0); // for forked quit after 1s.
                }
            }
        }
    };
@@ -70,8 +71,8 @@
        }
    };
    int nwriters[] = {1, 2, 4};
    int nreaders[] = {1, 2};
    int nwriters[] = {1, 4, 16};
    int nreaders[] = {1, 4};
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
utest/util.h
@@ -22,7 +22,6 @@
#include "bh_util.h"
#include "shm.h"
#include "topic_node.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/noncopyable.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/timer/timer.hpp>
@@ -34,7 +33,6 @@
#include <thread>
#include <vector>
using namespace boost::posix_time;
using namespace std::chrono_literals;
using namespace std::chrono;