lichao
2021-03-26 ea2ee85202f7b16d7b713bc7a7dcd1fa63bc6213
change refcount to AddRef,Release interface.
5个文件已修改
86 ■■■■ 已修改文件
src/msg.cpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp
@@ -59,9 +59,18 @@
}
void Msg::FreeFrom(SharedMemory &shm)
int Msg::Release(SharedMemory &shm) const
{
    if (IsCounted()) {
        const int n = count_->Dec();
        if (n != 0) {
            return n;
        }
    }
    // free data
    shm.Dealloc(ptr_);
    shm.Delete(count_);
    return 0;
}
} // namespace bhome_shm
src/msg.h
@@ -69,20 +69,15 @@
public:
    Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
    void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
    // ~Msg() { RemoveRef(); }
    // Msg(const Msg &a):ptr_(a.ptr_), count_(a.count_) { AddRef(); }
    // Msg(Msg &&a):ptr_(a.ptr_), count_(a.count_) { a.ptr_ = 0; a.count_ = 0; }
    // Msg & operator = (const Msg &a) { Msg(a).swap(*this); }
    // Msg & operator = (Msg &&a) { Msg(std::move(a)).swap(*this); }
    template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
    // AddRef and Release works for both counted and not counted msg.
    int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
    int RemoveRef()  const{ return IsCounted() ? count_->Dec() : 0; }
    int Release(SharedMemory &shm) const;
    int Count()  const{ return IsCounted() ? count_->Get() : 1; }
    bool IsCounted() const { return static_cast<bool>(count_); }
    bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount);
    void FreeFrom(SharedMemory &shm);
};
inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
src/shm_queue.cpp
@@ -48,17 +48,7 @@
bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
{
    Queue *remote = find(MsgQIdToName(remote_id));
    if(!remote) {
        return false;
    }
    msg.AddRef();
    if (remote->Write(msg, timeout_ms)) {
        return true;
    } else {
        msg.RemoveRef();
        return false;
    }
    return remote && remote->Write(msg, timeout_ms, [&](){msg.AddRef();});
}
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
@@ -69,13 +59,11 @@
    // 1 is about 50% faster than 2, maybe cache related.
    Msg msg;
    if (msg.Build(shm(), Id(), data, size, false)) {
        if (Send(remote_id, msg, timeout_ms)) {
    if(msg.Build(shm(), Id(), data, size, false)) {
        if(Send(remote_id, msg, timeout_ms)) {
            return true;
        } else {
            if (msg.RemoveRef() == 0) { // works for both refcounted and not counted.
                msg.FreeFrom(shm());
            }
            msg.Release(shm());
        }
    }
    return false;
@@ -85,7 +73,7 @@
{
    Msg msg;
    if (Read(msg, timeout_ms)) {
        DEFER1(if (msg.RemoveRef() == 0) { msg.FreeFrom(shm()); });
        DEFER1(msg.Release(shm()););
        auto ptr = msg.get<char>();
        if (ptr) {
src/shm_queue.h
@@ -53,9 +53,11 @@
    using Super::size;
    using Super::capacity;
    const MQId &Id() const { return id_; }
    bool Write(const D &buf, const int timeout_ms) {
    template <class OnWrite>
    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
        Guard lock(mutex());
        if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
            onWrite();
            this->push_back(buf);
            cond_read_.notify_one();
            return true;
@@ -63,6 +65,7 @@
            return false;
        }
    }
    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); }
    bool Read(D &buf, const int timeout_ms){
        Guard lock(mutex());
@@ -92,6 +95,7 @@
    bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
    const MQId &Id() const { return data()->Id(); }
    bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
    bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
};
} // namespace bhome_shm
utest/utest.cpp
@@ -10,6 +10,7 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "shm_queue.h"
#include "bh_util.h"
@@ -151,10 +152,10 @@
    BOOST_CHECK(m1.IsCounted());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
    BOOST_CHECK_EQUAL(m0.RemoveRef(), 2);
    BOOST_CHECK_EQUAL(m0.RemoveRef(), 1);
    BOOST_CHECK_EQUAL(m1.RemoveRef(), 0);
    BOOST_CHECK_EQUAL(m1.Count(), 0);
    BOOST_CHECK_EQUAL(m0.Release(shm), 2);
    BOOST_CHECK_EQUAL(m0.Release(shm), 1);
    BOOST_CHECK_EQUAL(m1.Release(shm), 0);
    BOOST_CHECK_THROW(m1.Count(), std::exception);
}
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
@@ -209,7 +210,7 @@
    auto Client = [&](int tid, int nmsg){
        for (int i = 0; i < nmsg; ++i) {
            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
            // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
            if (!Send()) {
                printf("********** client send error.\n");
@@ -246,9 +247,9 @@
            if (srv.Recv(src_id, data, size, 100)) {
                DEFER1(free(data));
                auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
                auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
                // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
                if (SendRefCounted()) {
                if (Send()) {
                    if (size != msg_content.size()) {
                        BOOST_TEST(false, "server msg size error");
                    }
@@ -263,7 +264,7 @@
    ThreadManager clients, servers;
    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
    int ncli = 100*1;
    uint64_t nmsg = 100*100;
    uint64_t nmsg = 100*100*10;
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
@@ -274,18 +275,27 @@
    BOOST_CHECK_EQUAL(request.Count(), 1);
    BOOST_CHECK(reply.IsCounted());
    BOOST_CHECK_EQUAL(reply.Count(), 1);
    if (request.RemoveRef() == 0) {
        BOOST_CHECK_EQUAL(reply.Count(), 0);
        request.FreeFrom(shm);
    }
    request.Release(shm);
    BOOST_CHECK_THROW(request.Count(), std::exception);
    BOOST_CHECK_THROW(reply.Count(), std::exception);
    // BOOST_CHECK_THROW(reply.Count(), int);
}
inline int MyMin(int a, int b) {
    printf("MyMin\n");
    return a < b ? a : b;
}
int test_main(int argc, char *argv[])
{
    printf("test main\n");
    int a = 0;
    int b = 0;
    BOOST_CHECK_EQUAL(a, b);
    int n = MyMin(4,6);
    for (int i = 0; i < n; ++i) {
        printf("i = %d\n", i);
    }
    return 0;
}