From 6f9521a6dca494a9f9644d1ccacdee23744dc0e5 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 24 三月 2021 18:34:18 +0800 Subject: [PATCH] add msg meta or header, queue pointer only. --- src/shm.h | 23 ++--- src/msg.h | 57 ++++++++++++++ utest/utest.cpp | 34 ++++++- src/shm.cpp | 37 +++++--- src/msg.cpp | 35 ++++++++ 5 files changed, 151 insertions(+), 35 deletions(-) diff --git a/src/msg.cpp b/src/msg.cpp new file mode 100644 index 0000000..e8c6d26 --- /dev/null +++ b/src/msg.cpp @@ -0,0 +1,35 @@ +/* + * ===================================================================================== + * + * Filename: msg.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�24鏃� 16鏃�48鍒�42绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "msg.h" + +namespace bhome_shm { + + +bool MsgMetaV1::Parse(const void *p) +{ + assert(p); + *this = *static_cast<const MsgMetaV1*>(p); + return tag_ == kMsgMetaTag; +} + +void MsgMetaV1::Pack(void *p) +{ + *static_cast<MsgMetaV1*>(p) = *this; +} + +} // namespace bhome_shm diff --git a/src/msg.h b/src/msg.h new file mode 100644 index 0000000..8fbb0c5 --- /dev/null +++ b/src/msg.h @@ -0,0 +1,57 @@ +/* + * ===================================================================================== + * + * Filename: msg.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�24鏃� 16鏃�49鍒�20绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef MSG_5BILLZET +#define MSG_5BILLZET + +#include <stdint.h> +#include <boost/interprocess/offset_ptr.hpp> + +namespace bhome_shm { + +using namespace boost::interprocess; + +// safe to be stored in shared memory. +// message format: header(meta) + body(data). +enum MsgType { + kMsgTypeNull = 0, + kMsgTypeNormal = 1, + kMsgTypeMaxValue +}; + +const uint32_t kMsgMetaTag = 0xf1e2d3c4; + +struct MsgMetaV1 { + uint16_t self_size_ = sizeof(MsgMetaV1); // sizeof(*this) + uint16_t type_ = kMsgTypeNormal; // msg type. + uint32_t tag_ = kMsgMetaTag; + uint32_t data_size_ = 0; + unsigned char src_id_[16] = {0}; + // more fields add at end, must not change + + MsgMetaV1(){} + bool Parse(const void *p); + void Pack(void *p); +}; + +typedef offset_ptr<void> Msg; + +} // namespace bhome_shm + + + +#endif // end of include guard: MSG_5BILLZET diff --git a/src/shm.cpp b/src/shm.cpp index b89acef..d271ef5 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -10,11 +10,12 @@ * Revision: none * Compiler: gcc * - * Author: YOUR NAME (), + * Author: Li Chao (), * Organization: * * ===================================================================================== */ + #include "shm.h" #include "bh_util.h" #include <mutex> @@ -31,29 +32,35 @@ MQId NewId() { return random_generator()(); } } -ShmMsgQueue::ShmMsgQueue(MQId id, ShmType &segment, const uint32_t len): +ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len): SharedQueue(segment, MsgQIdToName(id), id, len, segment.get_segment_manager()) { printf("queue size: %ld cap: %ld\n", data()->size(), data()->capacity()); } -ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len) {} + +ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len) +{} ShmMsgQueue::~ShmMsgQueue() { Remove(); } -bool ShmMsgQueue::Send(MQId remote_id, const void *data, const size_t size, const int timeout_ms) +bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms) { if (data && size) { Queue *remote = find(MsgQIdToName(remote_id)); if (remote) { - void *p = shm().allocate(size, std::nothrow); + void *p = shm().allocate(sizeof(MsgMetaV1) + size, std::nothrow); bool r = false; if (p) { - Msg buf = { Id(), p, size}; - memcpy(p, data, size); - if (remote->Write(buf, timeout_ms)) { + MsgMetaV1 meta; + meta.data_size_ = size; + memcpy(meta.src_id_, &Id(), sizeof(MQId)); + meta.Pack(p); + + memcpy(static_cast<char*>(p) + sizeof(meta), data, size); + if (remote->Write(p, timeout_ms)) { return true; } else { shm().deallocate(p); @@ -66,13 +73,15 @@ bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms) { - Msg buf; - if (Read(buf, timeout_ms) && buf.size_ > 0) { - DEFER1(shm().deallocate(buf.data_.get());); - source_id = buf.src_; - size = buf.size_; + Msg msg; + if (Read(msg, timeout_ms) && msg) { + DEFER1(shm().deallocate(msg.get());); + MsgMetaV1 meta; + meta.Parse(msg.get()); + memcpy(&source_id, meta.src_id_, sizeof(MQId)); + size = meta.data_size_; if (data = malloc(size)) { - memcpy(data, buf.data_.get(), size); + memcpy(data, static_cast<char*>(msg.get()) + meta.self_size_, size); return true; } } diff --git a/src/shm.h b/src/shm.h index 0ba4e8c..5aa4a48 100644 --- a/src/shm.h +++ b/src/shm.h @@ -10,7 +10,7 @@ * Revision: none * Compiler: gcc * - * Author: LiChao (), + * Author: Li Chao (), * Organization: * * ===================================================================================== @@ -26,6 +26,7 @@ #include <boost/noncopyable.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/uuid/uuid.hpp> +#include "msg.h" namespace bhome_shm { @@ -101,6 +102,7 @@ template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >; typedef boost::uuids::uuid MQId; + template <class D> class SyncedQueue : private Circular<D> { @@ -119,10 +121,11 @@ } public: - template <class...T> SyncedQueue(MQId id, T&&...t):Super(t...), id_(id) {} + // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {} + SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {} using Super::size; using Super::capacity; - MQId Id() const { return id_; } + const MQId &Id() const { return id_; } bool Write(D buf, const int timeout_ms) { Guard lock(mutex()); if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) { @@ -147,13 +150,6 @@ } }; -// safe to be stored in shared memory. -struct Msg { - MQId src_; - offset_ptr<void> data_; - size_t size_; -}; - class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> > { typedef ShmObject<SyncedQueue<Msg> > SharedQueue; @@ -161,13 +157,12 @@ bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } public: - ShmMsgQueue(MQId id, ShmType &segment, const uint32_t len); + ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len); ShmMsgQueue(ShmType &segment, const uint32_t len); ~ShmMsgQueue(); - bool Send(MQId remote_id, const void *data, const size_t size, const int timeout_ms); + bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms); - using SharedQueue::Remove; - MQId Id() const { return data()->Id(); } + const MQId &Id() const { return data()->Id(); } }; } // namespace bhome_shm diff --git a/utest/utest.cpp b/utest/utest.cpp index c128d37..38d924f 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,5 +1,5 @@ #include <stdio.h> -#include "shm.h" +#include "../src/shm.h" #include "../src/bh_util.h" #include <string> #include <vector> @@ -8,9 +8,10 @@ #include <atomic> #include <boost/noncopyable.hpp> #include <boost/timer/timer.hpp> -#include <boost/test/auto_unit_test.hpp> +#include <boost/test/unit_test.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/microsec_time_clock.hpp> +#include <boost/uuid/uuid_generators.hpp> using namespace std::chrono_literals; using namespace bhome_shm; @@ -122,6 +123,29 @@ } } +BOOST_AUTO_TEST_CASE(MsgHeader) +{ + MsgMetaV1 head; + BOOST_CHECK_EQUAL(head.self_size_, sizeof(head)); + BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal); + BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag); + BOOST_CHECK_EQUAL(head.data_size_, 0); + BOOST_CHECK_EQUAL(head.src_id_[5], 0); + + head.data_size_ = 100; + auto rand_id = boost::uuids::random_generator()(); + memcpy(head.src_id_, &rand_id, sizeof(rand_id)); + head.type_ = 123; + + BOOST_CHECK_EQUAL(sizeof(head.src_id_), sizeof(rand_id)); + + char buf[100] = {0}; + head.Pack(buf); + MsgMetaV1 result; + result.Parse(buf); + BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); + +} BOOST_AUTO_TEST_CASE(RequestReply) { const std::string shm_name("ShmReqRep"); @@ -129,6 +153,7 @@ SharedMemory shm(shm_name, 1024*1024*50); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); + // DEFER1(BOOST_CHECK_EQUAL(init_avail, Avail()); printf("Request Reply Test shm No Leak.\n");); auto f0 = init_avail; const int qlen = 64; @@ -203,11 +228,6 @@ printf("request ok: %ld\n", count.load()); stop = true; servers.WaitAll(); - - srv.Remove(); - cli.Remove(); - BOOST_CHECK_EQUAL(init_avail, Avail()); - printf("Request Reply Test shm No Leak.\n"); } int test_main(int argc, char *argv[]) -- Gitblit v1.8.0