From a22dd242713636fad33ee5965fe0900a425ce50d Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 25 三月 2021 11:19:15 +0800 Subject: [PATCH] test msg ref count, refactor. --- src/shm.h | 89 +-------- src/msg.h | 45 ++++ src/pubsub.h | 29 +++ src/shm_queue.h | 97 ++++++++++ src/pubsub.cpp | 24 ++ utest/utest.cpp | 39 +++ src/shm_queue.cpp | 98 ++++++++++ src/shm.cpp | 71 ------- 8 files changed, 332 insertions(+), 160 deletions(-) diff --git a/src/msg.h b/src/msg.h index 8fbb0c5..214bb72 100644 --- a/src/msg.h +++ b/src/msg.h @@ -19,13 +19,12 @@ #define MSG_5BILLZET #include <stdint.h> +#include "shm.h" #include <boost/interprocess/offset_ptr.hpp> namespace bhome_shm { -using namespace boost::interprocess; - -// safe to be stored in shared memory. +// msg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). enum MsgType { kMsgTypeNull = 0, @@ -48,7 +47,45 @@ void Pack(void *p); }; -typedef offset_ptr<void> Msg; +// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. +class RefCount : private boost::noncopyable +{ +public: + int Inc() { Guard lk(mutex_); return ++num_; } + int Dec() { Guard lk(mutex_); return --num_; } + int Get() { Guard lk(mutex_); return num_; } +private: + Mutex mutex_; + int num_ = 0; +}; + +class Msg { +private: + offset_ptr<void> ptr_; + offset_ptr<RefCount> count_; + void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); } +public: + class CountGuard : private boost::noncopyable { + Msg &msg_; + public: + CountGuard(Msg &msg) : msg_(msg) { msg_.AddRef(); } + ~CountGuard() { msg_.RemoveRef(); } + }; + + Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {} + + // ~Msg() { RemoveRef(); } + // Msg(const Msg &a):ptr_(a.ptr_), count_(a.count_) { AddRef(); } + // Msg(Msg &&a):ptr_(a.ptr_), count_(a.count_) { a.ptr_ = 0; a.count_ = 0; } + // Msg & operator = (const Msg &a) { Msg(a).swap(*this); } + // Msg & operator = (Msg &&a) { Msg(std::move(a)).swap(*this); } + + template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); } + int AddRef() { return count_ ? count_->Inc() : 0; } + int RemoveRef() { return count_ ? count_->Dec() : 0; } + int Count() { return count_ ? count_->Get() : 0; } +}; + } // namespace bhome_shm diff --git a/src/pubsub.cpp b/src/pubsub.cpp new file mode 100644 index 0000000..b592113 --- /dev/null +++ b/src/pubsub.cpp @@ -0,0 +1,24 @@ +/* + * ===================================================================================== + * + * Filename: pubsub.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�13绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "pubsub.h" + +namespace bhome_shm { + + +} // namespace bhome_shm + diff --git a/src/pubsub.h b/src/pubsub.h new file mode 100644 index 0000000..0d7f4f0 --- /dev/null +++ b/src/pubsub.h @@ -0,0 +1,29 @@ +/* + * ===================================================================================== + * + * Filename: pubsub.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�36绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef PUBSUB_4KGRA997 +#define PUBSUB_4KGRA997 + +#include "shm.h" + +namespace bhome_shm { + +bool Subscribe(const std::string &topic); + +} // namespace bhome_shm + +#endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/shm.cpp b/src/shm.cpp index d271ef5..fd0f838 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -17,78 +17,7 @@ */ #include "shm.h" -#include "bh_util.h" -#include <mutex> -#include <boost/uuid/uuid_io.hpp> -#include <boost/uuid/uuid_generators.hpp> namespace bhome_shm { -using namespace boost::interprocess; -using namespace boost::uuids; - -namespace { -std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); } -MQId EmptyId() { return nil_uuid(); } -MQId NewId() { return random_generator()(); } -} - -ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len): -SharedQueue(segment, MsgQIdToName(id), id, len, segment.get_segment_manager()) -{ - printf("queue size: %ld cap: %ld\n", data()->size(), data()->capacity()); -} - -ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len) -{} - -ShmMsgQueue::~ShmMsgQueue() -{ - Remove(); -} - -bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms) -{ - if (data && size) { - Queue *remote = find(MsgQIdToName(remote_id)); - if (remote) { - void *p = shm().allocate(sizeof(MsgMetaV1) + size, std::nothrow); - bool r = false; - if (p) { - MsgMetaV1 meta; - meta.data_size_ = size; - memcpy(meta.src_id_, &Id(), sizeof(MQId)); - meta.Pack(p); - - memcpy(static_cast<char*>(p) + sizeof(meta), data, size); - if (remote->Write(p, timeout_ms)) { - return true; - } else { - shm().deallocate(p); - } - } - } - } - return false; -} - -bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms) -{ - Msg msg; - if (Read(msg, timeout_ms) && msg) { - DEFER1(shm().deallocate(msg.get());); - MsgMetaV1 meta; - meta.Parse(msg.get()); - memcpy(&source_id, meta.src_id_, sizeof(MQId)); - size = meta.data_size_; - if (data = malloc(size)) { - memcpy(data, static_cast<char*>(msg.get()) + meta.self_size_, size); - return true; - } - } - source_id = EmptyId(); - data = 0; - size = 0; - return false; -} } // namespace bhome_shm diff --git a/src/shm.h b/src/shm.h index 5aa4a48..91a339d 100644 --- a/src/shm.h +++ b/src/shm.h @@ -19,18 +19,17 @@ #ifndef SHM_6CHO6D6C #define SHM_6CHO6D6C -#include <boost/interprocess/managed_shared_memory.hpp> -#include <boost/interprocess/sync/interprocess_condition.hpp> -#include <boost/interprocess/containers/string.hpp> -#include <boost/circular_buffer.hpp> #include <boost/noncopyable.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/uuid/uuid.hpp> -#include "msg.h" +#include <boost/interprocess/managed_shared_memory.hpp> +#include <boost/interprocess/sync/interprocess_mutex.hpp> +#include <boost/interprocess/sync/interprocess_condition.hpp> +#include <boost/interprocess/sync/scoped_lock.hpp> namespace bhome_shm { using namespace boost::interprocess; + typedef managed_shared_memory mshm_t; typedef interprocess_mutex Mutex; typedef scoped_lock<Mutex> Guard; @@ -55,7 +54,7 @@ {} std::string name() const { return name_; } bool Remove() { return Remove(name()); } - template <class T, class ...Params> T * New(Params const&...params) { return construct<T, std::nothrow>(anonymous_instance)(params...); } + template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; } template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); } @@ -65,7 +64,7 @@ // works like a smart pointer of an object in shared memory. template <class T> class ShmObject : private boost::noncopyable { - static std::string DataName(const std::string &name) { return "dat" + name; } + static std::string ObjName(const std::string &name) { return "obj" + name; } protected: typedef T Data; typedef SharedMemory ShmType; @@ -82,88 +81,22 @@ ShmObject(ShmType &segment, const std::string &name, Params&&...t): shm_(segment), name_(name) { - pdata_ = shm_.find_or_construct<Data>(DataName(name_).c_str())(t...); + pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...); if (!IsOk()) { throw("shm error: " + name_); } } - Data *find(const std::string &name) { return shm_.find<Data>(DataName(name).c_str()).first; } + Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; } virtual ~ShmObject() {} std::string name() const { return name_; } Data* data() { return pdata_; } const Data* data() const { return pdata_; } Data* operator->() { return data(); } const Data* operator->() const { return data(); } - virtual bool Remove() { return shm_.destroy<Data>(DataName(name_).c_str()); } + virtual bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); } }; -template <class D> using Allocator = allocator<D, mshm_t::segment_manager>; - -template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >; - -typedef boost::uuids::uuid MQId; - -template <class D> -class SyncedQueue : private Circular<D> -{ - typedef Circular<D> Super; - Mutex mutex_; - Cond cond_read_; - Cond cond_write_; - Mutex & mutex() { return mutex_; } - const MQId id_; - - static boost::posix_time::ptime MSFromNow(const int ms) - { - using namespace boost::posix_time; - ptime cur = boost::posix_time::microsec_clock::universal_time(); - return cur + millisec(ms); - } - -public: - // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {} - SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {} - using Super::size; - using Super::capacity; - const MQId &Id() const { return id_; } - bool Write(D buf, const int timeout_ms) { - Guard lock(mutex()); - if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) { - this->push_back(buf); - cond_read_.notify_one(); - return true; - } else { - return false; - } - } - - bool Read(D &buf, const int timeout_ms){ - Guard lock(mutex()); - if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) { - buf = this->front(); - this->pop_front(); - cond_write_.notify_one(); - return true; - } else { - return false; - } - } -}; - -class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> > -{ - typedef ShmObject<SyncedQueue<Msg> > SharedQueue; - typedef SharedQueue::Data Queue; - bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } - bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } -public: - ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len); - ShmMsgQueue(ShmType &segment, const uint32_t len); - ~ShmMsgQueue(); - bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); - bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms); - const MQId &Id() const { return data()->Id(); } -}; +template <class D> using Allocator = allocator<D, SharedMemory::segment_manager>; } // namespace bhome_shm diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp new file mode 100644 index 0000000..263fd94 --- /dev/null +++ b/src/shm_queue.cpp @@ -0,0 +1,98 @@ +/* + * ===================================================================================== + * + * Filename: shm_queue.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�25鏃� 10鏃�34鍒�42绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "shm_queue.h" +#include <boost/uuid/uuid_io.hpp> +#include <boost/uuid/uuid_generators.hpp> +#include "bh_util.h" + +namespace bhome_shm { + +using namespace boost::interprocess; +using namespace boost::uuids; + +namespace { +std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); } +MQId EmptyId() { return nil_uuid(); } +MQId NewId() { return random_generator()(); } +} + +ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len): +SharedQueue(segment, MsgQIdToName(id), id, len, segment.get_segment_manager()) +{ + printf("queue size: %ld cap: %ld\n", data()->size(), data()->capacity()); +} + +ShmMsgQueue::ShmMsgQueue(ShmType &segment, const uint32_t len):ShmMsgQueue(NewId(), segment, len) +{} + +ShmMsgQueue::~ShmMsgQueue() +{ + Remove(); +} + +bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms) +{ + if (data && size) { + Queue *remote = find(MsgQIdToName(remote_id)); + if (remote) { + void *p = shm().allocate(sizeof(MsgMetaV1) + size, std::nothrow); + bool r = false; + if (p) { + MsgMetaV1 meta; + meta.data_size_ = size; + memcpy(meta.src_id_, &Id(), sizeof(MQId)); + meta.Pack(p); + + memcpy(static_cast<char*>(p) + sizeof(meta), data, size); + if (remote->Write(p, timeout_ms)) { + return true; + } else { + shm().deallocate(p); + } + } + } + } + return false; +} + +bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms) +{ + Msg msg; + if (Read(msg, timeout_ms)) { + auto ptr = msg.get<char>(); + if (ptr) { + DEFER1(shm().deallocate(ptr);); + MsgMetaV1 meta; + meta.Parse(ptr); + memcpy(&source_id, meta.src_id_, sizeof(MQId)); + size = meta.data_size_; + data = malloc(size); + if (data) { + memcpy(data, ptr + meta.self_size_, size); + return true; + } + } + } + source_id = EmptyId(); + data = 0; + size = 0; + return false; +} + +} // namespace bhome_shm + diff --git a/src/shm_queue.h b/src/shm_queue.h new file mode 100644 index 0000000..a98fcf4 --- /dev/null +++ b/src/shm_queue.h @@ -0,0 +1,97 @@ +/* + * ===================================================================================== + * + * Filename: shm_queue.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�25鏃� 10鏃�35鍒�09绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ + +#ifndef SHM_QUEUE_JE0OEUP3 +#define SHM_QUEUE_JE0OEUP3 + +#include "shm.h" +#include "msg.h" +#include <boost/circular_buffer.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> + +namespace bhome_shm { + +template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >; + +typedef boost::uuids::uuid MQId; + +template <class D> +class SyncedQueue : private Circular<D> +{ + typedef Circular<D> Super; + Mutex mutex_; + Cond cond_read_; + Cond cond_write_; + Mutex & mutex() { return mutex_; } + const MQId id_; + + static boost::posix_time::ptime MSFromNow(const int ms) + { + using namespace boost::posix_time; + ptime cur = boost::posix_time::microsec_clock::universal_time(); + return cur + millisec(ms); + } + +public: + // template <class...T> SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {} + SyncedQueue(const MQId &id, const uint32_t len, Allocator<D> const& alloc):Super(len, alloc), id_(id) {} + using Super::size; + using Super::capacity; + const MQId &Id() const { return id_; } + bool Write(D buf, const int timeout_ms) { + Guard lock(mutex()); + if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) { + this->push_back(buf); + cond_read_.notify_one(); + return true; + } else { + return false; + } + } + + bool Read(D &buf, const int timeout_ms){ + Guard lock(mutex()); + if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) { + buf = this->front(); + this->pop_front(); + cond_write_.notify_one(); + return true; + } else { + return false; + } + } +}; + +class ShmMsgQueue : private ShmObject<SyncedQueue<Msg> > +{ + typedef ShmObject<SyncedQueue<Msg> > SharedQueue; + typedef SharedQueue::Data Queue; + bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } + bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } +public: + ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len); + ShmMsgQueue(ShmType &segment, const uint32_t len); + ~ShmMsgQueue(); + bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); + bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms); + const MQId &Id() const { return data()->Id(); } +}; + +} // namespace bhome_shm + +#endif // end of include guard: SHM_QUEUE_JE0OEUP3 diff --git a/utest/utest.cpp b/utest/utest.cpp index 38d924f..57b0e9c 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,6 +1,4 @@ #include <stdio.h> -#include "../src/shm.h" -#include "../src/bh_util.h" #include <string> #include <vector> #include <thread> @@ -12,6 +10,8 @@ #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/microsec_time_clock.hpp> #include <boost/uuid/uuid_generators.hpp> +#include "shm_queue.h" +#include "bh_util.h" using namespace std::chrono_literals; using namespace bhome_shm; @@ -48,7 +48,7 @@ ~ShmRemover() { SharedMemory::Remove(name_); } }; -BOOST_AUTO_TEST_CASE(ShmBasic) +BOOST_AUTO_TEST_CASE(ShmBasicTest) { const std::string shm_name("basic"); ShmRemover auto_remove(shm_name); @@ -105,7 +105,7 @@ BOOST_CHECK_EQUAL(init_avail, Avail()); } -BOOST_AUTO_TEST_CASE(TimedWait) +BOOST_AUTO_TEST_CASE(TimedWaitTest) { const std::string shm_name("shm_wait"); ShmRemover auto_remove(shm_name); @@ -123,7 +123,32 @@ } } -BOOST_AUTO_TEST_CASE(MsgHeader) +BOOST_AUTO_TEST_CASE(RefCountTest) +{ + const std::string shm_name("ShmRefCount"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024*1024); + + Msg m0(shm.allocate(1000), shm.New<RefCount>()); + BOOST_CHECK_EQUAL(m0.AddRef(), 1); + Msg m1 = m0; + BOOST_CHECK_EQUAL(m1.AddRef(), 2); + BOOST_CHECK_EQUAL(m0.AddRef(), 3); + BOOST_CHECK_EQUAL(m0.RemoveRef(), 2); + BOOST_CHECK_EQUAL(m0.RemoveRef(), 1); + BOOST_CHECK_EQUAL(m1.RemoveRef(), 0); + { + Msg::CountGuard guard(m0); + BOOST_CHECK_EQUAL(m1.AddRef(), 2); + { + Msg::CountGuard guard(m0); + BOOST_CHECK_EQUAL(m1.RemoveRef(), 2); + } + } + BOOST_CHECK_EQUAL(m1.Count(), 0); +} + +BOOST_AUTO_TEST_CASE(MsgHeaderTest) { MsgMetaV1 head; BOOST_CHECK_EQUAL(head.self_size_, sizeof(head)); @@ -146,7 +171,7 @@ BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0); } -BOOST_AUTO_TEST_CASE(RequestReply) +BOOST_AUTO_TEST_CASE(RequestReplyTest) { const std::string shm_name("ShmReqRep"); ShmRemover auto_remove(shm_name); @@ -191,7 +216,7 @@ auto cur = Now(); if (last_time.exchange(cur) != cur) { std::cout << "time: " << Now(); - printf(", total msg:%10ld, speed:%8ld, used mem:%8ld\n", count.load(), count - last_count.exchange(count), init_avail - Avail()); + printf(", total msg:%10ld, speed:%8ld/s, used mem:%8ld\n", count.load(), count - last_count.exchange(count), init_avail - Avail()); last_time = cur; } -- Gitblit v1.8.0