lichao
2021-04-23 dc86ace85e437ecb8a2e728e4dce36d02bbb8a6e
move ref count into msg meta, only 1 poinetr now.
9个文件已修改
161 ■■■■ 已修改文件
box/center.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/status_main.cc 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.cpp 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -454,7 +454,6 @@
                replyer(reply);
            } else {
                replyer(MakeReply(eSuccess));
                if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
                if (clients.empty()) { return; }
                auto it = clients.begin();
box/status_main.cc
@@ -31,8 +31,22 @@
int status_main(int argc, char const *argv[])
{
    auto &shm = BHomeShm();
    AppArg args(argc, argv);
    auto shm_name = args.Get("shm", BHomeShm().name());
    auto shm_size = std::atol(args.Get("size", "").c_str());
    if (shm_size <= 0 || shm_size > 512) {
        shm_size = 50;
    }
    auto DisplayName = [&]() -> std::string {
        if (shm_name == BHomeShm().name()) {
            return "[bhome shm]";
        } else {
            return shm_name;
        }
    };
    printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
    SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
    std::atomic<bool> run(true);
    auto Now = []() { return steady_clock::now(); };
src/failed_msg.cpp
@@ -17,18 +17,13 @@
 */
#include "failed_msg.h"
FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg msg)
{
    msg.AddRef();
    return [remote, msg](void *valid_sock) {
    return [remote, msg](void *valid_sock) mutable {
        assert(valid_sock);
        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
        bool r = sock.Send(remote.data(), msg);
        //TODO check remote removed.
        if (r && msg.IsCounted()) {
            auto tmp = msg; // Release() is not const, but it's safe to release.
            tmp.Release(sock.shm());
        }
        return r;
        DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release.
        return sock.Send(remote.data(), msg);
    };
}
src/failed_msg.h
@@ -40,7 +40,7 @@
    }
private:
    Func PrepareSender(const std::string &remote, Msg const &msg);
    Func PrepareSender(const std::string &remote, Msg msg);
    TimedFuncQ queue_;
};
src/msg.cpp
@@ -26,11 +26,28 @@
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
void *MsgI::Alloc(SharedMemory &shm, const size_t size)
{
    void *p = shm.Alloc(sizeof(Meta) + size);
    if (p) {
        auto pmeta = new (p) Meta;
        p = pmeta + 1;
    }
    return p;
}
void MsgI::Free(SharedMemory &shm)
{
    assert(valid());
    shm.Dealloc(meta());
    ptr_ = nullptr;
    assert(!valid());
}
void *MsgI::Pack(SharedMemory &shm,
                 const uint32_t head_len, const ToArray &headToArray,
                 const uint32_t body_len, const ToArray &bodyToArray)
{
    void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
    void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len);
    if (addr) {
        auto p = static_cast<char *>(addr);
        auto Pack1 = [&p](auto len, auto &writer) {
@@ -47,26 +64,11 @@
bool MsgI::ParseHead(BHMsgHead &head) const
{
    auto p = static_cast<char *>(ptr_.get());
    auto p = get<char>();
    assert(p);
    uint32_t msg_size = Get32(p);
    p += 4;
    return head.ParseFromArray(p, msg_size);
}
// with ref count;
bool MsgI::MakeRC(SharedMemory &shm, void *p)
{
    if (!p) {
        return false;
    }
    RefCount *rc = shm.New<RefCount>();
    if (!rc) {
        shm.Dealloc(p);
        return false;
    }
    MsgI(p, rc).swap(*this);
    return true;
}
bool MsgI::Make(SharedMemory &shm, void *p)
@@ -74,32 +76,20 @@
    if (!p) {
        return false;
    }
    MsgI(p, 0).swap(*this);
    MsgI(p).swap(*this);
    return true;
}
bool MsgI::EnableRefCount(SharedMemory &shm)
{
    if (!IsCounted()) {
        count_ = shm.New<RefCount>();
    }
    return IsCounted();
}
int MsgI::Release(SharedMemory &shm)
{
    if (IsCounted()) {
        const int n = count_->Dec();
        if (n != 0) {
            return n;
        }
    if (!valid()) {
        return 0;
    }
    // free data
    shm.Dealloc(ptr_);
    ptr_ = 0;
    shm.Delete(count_);
    count_ = 0;
    return 0;
    auto n = meta()->count_.Dec();
    if (n == 0) {
        Free(shm);
    }
    return n;
}
} // namespace bhome_msg
src/msg.h
@@ -53,8 +53,13 @@
class MsgI
{
private:
    struct Meta {
        RefCount count_;
    };
    offset_ptr<void> ptr_;
    offset_ptr<RefCount> count_;
    void *Alloc(SharedMemory &shm, const size_t size);
    void Free(SharedMemory &shm);
    Meta *meta() const { return get<Meta>() - 1; }
    typedef std::function<void(void *p, int len)> ToArray;
    void *Pack(SharedMemory &shm,
@@ -72,48 +77,36 @@
    void *Pack(SharedMemory &shm, const std::string &content)
    {
        void *addr = shm.Alloc(content.size());
        void *addr = Alloc(shm, content.size());
        if (addr) {
            memcpy(addr, content.data(), content.size());
        }
        return addr;
    }
    bool MakeRC(SharedMemory &shm, void *addr);
    bool Make(SharedMemory &shm, void *addr);
    MsgI(void *p) :
        ptr_(p) {}
public:
    MsgI(void *p = 0, RefCount *c = 0) :
        ptr_(p), count_(c) {}
    void swap(MsgI &a)
    {
        std::swap(ptr_, a.ptr_);
        std::swap(count_, a.count_);
    }
    MsgI() :
        MsgI(nullptr) {}
    MsgI(SharedMemory &shm, const size_t size) :
        MsgI(Alloc(shm, size)) {}
    void swap(MsgI &a) { std::swap(ptr_, a.ptr_); }
    bool valid() const { return static_cast<bool>(ptr_); }
    template <class T = void>
    T *get() { return static_cast<T *>(ptr_.get()); }
    T *get() const { return static_cast<T *>(ptr_.get()); }
    // AddRef and Release works for both counted and not counted msg.
    int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release(SharedMemory &shm);
    int Count() const { return valid() ? meta()->count_.Get() : 1; }
    int Count() const { return IsCounted() ? count_->Get() : 1; }
    bool IsCounted() const { return static_cast<bool>(count_); }
    template <class Body>
    inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return MakeRC(shm, Pack(shm, head, body));
    }
    bool EnableRefCount(SharedMemory &shm);
    template <class Body>
    inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        void *p = Pack(shm, head, body);
        auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
        return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
        return Make(shm, Pack(shm, head, body));
    }
    template <class Body>
    static inline std::string Serialize(const BHMsgHead &head, const Body &body)
src/sendq.cpp
@@ -42,17 +42,15 @@
        if (d.index() == 0) {
            auto &msg = boost::variant2::get<0>(pos->data().data_);
            r = mq.TrySend(*(MQId *) remote.data(), msg);
            if (r && msg.IsCounted()) {
            if (r) {
                msg.Release(mq.shm());
            }
        } else {
            auto &content = boost::variant2::get<1>(pos->data().data_);
            MsgI msg;
            if (msg.Make(mq.shm(), content)) {
                DEFER1(msg.Release(mq.shm()););
                r = mq.TrySend(*(MQId *) remote.data(), msg);
                if (!r || msg.IsCounted()) {
                    msg.Release(mq.shm());
                }
            }
        }
        return r;
utest/simple_tests.cpp
@@ -126,15 +126,15 @@
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024);
    MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
    BOOST_CHECK(m0.IsCounted());
    MsgI m0(shm, 1000);
    BOOST_CHECK(m0.valid());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    MsgI m1 = m0;
    BOOST_CHECK(m1.IsCounted());
    BOOST_CHECK(m1.valid());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
    BOOST_CHECK_EQUAL(m0.Release(shm), 2);
    BOOST_CHECK_EQUAL(m0.Release(shm), 1);
    BOOST_CHECK_EQUAL(m1.Release(shm), 0);
    BOOST_CHECK(!m1.IsCounted());
    BOOST_CHECK(!m1.valid());
}
utest/speed_test.cpp
@@ -39,12 +39,12 @@
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        msg.MakeRC(shm, head, body);
        assert(msg.IsCounted());
        msg.Make(shm, head, body);
        assert(msg.valid());
        DEFER1(msg.Release(shm););
        for (uint64_t i = 0; i < n; ++i) {
            mq.Send(id, msg, timeout);
            while (!mq.TrySend(id, msg)) {}
        }
    };
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {