lichao
2021-03-29 f51636c193d032723c47343e39ff8296db350200
change msg to use protobuf, add more msg type.
8个文件已修改
359 ■■■■ 已修改文件
proto/source/bhome_msg.proto 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 121 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 64 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto
@@ -1,5 +1,7 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
package bhome.msg;
message BHAddress {
@@ -22,6 +24,7 @@
    kMsgTypeReply = 2;
    kMsgTypePublish = 3;
    kMsgTypeSubscribe = 4;
    kMsgTypeUnsubscribe = 5;
}
message DataPub {
src/CMakeLists.txt
@@ -5,4 +5,4 @@
add_library(${Target} ${sources})
target_link_libraries(${Target} pthread rt)
target_link_libraries(${Target} bhome_msg pthread rt)
src/msg.cpp
@@ -16,50 +16,115 @@
 * =====================================================================================
 */
#include "msg.h"
#include "bh_util.h"
namespace bhome_msg {
bool MsgMetaV1::Parse(const void *p)
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
BHMsg InitMsg(MsgType type)
{
    assert(p);
    *this = *static_cast<const MsgMetaV1*>(p);
    return tag_ == kMsgMetaTag;
    BHMsg msg;
    msg.set_type(type);
    time_t tm = 0;
    msg.set_timestamp(time(&tm));
    return msg;
}
void MsgMetaV1::Pack(void *p)
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
    *static_cast<MsgMetaV1*>(p) = *this;
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeRequest));
    msg.set_body(data, size);
    BHAddress addr;
    msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
    return msg;
}
bool Msg::Build(SharedMemory &shm, const MQId &src_id, const void *data, const size_t size, const bool refcount)
BHMsg MakeReply(const void *data, const size_t size)
{
    if (!data || !size) {
        return false;
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply));
    msg.set_body(data, size);
    return msg;
}
BHMsg MakeSubUnsub(const std::vector<std::string> &topics, const MsgType sub_unsub)
{
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    DataSub subs;
    for (auto &t : topics) {
        subs.add_topics(t);
    }
    void *p = shm.Alloc(sizeof(MsgMetaV1) + size);
    if (!p) {
        return false;
    }
    RefCount *rc = 0;
    if (refcount) {
        rc = shm.New<RefCount>();
        if (!rc) {
    msg.set_body(subs.SerializeAsString());
    return msg;
}
BHMsg MakeSub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeSubscribe); }
BHMsg MakeUnsub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeUnsubscribe); }
BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypePublish));
    DataPub pub;
    pub.set_topic(topic);
    pub.set_data(data, size);
    msg.set_body(pub.SerializeAsString());
    return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
    uint32_t msg_size = msg.ByteSizeLong();
    void *p = shm.Alloc(4 + msg_size);
    if(p) {
        Put32(p, msg_size);
        if (!msg.SerializeToArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size)) {
            shm.Dealloc(p);
            return false;
            p = 0;
        }
    }
    MsgMetaV1 meta;
    meta.data_size_ = size;
    meta.src_id_ = src_id;
    meta.Pack(p);
    memcpy(static_cast<char *>(p) + sizeof(meta), data, size);
    Msg(p, rc).swap(*this);
    return true;
    return p;
}
int Msg::Release(SharedMemory &shm)
bool MsgI::Unpack(BHMsg &msg) const
{
    void *p = ptr_.get();
    assert(p);
    uint32_t msg_size = Get32(p);
    return msg.ParseFromArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size);
}
// with ref count;
bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if(!p) {
        return false;
    }
    RefCount *rc = shm.New<RefCount>();
    if (!rc) {
        shm.Dealloc(p);
        return false;
    }
    MsgI(p, rc).swap(*this);
    return true;
}
bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if(!p) {
        return false;
    }
    MsgI(p, 0).swap(*this);
    return true;
}
int MsgI::Release(SharedMemory &shm)
{
    if (IsCounted()) {
        const int n = count_->Dec();
src/msg.h
@@ -26,32 +26,12 @@
namespace bhome_msg {
    using namespace bhome_shm;
    using namespace bhome::msg; // for serialized data in Msg
    using namespace bhome::msg; // for serialized data in MsgI
// msg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
enum MsgType {
    kMsgTypeNull = 0,
    kMsgTypeNormal = 1,
    kMsgTypeMaxValue
};
typedef boost::uuids::uuid MQId;
const uint32_t kMsgMetaTag = 0xf1e2d3c4;
struct MsgMetaV1 {
    uint16_t self_size_ = sizeof(MsgMetaV1); // sizeof(*this)
    uint16_t type_ = kMsgTypeNormal; // msg type.
    uint32_t tag_ = kMsgMetaTag;
    uint32_t data_size_ = 0;
    MQId src_id_;
    // more fields add at end, must not change
    MsgMetaV1():src_id_(boost::uuids::nil_uuid()){}
    bool Parse(const void *p);
    void Pack(void *p);
};
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
@@ -65,13 +45,21 @@
    int num_ = 1;
};
class Msg {
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
BHMsg MakeReply(const void *data, const size_t size);
BHMsg MakeSub(const std::vector<std::string> &topics);
BHMsg MakeUnsub(const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
class MsgI {
private:
    offset_ptr<void> ptr_;
    offset_ptr<RefCount> count_;
    bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
public:
    Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
    void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
    MsgI(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
    void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
    template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
    // AddRef and Release works for both counted and not counted msg.
@@ -79,11 +67,15 @@
    int Release(SharedMemory &shm);
    int Count()  const{ return IsCounted() ? count_->Get() : 1; }
    bool IsCounted() const { return static_cast<bool>(count_); }
    bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount);
    bool Make(SharedMemory &shm, const BHMsg &msg);
    bool MakeRC(SharedMemory &shm, const BHMsg &msg);
    bool Unpack(BHMsg &msg) const;
};
inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
} // namespace bhome_msg
src/pubsub.cpp
@@ -44,7 +44,7 @@
        while (this->run_) {
            std::this_thread::sleep_for(100ms);
            BusManager &self = *this;
            Msg msg;
            BHMsg msg;
            const int timeout_ms = 100;
            if (!self.busq_.Recv(msg, timeout_ms)) {
                continue;
@@ -59,12 +59,13 @@
    for (int i = 0; i < n; ++i) {
        workers_.emplace_back(Worker);
    }
    return true;
}
bool BusManager::Stop()
{
    std::lock_guard<std::mutex> guard(mutex_);
    StopNoLock();
    return StopNoLock();
}
bool BusManager::StopNoLock()
@@ -75,7 +76,9 @@
                w.join();
            }
        }
        return true;
    }    
    return false;
}
} // namespace bhome_shm
src/shm_queue.cpp
@@ -15,6 +15,7 @@
 *
 * =====================================================================================
 */
