lichao
2021-04-23 02ba913dc7bb5d711471b27f2ea23a897d0f2e28
bind msgi to shm, change offset_ptr to abs offset.
13个文件已修改
384 ■■■■■ 已修改文件
box/center_main.cc 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 173 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc
@@ -86,6 +86,7 @@
int center_main(int argc, const char *argv[])
{
    auto &shm = BHomeShm();
    MsgI::BindShm(shm);
    AppArg args(argc, argv);
    if (args.Has("remove")) {
src/bh_api.cpp
@@ -10,6 +10,7 @@
{
TopicNode &ProcNode()
{
    static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
    static TopicNode node(BHomeShm());
    return node;
}
src/failed_msg.cpp
@@ -23,7 +23,7 @@
    return [remote, msg](void *valid_sock) mutable {
        assert(valid_sock);
        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
        DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release.
        DEFER1(msg.Release()); // Release() is not const, but it's safe to release.
        return sock.Send(remote.data(), msg);
    };
}
src/msg.cpp
@@ -20,76 +20,6 @@
namespace bhome_msg
{
/*TODO change msg format, header has proc info;
reply has errer msg
    center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
void *MsgI::Alloc(SharedMemory &shm, const size_t size)
{
    void *p = shm.Alloc(sizeof(Meta) + size);
    if (p) {
        auto pmeta = new (p) Meta;
        p = pmeta + 1;
    }
    return p;
}
void MsgI::Free(SharedMemory &shm)
{
    assert(valid());
    shm.Dealloc(meta());
    ptr_ = nullptr;
    assert(!valid());
}
void *MsgI::Pack(SharedMemory &shm,
                 const uint32_t head_len, const ToArray &headToArray,
                 const uint32_t body_len, const ToArray &bodyToArray)
{
    void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len);
    if (addr) {
        auto p = static_cast<char *>(addr);
        auto Pack1 = [&p](auto len, auto &writer) {
            Put32(p, len);
            p += sizeof(len);
            writer(p, len);
            p += len;
        };
        Pack1(head_len, headToArray);
        Pack1(body_len, bodyToArray);
    }
    return addr;
}
bool MsgI::ParseHead(BHMsgHead &head) const
{
    auto p = get<char>();
    assert(p);
    uint32_t msg_size = Get32(p);
    p += 4;
    return head.ParseFromArray(p, msg_size);
}
bool MsgI::Make(SharedMemory &shm, void *p)
{
    if (!p) {
        return false;
    }
    MsgI(p).swap(*this);
    return true;
}
int MsgI::Release(SharedMemory &shm)
{
    if (!valid()) {
        return 0;
    }
    auto n = meta()->count_.Dec();
    if (n == 0) {
        Free(shm);
    }
    return n;
}
} // namespace bhome_msg
src/msg.h
@@ -31,83 +31,150 @@
{
using namespace bhome_shm;
// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message content layout: (meta) / header_size + header + data_size + data
typedef boost::uuids::uuid MQId;
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
{
    std::atomic<int> num_;
public:
    RefCount() :
        num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
    int Inc() { return ++num_; }
    int Dec() { return --num_; }
    int Get() { return num_.load(); }
};
// message content layout: header_size + header + data_size + data
class MsgI
class ShmMsg
{
private:
    // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
    class RefCount : private boost::noncopyable
    {
        std::atomic<int> num_;
    public:
        RefCount() :
            num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
        int Inc() { return ++num_; }
        int Dec() { return --num_; }
        int Get() { return num_.load(); }
    };
    typedef int64_t Offset;
    static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
    static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
    static inline Offset BaseAddr()
    {
        static const Offset base = Addr(shm().get_address()); // cache value.
        return base;
    }
    static inline SharedMemory &shm()
    {
        if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
        return *pshm();
    }
    static inline SharedMemory *&pshm()
    {
        static SharedMemory *pshm = 0;
        return pshm;
    }
    struct Meta {
        RefCount count_;
    };
    offset_ptr<void> ptr_;
    void *Alloc(SharedMemory &shm, const size_t size);
    void Free(SharedMemory &shm);
    Offset offset_;
    void *Alloc(const size_t size)
    {
        void *p = shm().Alloc(sizeof(Meta) + size);
        if (p) {
            auto pmeta = new (p) Meta;
            p = pmeta + 1;
        }
        return p;
    }
    void Free()
    {
        assert(valid());
        shm().Dealloc(meta());
        offset_ = 0;
        assert(!valid());
    }
    Meta *meta() const { return get<Meta>() - 1; }
    typedef std::function<void(void *p, int len)> ToArray;
    void *Pack(SharedMemory &shm,
               const uint32_t head_len, const ToArray &headToArray,
               const uint32_t body_len, const ToArray &bodyToArray);
    void *Pack(const uint32_t head_len, const ToArray &headToArray,
               const uint32_t body_len, const ToArray &bodyToArray)
    {
        void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
        if (addr) {
            auto p = static_cast<char *>(addr);
            auto Pack1 = [&p](auto len, auto &writer) {
                Put32(p, len);
                p += sizeof(len);
                writer(p, len);
                p += len;
            };
            Pack1(head_len, headToArray);
            Pack1(body_len, bodyToArray);
        }
        return addr;
    }
    template <class Body>
    void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    void *Pack(const BHMsgHead &head, const Body &body)
    {
        return Pack(
            shm,
            uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
            uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
    }
    void *Pack(SharedMemory &shm, const std::string &content)
    void *Pack(const std::string &content)
    {
        void *addr = Alloc(shm, content.size());
        void *addr = Alloc(content.size());
        if (addr) {
            memcpy(addr, content.data(), content.size());
        }
        return addr;
    }
    bool Make(SharedMemory &shm, void *addr);
    MsgI(void *p) :
        ptr_(p) {}
    bool Make(void *addr)
    {
        if (!addr) {
            return false;
        }
        ShmMsg(addr).swap(*this);
        return true;
    }
    ShmMsg(void *p) :
        offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
    template <class T = void>
    T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
public:
    MsgI() :
        MsgI(nullptr) {}
    MsgI(SharedMemory &shm, const size_t size) :
        MsgI(Alloc(shm, size)) {}
    void swap(MsgI &a) { std::swap(ptr_, a.ptr_); }
    bool valid() const { return static_cast<bool>(ptr_); }
    template <class T = void>
    T *get() const { return static_cast<T *>(ptr_.get()); }
    static bool BindShm(SharedMemory &shm)
    {
        assert(!pshm());
        pshm() = &shm;
        return true;
    }
    ShmMsg() :
        ShmMsg(nullptr) {}
    explicit ShmMsg(const size_t size) :
        ShmMsg(Alloc(size)) {}
    void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
    bool valid() const { return static_cast<bool>(offset_); }
    // AddRef and Release works for both counted and not counted msg.
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release(SharedMemory &shm);
    int Release()
    {
        if (!valid()) {
            return 0;
        }
        auto n = meta()->count_.Dec();
        if (n == 0) {
            Free();
        }
        return n;
    }
    int Count() const { return valid() ? meta()->count_.Get() : 1; }
    template <class Body>
    inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return Make(shm, Pack(shm, head, body));
    }
    inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); }
    inline bool Make(const std::string &content) { return Make(Pack(content)); }
    template <class Body>
    static inline std::string Serialize(const BHMsgHead &head, const Body &body)
    {
@@ -126,17 +193,19 @@
        assert(pos == s.size());
        return s;
    }
    inline bool Make(SharedMemory &shm, const std::string &content)
    {
        void *p = Pack(shm, content);
        return Make(shm, p);
    }
    bool ParseHead(BHMsgHead &head) const;
    bool ParseHead(BHMsgHead &head) const
    {
        auto p = get<char>();
        assert(p);
        uint32_t msg_size = Get32(p);
        p += 4;
        return head.ParseFromArray(p, msg_size);
    }
    template <class Body>
    bool ParseBody(Body &body) const
    {
        auto p = static_cast<char *>(ptr_.get());
        auto p = get<char>();
        assert(p);
        uint32_t size = Get32(p);
        p += 4;
@@ -147,7 +216,9 @@
    }
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); }
typedef ShmMsg MsgI;
} // namespace bhome_msg
src/sendq.cpp
@@ -33,7 +33,7 @@
            info.on_expire_(info.data_);
        }
        if (info.data_.index() == 0) {
            boost::variant2::get<0>(info.data_).Release(mq.shm());
            boost::variant2::get<0>(info.data_).Release();
        }
    }
@@ -43,13 +43,13 @@
            auto &msg = boost::variant2::get<0>(pos->data().data_);
            r = mq.TrySend(*(MQId *) remote.data(), msg);
            if (r) {
                msg.Release(mq.shm());
                msg.Release();
            }
        } else {
            auto &content = boost::variant2::get<1>(pos->data().data_);
            MsgI msg;
            if (msg.Make(mq.shm(), content)) {
                DEFER1(msg.Release(mq.shm()););
            if (msg.Make(content)) {
                DEFER1(msg.Release(););
                r = mq.TrySend(*(MQId *) remote.data(), msg);
            }
        }
src/socket.cpp
@@ -59,7 +59,7 @@
                return false;
            }
            auto onMsg = [&](MsgI &imsg) {
                DEFER1(imsg.Release(shm()));
                DEFER1(imsg.Release());
                BHMsgHead head;
                if (imsg.ParseHead(head)) {
                    onRecvWithPerMsgCB(*this, imsg, head);
@@ -118,7 +118,7 @@
        if (msg.ParseHead(head)) {
            return true;
        } else {
            msg.Release(shm());
            msg.Release();
        }
    }
    return false;
src/topic_node.cpp
@@ -111,7 +111,7 @@
        return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        if (r) {
@@ -139,7 +139,7 @@
        return sock.Send(&BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
@@ -172,7 +172,7 @@
        return sock.Send(&BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply;
@@ -366,7 +366,7 @@
            head.set_topic(request.topic());
            MsgI reply_msg;
            DEFER1(reply_msg.Release(shm_););
            DEFER1(reply_msg.Release(););
            BHMsgHead reply_head;
            if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) &&
@@ -403,7 +403,7 @@
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_));
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
@@ -442,7 +442,7 @@
            return sock.Send(&BHTopicBusAddress(), head, pub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(shm()););
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
@@ -475,7 +475,7 @@
            return sock.Send(&BHTopicBusAddress(), head, sub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(shm()););
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
@@ -515,7 +515,7 @@
    auto &sock = SockSub();
    MsgI msg;
    DEFER1(msg.Release(shm()););
    DEFER1(msg.Release(););
    BHMsgHead head;
    //TODO error msg.
    if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
utest/api_test.cpp
@@ -153,17 +153,50 @@
    }
    void unlock() { mutex_.unlock(); }
};
namespace
{
typedef int64_t Offset;
Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
} // namespace
BOOST_AUTO_TEST_CASE(MutexTest)
{
    const std::string shm_name("ShmMutex");
    // ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 10);
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    void *base_ptr = shm.get_address();
    auto PrintPtr = [&](void *p) {
        printf("addr: %ld, ptr: %p, offset: %ld\n", Addr(p), p, Addr(p) - Addr(base_ptr));
    };
    printf("base");
    PrintPtr(base_ptr);
    MsgI msg;
    msg.Make("string data");
    for (int i = 0; i < 10; ++i) {
        int n = msg.AddRef();
        printf("add %d ref: %d\n", i, n);
    }
    for (int i = 0; i < 10; ++i) {
        int n = msg.Release();
        printf("release %d, ref : %d\n", i, n);
    }
    std::this_thread::sleep_for(1s);
    msg.Release();
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s);
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
    auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
    printf("mutetx ");
    PrintPtr(mtx);
    printf("int ");
    PrintPtr(pi);
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
utest/simple_tests.cpp
@@ -106,9 +106,8 @@
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
    const std::string shm_name("shm_wait");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024);
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    ShmMsgQueue q(shm, 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
@@ -122,19 +121,19 @@
BOOST_AUTO_TEST_CASE(RefCountTest)
{
    const std::string shm_name("ShmRefCount");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024);
    SharedMemory &shm = TestShm();
    typedef MsgI Msg;
    Msg::BindShm(shm);
    MsgI m0(shm, 1000);
    Msg m0(1000);
    BOOST_CHECK(m0.valid());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    MsgI m1 = m0;
    Msg m1 = m0;
    BOOST_CHECK(m1.valid());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
    BOOST_CHECK_EQUAL(m0.Release(shm), 2);
    BOOST_CHECK_EQUAL(m0.Release(shm), 1);
    BOOST_CHECK_EQUAL(m1.Release(shm), 0);
    BOOST_CHECK_EQUAL(m0.Release(), 2);
    BOOST_CHECK_EQUAL(m0.Release(), 1);
    BOOST_CHECK_EQUAL(m1.Release(), 0);
    BOOST_CHECK(!m1.valid());
}
utest/speed_test.cpp
@@ -22,16 +22,16 @@
BOOST_AUTO_TEST_CASE(SpeedTest)
{
    const std::string shm_name("ShmSpeed");
    ShmRemover auto_remove(shm_name);
    const int mem_size = 1024 * 1024 * 50;
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    MQId id = boost::uuids::random_generator()();
    const int timeout = 1000;
    const uint32_t data_size = 4000;
    const std::string proc_id = "demo_proc";
    auto Writer = [&](int writer_id, uint64_t n) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        MsgI msg;
@@ -39,22 +39,21 @@
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        msg.Make(shm, head, body);
        msg.Make(head, body);
        assert(msg.valid());
        DEFER1(msg.Release(shm););
        DEFER1(msg.Release(););
        for (uint64_t i = 0; i < n; ++i) {
            while (!mq.TrySend(id, msg)) {}
        }
    };
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while (*run) {
            MsgI msg;
            BHMsgHead head;
            if (mq.Recv(msg, timeout)) {
                DEFER1(msg.Release(shm));
                DEFER1(msg.Release());
                // ok
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
@@ -62,7 +61,6 @@
        }
    };
    auto State = [&](std::atomic<bool> *run) {
        SharedMemory shm(shm_name, mem_size);
        auto init = shm.get_free_memory();
        printf("shm init : %ld\n", init);
        while (*run) {
@@ -116,8 +114,6 @@
// Send Recv Test
BOOST_AUTO_TEST_CASE(SRTest)
{
    const std::string shm_name("ShmSendRecv");
    ShmRemover auto_remove(shm_name);
    const int qlen = 64;
    const size_t msg_length = 100;
    std::string msg_content(msg_length, 'a');
@@ -125,7 +121,9 @@
    const std::string client_proc_id = "client_proc";
    const std::string server_proc_id = "server_proc";
    SharedMemory shm(shm_name, 1024 * 1024 * 512);
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, qlen);
@@ -174,7 +172,7 @@
        while (!stop) {
            if (srv.SyncRecv(req, req_head, 10)) {
                DEFER1(req.Release(shm));
                DEFER1(req.Release());
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto &mqid = req_head.route()[0].mq_id();
utest/utest.cpp
@@ -10,7 +10,14 @@
#include <thread>
#include <vector>
using namespace bhome_shm;
using namespace bhome_msg;
SharedMemory &TestShm()
{
    static SharedMemory shm("utest_0", 1024 * 1024 * 512);
    return shm;
}
template <class A, class B>
struct IsSameType {
@@ -84,10 +91,9 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
    const std::string shm_name("ShmPubSub");
    ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    DEFER1(shm.Remove());
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.find_or_construct<int>("flag")(123);
@@ -196,9 +202,8 @@
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
    const std::string shm_name("ShmReqRep");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 512);
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
utest/util.h
@@ -20,6 +20,7 @@
#define UTIL_W8A0OA5U
#include "bh_util.h"
#include "shm.h"
#include "topic_node.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/noncopyable.hpp>
@@ -34,7 +35,6 @@
#include <vector>
using namespace boost::posix_time;
using namespace std::chrono_literals;
template <class D>
@@ -132,4 +132,6 @@
    }
};
bhome_shm::SharedMemory &TestShm();
#endif // end of include guard: UTIL_W8A0OA5U