lichao
2021-06-01 43d4e95770b0519341153202c9a535aaa8e164c5
refactor, remove useless code.
1个文件已删除
5个文件已修改
208 ■■■■ 已修改文件
src/robust.cpp 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
@@ -16,9 +16,16 @@
 * =====================================================================================
 */
#include "robust.h"
#include <chrono>
#include <thread>
using namespace std::chrono;
using namespace std::chrono_literals;
namespace
{
void yield() { std::this_thread::sleep_for(10us); }
} // namespace
namespace robust
{
src/robust.h
@@ -23,23 +23,11 @@
#include "log.h"
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
namespace robust
{
using namespace std::chrono;
using namespace std::chrono_literals;
/*
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
@@ -100,16 +88,13 @@
    std::atomic<size_type> tail_;
    AData buf[capacity];
};
//*/
template <class Int>
class AtomicQueue<0, Int>
class AtomicQ63
{
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
public:
    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    typedef int64_t Data;
    AtomicQ63() { memset(this, 0, sizeof(*this)); }
    bool push(const Data d, bool try_more = false)
    {
        auto cur = buf.load();
@@ -122,13 +107,15 @@
        if (r) { d = Dec(cur); }
        return r;
    }
    uint32_t head() const { return 0; }
    uint32_t tail() const { return 0; }
private:
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
    AData buf;
};
@@ -149,7 +136,7 @@
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    static void yield() { std::this_thread::sleep_for(10us); }
    typedef std::chrono::steady_clock steady_clock;
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }
src/shm_msg_queue.h
@@ -29,7 +29,7 @@
{
public:
    typedef int64_t RawData;
    typedef ShmObject<SharedQ63<0>> Shmq;
    typedef ShmObject<SharedQ63> Shmq;
    typedef Shmq::Data Queue;
    typedef Shmq::ShmType ShmType;
    typedef uint64_t MQId;
@@ -43,10 +43,10 @@
    ShmType &shm() const { return queue_.shm(); }
    int64_t AbsAddr() const { return queue_.offset(); }
    bool Recv(RawData &val, const int timeout_ms) { return queue().Read(val, timeout_ms); }
    // bool Recv(RawData &val, const int timeout_ms) { return queue().Read(val, timeout_ms); }
    bool TryRecv(RawData &val) { return queue().TryRead(val); }
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    // bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
    static Queue *Find(ShmType &shm, const MQId remote);
    static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val);
src/shm_queue.cpp
File was deleted
src/shm_queue.h
@@ -28,82 +28,20 @@
namespace bhome_shm
{
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
template <class D>
class SharedQueue
{
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        queue_(len, alloc) {}
    bool Read(D &d, const int timeout_ms)
    {
        using namespace std::chrono;
        auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        do {
            if (TryRead(d)) {
                return true;
            } else {
                std::this_thread::sleep_for(1ms);
            }
        } while (steady_clock::now() < end_time);
        return false;
    }
    bool TryRead(D &d)
    {
        // bhome_shm::Guard lock(mutex_);
        if (!queue_.empty()) {
            d = queue_.front();
            queue_.pop_front();
            return true;
        } else {
            return false;
        }
    }
    bool TryWrite(const D &d)
    {
        // bhome_shm::Guard lock(mutex_);
        if (!queue_.full()) {
            queue_.push_back(d);
            return true;
        } else {
            return false;
        }
    }
private:
    Circular<D> queue_;
};
template <int Power = 4>
// just wrap robust::AtomicQ63
class SharedQ63
{
public:
    template <class... T>
    explicit SharedQ63(T &&...t) {} // easy testing
    typedef int64_t Data;
    bool Read(Data &d, const int timeout_ms)
    {
        using namespace std::chrono;
        auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        do {
            for (int i = 0; i < 100; ++i) {
                if (TryRead(d)) {
                    return true;
                }
            }
            std::this_thread::sleep_for(1ms);
        } while (steady_clock::now() < end_time);
        return false;
    }
    typedef robust::AtomicQ63 AQ63;
    typedef AQ63::Data Data;
    bool TryRead(Data &d, const bool try_more = true) { return queue_.pop(d, try_more); }
    bool TryWrite(const Data d, const bool try_more = true) { return queue_.push(d, try_more); }
private:
    robust::AtomicQueue<Power, Data> queue_;
    AQ63 queue_;
};
} // namespace bhome_shm
utest/robust_test.cpp
@@ -68,24 +68,12 @@
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
#if 1
    const int kPower = 0;
    typedef AtomicQueue<kPower> Rcb;
    typedef AtomicQ63 Rcb;
    Rcb tmp;
    // BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.push(1));
    if (kPower != 0) {
        BOOST_CHECK(tmp.tail() == 1);
    }
    BOOST_CHECK(tmp.head() == 0);
    int64_t d;
    BOOST_CHECK(tmp.pop(d));
    if (kPower != 0) {
        // BOOST_CHECK(tmp.like_empty());
        BOOST_CHECK(tmp.head() == 1);
        BOOST_CHECK(tmp.tail() == 1);
    }
    ShmObject<Rcb> rcb(shm, "test_rcb");
    bool try_more = true;
@@ -111,58 +99,6 @@
            }
        }
    };
#else
    typedef Circular<int64_t> Rcb;
    ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager());
    typedef FMutex Mutex;
    // typedef SemMutex Mutex;
    Mutex mtx(123);
    auto Writer = [&]() {
        uint64_t n = 0;
        while ((n = nwrite++) < nmsg) {
            auto Write = [&]() {
                robust::Guard<Mutex> lk(mtx);
                if (rcb->full()) {
                    return false;
                } else {
                    rcb->push_back(n);
                    return true;
                }
                // return rcb->push_back(n);
            };
            while (!Write()) {
                // MySleep();
            }
            ++writedone;
        }
    };
    std::atomic<uint64_t> nread(0);
    auto Reader = [&]() {
        while (nread.load() < nmsg) {
            int64_t d;
            auto Read = [&]() {
                robust::Guard<Mutex> lk(mtx);
                if (rcb->empty()) {
                    return false;
                } else {
                    d = rcb->front();
                    rcb->pop_front();
                    return true;
                }
                // return rcb->pop_front(d);
            };
            if (Read()) {
                ++nread;
                total += d;
            } else {
                // MySleep();
            }
        }
    };
#endif
    auto status = [&]() {
        auto next = steady_clock::now();