lichao
2021-03-24 6f9521a6dca494a9f9644d1ccacdee23744dc0e5
add msg meta or header, queue pointer only.
2个文件已添加
3个文件已修改
186 ■■■■ 已修改文件
src/msg.cpp 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.cpp 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp
New file
@@ -0,0 +1,35 @@
/*
 * =====================================================================================
 *
 *       Filename:  msg.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月24日 16时48分42秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "msg.h"
namespace bhome_shm {
bool MsgMetaV1::Parse(const void *p)
{
    assert(p);
    *this = *static_cast<const MsgMetaV1*>(p);
    return tag_ == kMsgMetaTag;
}
void MsgMetaV1::Pack(void *p)
{
    *static_cast<MsgMetaV1*>(p) = *this;
}
} // namespace bhome_shm
src/msg.h
New file
@@ -0,0 +1,57 @@
/*
 * =====================================================================================
 *
 *       Filename:  msg.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月24日 16时49分20秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
#include <stdint.h>
#include <boost/interprocess/offset_ptr.hpp>
namespace bhome_shm {
using namespace boost::interprocess;
// safe to be stored in shared memory.
// message format: header(meta) + body(data).
enum MsgType {
    kMsgTypeNull = 0,
    kMsgTypeNormal = 1,
    kMsgTypeMaxValue
};
const uint32_t kMsgMetaTag = 0xf1e2d3c4;
struct MsgMetaV1 {
    uint16_t self_size_ = sizeof(MsgMetaV1); // sizeof(*this)
    uint16_t type_ = kMsgTypeNormal; // msg type.
    uint32_t tag_ = kMsgMetaTag;
    uint32_t data_size_ = 0;
    unsigned char src_id_[16] = {0};
    // more fields add at end, must not change
    MsgMetaV1(){}
    bool Parse(const void *p);
    void Pack(void *p);
};
typedef offset_ptr<void> Msg;
} // namespace bhome_shm
#endif // end of include guard: MSG_5BILLZET
src/shm.cpp
@@ -10,11 +10,12 @@
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  YOUR NAME (),
 *         Author:  Li Chao (),
 *   Organization:  
 *
 * =====================================================================================
 */