#include "shm_queue.h"
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
@@ -47,10 +48,10 @@
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len):
Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
id_(id)
{
}
{}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):ShmMsgQueue(NewId(), segment, len)
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):
ShmMsgQueue(NewId(), segment, len)
{}
ShmMsgQueue::~ShmMsgQueue()
@@ -58,21 +59,16 @@
    Remove();
}
bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
    Queue *remote = find(MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [](const Msg&msg){msg.AddRef();});
    return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
}
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
{
    // Test shows that in the 2 cases:
    // 1) build msg first, then find remote queue;
    // 2) find remote queue first, then build msg;
    // 1 is about 50% faster than 2, maybe cache related.
    Msg msg;
    if(msg.Build(shm(), Id(), data, size, false)) {
    MsgI msg;
    if(msg.Make(shm(), data)) {
        if(Send(remote_id, msg, timeout_ms)) {
            return true;
        } else {
@@ -82,30 +78,34 @@
    return false;
}
bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms)
/*
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
    Msg msg;
    if (Read(msg, timeout_ms)) {
        DEFER1(msg.Release(shm()););
    // Test shows that in the 2 cases:
    // 1) build msg first, then find remote queue;
    // 2) find remote queue first, then build msg;
    // 1 is about 50% faster than 2, maybe cache related.
        auto ptr = msg.get<char>();
        if (ptr) {
            MsgMetaV1 meta;
            meta.Parse(ptr);
            source_id = meta.src_id_;
            size = meta.data_size_;
            data = malloc(size);
            if (data) {
                memcpy(data, ptr + meta.self_size_, size);
                return true;
            }
    MsgI msg;
    if(msg.BuildRequest(shm(), Id(), data, size)) {
        if(Send(remote_id, msg, timeout_ms)) {
            return true;
        } else {
            msg.Release(shm());
        }
    }
    source_id = EmptyId();
    data = 0;
    size = 0;
    return false;
}
//*/
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
    MsgI imsg;
    if (Read(imsg, timeout_ms)) {
        DEFER1(imsg.Release(shm()););
        return imsg.Unpack(msg);
    } else {
        return false;
    }
}
} // namespace bhome_shm
src/shm_queue.h
@@ -102,12 +102,12 @@
using namespace bhome_msg;
class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> >
{
    typedef ShmObject<SharedQueue<Msg> > Super;
    typedef ShmObject<SharedQueue<MsgI> > Super;
    typedef Super::Data Queue;
    bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
    MQId id_;
protected:
    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
@@ -115,11 +115,11 @@
    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
    bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
    // bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); // request
    bool Recv(BHMsg &msg, const int timeout_ms);
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
    const MQId &Id() const { return id_; }
    bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
    bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
};
} // namespace bhome_shm
utest/utest.cpp
@@ -178,10 +178,8 @@
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        MQId id;
        void *data;
        size_t size;
        bool r = q.Recv(id, data, size, ms);
        BHMsg msg;
        bool r = q.Recv(msg, ms);
        BOOST_CHECK(!r);
    }
}
@@ -192,10 +190,10 @@
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024*1024);
    Msg m0(shm.Alloc(1000), shm.New<RefCount>());
    MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
    BOOST_CHECK(m0.IsCounted());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    Msg m1 = m0;
    MsgI m1 = m0;
    BOOST_CHECK(m1.IsCounted());
    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
@@ -205,25 +203,7 @@
    BOOST_CHECK(!m1.IsCounted());
}
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
{
    MsgMetaV1 head;
    BOOST_CHECK_EQUAL(head.self_size_, sizeof(head));
    BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
    BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
    BOOST_CHECK_EQUAL(head.data_size_, 0);
    BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid());
    head.data_size_ = 100;
    head.src_id_ = boost::uuids::random_generator()();
    head.type_ = 123;
    char buf[100] = {0};
    head.Pack(buf);
    MsgMetaV1 result;
    result.Parse(buf);
    BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
}
BOOST_AUTO_TEST_CASE(SpeedTest)
{
@@ -238,10 +218,10 @@
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        Msg msg;
        MsgI msg;
        DEFER1(msg.Release(shm););
        msg.Build(shm, mq.Id(), str.data(), str.size(), true);
        for (int i = 0; i < n; ++i) {
        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
        }
@@ -250,16 +230,9 @@
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while(*run) {
            Msg msg;
            BHMsg msg;
            if (mq.Recv(msg, timeout)) {
                MsgMetaV1 header;
                if (!header.Parse(msg.get())) {
                    BOOST_CHECK(false);
                }
                if (header.data_size_ != data_size) {
                    BOOST_CHECK(false);
                }
                msg.Release(shm);
                // ok
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
            }
@@ -332,8 +305,10 @@
    ShmMsgQueue srv(shm, qlen);
    ShmMsgQueue cli(shm, qlen);
    Msg ref_counted_msg;
    ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
    MsgI request_rc;
    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
    MsgI reply_rc;
    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
    std::atomic<uint64_t> count(0);
@@ -342,29 +317,25 @@
    auto Client = [&](int cli_id, int nmsg){
        for (int i = 0; i < nmsg; ++i) {
            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
            auto Req = [&]() {
                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
            };
            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
            if (!SendRefCounted()) {
            if (!ReqRC()) {
                printf("********** client send error.\n");
                continue;
            }
            MQId id;
            void *data = 0;
            size_t size = 0;
            if (!cli.Recv(id, data, size, 1000)) {
            BHMsg msg;
            if (!cli.Recv(msg, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(free(data));
                if(size != msg_length) {
                    BOOST_CHECK(false);
                }
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) != cur) {
                    std::cout << "time: " << Now();
                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
                    last_time = cur;
                }
@@ -374,19 +345,18 @@
    std::atomic<bool> stop(false);
    auto Server = [&](){
        void *data = 0;
        size_t size = 0;
        MQId src_id;
        BHMsg req;
        while (!stop) {
            if (srv.Recv(src_id, data, size, 100)) {
                DEFER1(free(data));
                auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
                auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
                auto &mqid = req.route()[0].mq_id();
                MQId src_id;
                memcpy(&src_id, mqid.data(), sizeof(src_id));
                auto Reply = [&]() {
                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
                };
                auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
                if (SendRefCounted()) {
                    if (size != msg_content.size()) {
                        BOOST_TEST(false, "server msg size error");
                    }
                if (ReplyRC()) {
                }
            }
        }
@@ -405,10 +375,10 @@
    printf("request ok: %ld\n", count.load());
    stop = true;
    servers.WaitAll();
    BOOST_CHECK(ref_counted_msg.IsCounted());
    BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
    ref_counted_msg.Release(shm);
    BOOST_CHECK(!ref_counted_msg.IsCounted());
    BOOST_CHECK(request_rc.IsCounted());
    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
    request_rc.Release(shm);
    BOOST_CHECK(!request_rc.IsCounted());
    // BOOST_CHECK_THROW(reply.Count(), int);
}