#include "shm.h"
#include "bh_util.h"
#include <mutex>
@@ -31,29 +32,35 @@
MQId NewId() { return random_generator()(); }
}
ShmMsgQueue::ShmMsgQueue(MQId id, ShmType &segment, const uint32_t len):
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len):
SharedQueue(segment, MsgQIdToName(id), id, len, segment.get_segment_manager())
{
    printf("queue size: %ld cap: %ld\n", data()->size(), data()->capacity());
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len) {}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len)
{}
ShmMsgQueue::~ShmMsgQueue()
{
    Remove();
}
bool ShmMsgQueue::Send(MQId remote_id, const void *data, const size_t size, const int timeout_ms)
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
    if (data && size) {
        Queue *remote = find(MsgQIdToName(remote_id));
        if (remote) {
            void *p = shm().allocate(size, std::nothrow);
            void *p = shm().allocate(sizeof(MsgMetaV1) + size, std::nothrow);
            bool r = false;
            if (p) {
                Msg buf = { Id(), p, size};
                memcpy(p, data, size);
                if (remote->Write(buf, timeout_ms)) {
                MsgMetaV1 meta;
                meta.data_size_ = size;
                memcpy(meta.src_id_, &Id(), sizeof(MQId));
                meta.Pack(p);
                memcpy(static_cast<char*>(p) + sizeof(meta), data, size);
                if (remote->Write(p, timeout_ms)) {
                    return true;
                } else {
                    shm().deallocate(p);
@@ -66,13 +73,15 @@
bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms)
{
    Msg buf;
    if (Read(buf, timeout_ms) && buf.size_ > 0) {
        DEFER1(shm().deallocate(buf.data_.get()););
        source_id = buf.src_;
        size = buf.size_;
    Msg msg;
    if (Read(msg, timeout_ms) && msg) {
        DEFER1(shm().deallocate(msg.get()););
        MsgMetaV1 meta;
        meta.Parse(msg.get());
        memcpy(&source_id, meta.src_id_, sizeof(MQId));
        size = meta.data_size_;
        if (data = malloc(size)) {
            memcpy(data, buf.data_.get(), size);
            memcpy(data, static_cast<char*>(msg.get()) + meta.self_size_, size);
            return true;
        }
    }
src/shm.h
@@ -10,7 +10,7 @@
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  LiChao (),
 *         Author:  Li Chao (),
 *   Organization:  
 *
 * =====================================================================================
@@ -26,6 +26,7 @@
#include <boost/noncopyable.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/uuid/uuid.hpp>
#include "msg.h"
namespace bhome_shm {
@@ -101,6 +102,7 @@
template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >;
typedef boost::uuids::uuid MQId;
template <class D>
class SyncedQueue : private Circular<D>
{
@@ -119,10 +121,11 @@
    }
public:
    template <class...T> SyncedQueue(MQId id, T&&...t):Super(t...), id_(id) {}
    // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {}
    SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {}
    using Super::size;
    using Super::capacity;
    MQId Id() const { return id_; }
    const MQId &Id() const { return id_; }
    bool Write(D buf, const int timeout_ms) {
        Guard lock(mutex());
        if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
@@ -147,13 +150,6 @@
    }
};
// safe to be stored in shared memory.
struct Msg {
    MQId src_;
    offset_ptr<void> data_;
    size_t size_;
};
class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> >
{
    typedef ShmObject<SyncedQueue<Msg> > SharedQueue;
@@ -161,13 +157,12 @@
    bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
public:
    ShmMsgQueue(MQId id, ShmType &segment, const uint32_t len);
    ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len);
    ShmMsgQueue(ShmType &segment, const uint32_t len);
    ~ShmMsgQueue();
    bool Send(MQId remote_id, const void *data, const size_t size, const int timeout_ms);
    bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
    bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
    using SharedQueue::Remove;
    MQId Id() const { return data()->Id(); }
    const MQId &Id() const { return data()->Id(); }
};
} // namespace bhome_shm
utest/utest.cpp
@@ -1,5 +1,5 @@
#include <stdio.h>
#include "shm.h"
#include "../src/shm.h"
#include "../src/bh_util.h"
#include <string>
#include <vector>
@@ -8,9 +8,10 @@
#include <atomic>
#include <boost/noncopyable.hpp>
#include <boost/timer/timer.hpp>
#include <boost/test/auto_unit_test.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <boost/uuid/uuid_generators.hpp>
using namespace std::chrono_literals;
using namespace bhome_shm;
@@ -122,6 +123,29 @@
    }
}
BOOST_AUTO_TEST_CASE(MsgHeader)
{
    MsgMetaV1 head;
    BOOST_CHECK_EQUAL(head.self_size_, sizeof(head));
    BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
    BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
    BOOST_CHECK_EQUAL(head.data_size_, 0);
    BOOST_CHECK_EQUAL(head.src_id_[5], 0);
    head.data_size_ = 100;
    auto rand_id = boost::uuids::random_generator()();
    memcpy(head.src_id_, &rand_id, sizeof(rand_id));
    head.type_ = 123;
    BOOST_CHECK_EQUAL(sizeof(head.src_id_), sizeof(rand_id));
    char buf[100] = {0};
    head.Pack(buf);
    MsgMetaV1 result;
    result.Parse(buf);
    BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
}
BOOST_AUTO_TEST_CASE(RequestReply)
{
    const std::string shm_name("ShmReqRep");
@@ -129,6 +153,7 @@
    SharedMemory shm(shm_name, 1024*1024*50);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n"););
    auto f0 = init_avail;
    const int qlen = 64;
@@ -203,11 +228,6 @@
    printf("request ok: %ld\n", count.load());
    stop = true;
    servers.WaitAll();
    srv.Remove();
    cli.Remove();
    BOOST_CHECK_EQUAL(init_avail, Avail());
    printf("Request Reply Test shm No Leak.\n");
}
int test_main(int argc, char *argv[])