From 02ba913dc7bb5d711471b27f2ea23a897d0f2e28 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 15:34:26 +0800 Subject: [PATCH] bind msgi to shm, change offset_ptr to abs offset. --- utest/simple_tests.cpp | 21 +- src/msg.h | 173 +++++++++++++++++------- src/socket.cpp | 4 utest/utest.cpp | 19 +- src/sendq.cpp | 8 src/topic_node.cpp | 16 +- src/msg.cpp | 70 ---------- box/center_main.cc | 1 utest/speed_test.cpp | 22 +- utest/api_test.cpp | 43 +++++ utest/util.h | 4 src/failed_msg.cpp | 2 src/bh_api.cpp | 1 13 files changed, 212 insertions(+), 172 deletions(-) diff --git a/box/center_main.cc b/box/center_main.cc index 1f181b8..7f4b26b 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -86,6 +86,7 @@ int center_main(int argc, const char *argv[]) { auto &shm = BHomeShm(); + MsgI::BindShm(shm); AppArg args(argc, argv); if (args.Has("remove")) { diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 8a4b947..f0ba26d 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -10,6 +10,7 @@ { TopicNode &ProcNode() { + static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm()); static TopicNode node(BHomeShm()); return node; } diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp index 0b3ee42..d8a3182 100644 --- a/src/failed_msg.cpp +++ b/src/failed_msg.cpp @@ -23,7 +23,7 @@ return [remote, msg](void *valid_sock) mutable { assert(valid_sock); ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); - DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release. + DEFER1(msg.Release()); // Release() is not const, but it's safe to release. return sock.Send(remote.data(), msg); }; } \ No newline at end of file diff --git a/src/msg.cpp b/src/msg.cpp index ba844da..7ab0434 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -20,76 +20,6 @@ namespace bhome_msg { -/*TODO change msg format, header has proc info; -reply has errer msg - center accept request and route.; -//*/ const uint32_t kMsgTag = 0xf1e2d3c4; - -void *MsgI::Alloc(SharedMemory &shm, const size_t size) -{ - void *p = shm.Alloc(sizeof(Meta) + size); - if (p) { - auto pmeta = new (p) Meta; - p = pmeta + 1; - } - return p; -} -void MsgI::Free(SharedMemory &shm) -{ - assert(valid()); - shm.Dealloc(meta()); - ptr_ = nullptr; - assert(!valid()); -} - -void *MsgI::Pack(SharedMemory &shm, - const uint32_t head_len, const ToArray &headToArray, - const uint32_t body_len, const ToArray &bodyToArray) -{ - void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len); - if (addr) { - auto p = static_cast<char *>(addr); - auto Pack1 = [&p](auto len, auto &writer) { - Put32(p, len); - p += sizeof(len); - writer(p, len); - p += len; - }; - Pack1(head_len, headToArray); - Pack1(body_len, bodyToArray); - } - return addr; -} - -bool MsgI::ParseHead(BHMsgHead &head) const -{ - auto p = get<char>(); - assert(p); - uint32_t msg_size = Get32(p); - p += 4; - return head.ParseFromArray(p, msg_size); -} - -bool MsgI::Make(SharedMemory &shm, void *p) -{ - if (!p) { - return false; - } - MsgI(p).swap(*this); - return true; -} - -int MsgI::Release(SharedMemory &shm) -{ - if (!valid()) { - return 0; - } - auto n = meta()->count_.Dec(); - if (n == 0) { - Free(shm); - } - return n; -} } // namespace bhome_msg diff --git a/src/msg.h b/src/msg.h index 452567e..99b3a09 100644 --- a/src/msg.h +++ b/src/msg.h @@ -31,83 +31,150 @@ { using namespace bhome_shm; -// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. -// message format: header(meta) + body(data). +// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. +// message content layout: (meta) / header_size + header + data_size + data typedef boost::uuids::uuid MQId; -// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. -class RefCount : private boost::noncopyable -{ - std::atomic<int> num_; - -public: - RefCount() : - num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } - int Inc() { return ++num_; } - int Dec() { return --num_; } - int Get() { return num_.load(); } -}; - -// message content layout: header_size + header + data_size + data -class MsgI +class ShmMsg { private: + // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. + class RefCount : private boost::noncopyable + { + std::atomic<int> num_; + + public: + RefCount() : + num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } + int Inc() { return ++num_; } + int Dec() { return --num_; } + int Get() { return num_.load(); } + }; + typedef int64_t Offset; + static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } + static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } + static inline Offset BaseAddr() + { + static const Offset base = Addr(shm().get_address()); // cache value. + return base; + } + static inline SharedMemory &shm() + { + if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); } + return *pshm(); + } + static inline SharedMemory *&pshm() + { + static SharedMemory *pshm = 0; + return pshm; + } + struct Meta { RefCount count_; }; - offset_ptr<void> ptr_; - void *Alloc(SharedMemory &shm, const size_t size); - void Free(SharedMemory &shm); + Offset offset_; + void *Alloc(const size_t size) + { + void *p = shm().Alloc(sizeof(Meta) + size); + if (p) { + auto pmeta = new (p) Meta; + p = pmeta + 1; + } + return p; + } + void Free() + { + assert(valid()); + shm().Dealloc(meta()); + offset_ = 0; + assert(!valid()); + } Meta *meta() const { return get<Meta>() - 1; } typedef std::function<void(void *p, int len)> ToArray; - void *Pack(SharedMemory &shm, - const uint32_t head_len, const ToArray &headToArray, - const uint32_t body_len, const ToArray &bodyToArray); + void *Pack(const uint32_t head_len, const ToArray &headToArray, + const uint32_t body_len, const ToArray &bodyToArray) + { + void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); + if (addr) { + auto p = static_cast<char *>(addr); + auto Pack1 = [&p](auto len, auto &writer) { + Put32(p, len); + p += sizeof(len); + writer(p, len); + p += len; + }; + Pack1(head_len, headToArray); + Pack1(body_len, bodyToArray); + } + return addr; + } template <class Body> - void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) + void *Pack(const BHMsgHead &head, const Body &body) { return Pack( - shm, uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); } - void *Pack(SharedMemory &shm, const std::string &content) + void *Pack(const std::string &content) { - void *addr = Alloc(shm, content.size()); + void *addr = Alloc(content.size()); if (addr) { memcpy(addr, content.data(), content.size()); } return addr; } - bool Make(SharedMemory &shm, void *addr); - MsgI(void *p) : - ptr_(p) {} + bool Make(void *addr) + { + if (!addr) { + return false; + } + ShmMsg(addr).swap(*this); + return true; + } + ShmMsg(void *p) : + offset_(p ? (Addr(p) - BaseAddr()) : 0) {} + + template <class T = void> + T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } public: - MsgI() : - MsgI(nullptr) {} - MsgI(SharedMemory &shm, const size_t size) : - MsgI(Alloc(shm, size)) {} - void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } - bool valid() const { return static_cast<bool>(ptr_); } - template <class T = void> - T *get() const { return static_cast<T *>(ptr_.get()); } + static bool BindShm(SharedMemory &shm) + { + assert(!pshm()); + pshm() = &shm; + return true; + } + + ShmMsg() : + ShmMsg(nullptr) {} + explicit ShmMsg(const size_t size) : + ShmMsg(Alloc(size)) {} + void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } + bool valid() const { return static_cast<bool>(offset_); } // AddRef and Release works for both counted and not counted msg. int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } - int Release(SharedMemory &shm); + int Release() + { + if (!valid()) { + return 0; + } + auto n = meta()->count_.Dec(); + if (n == 0) { + Free(); + } + return n; + } int Count() const { return valid() ? meta()->count_.Get() : 1; } template <class Body> - inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) - { - return Make(shm, Pack(shm, head, body)); - } + inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } + inline bool Make(const std::string &content) { return Make(Pack(content)); } template <class Body> static inline std::string Serialize(const BHMsgHead &head, const Body &body) { @@ -126,17 +193,19 @@ assert(pos == s.size()); return s; } - inline bool Make(SharedMemory &shm, const std::string &content) - { - void *p = Pack(shm, content); - return Make(shm, p); - } - bool ParseHead(BHMsgHead &head) const; + bool ParseHead(BHMsgHead &head) const + { + auto p = get<char>(); + assert(p); + uint32_t msg_size = Get32(p); + p += 4; + return head.ParseFromArray(p, msg_size); + } template <class Body> bool ParseBody(Body &body) const { - auto p = static_cast<char *>(ptr_.get()); + auto p = get<char>(); assert(p); uint32_t size = Get32(p); p += 4; @@ -147,7 +216,9 @@ } }; -inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } +inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); } + +typedef ShmMsg MsgI; } // namespace bhome_msg diff --git a/src/sendq.cpp b/src/sendq.cpp index 8aa7214..54de419 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -33,7 +33,7 @@ info.on_expire_(info.data_); } if (info.data_.index() == 0) { - boost::variant2::get<0>(info.data_).Release(mq.shm()); + boost::variant2::get<0>(info.data_).Release(); } } @@ -43,13 +43,13 @@ auto &msg = boost::variant2::get<0>(pos->data().data_); r = mq.TrySend(*(MQId *) remote.data(), msg); if (r) { - msg.Release(mq.shm()); + msg.Release(); } } else { auto &content = boost::variant2::get<1>(pos->data().data_); MsgI msg; - if (msg.Make(mq.shm(), content)) { - DEFER1(msg.Release(mq.shm());); + if (msg.Make(content)) { + DEFER1(msg.Release();); r = mq.TrySend(*(MQId *) remote.data(), msg); } } diff --git a/src/socket.cpp b/src/socket.cpp index c664982..313c212 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -59,7 +59,7 @@ return false; } auto onMsg = [&](MsgI &imsg) { - DEFER1(imsg.Release(shm())); + DEFER1(imsg.Release()); BHMsgHead head; if (imsg.ParseHead(head)) { onRecvWithPerMsgCB(*this, imsg, head); @@ -118,7 +118,7 @@ if (msg.ParseHead(head)) { return true; } else { - msg.Release(shm()); + msg.Release(); } } return false; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 4f0c96f..a5d48b7 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -111,7 +111,7 @@ return sock.Send(&BHTopicCenterAddress(), head, body, onResult); } else { MsgI reply; - DEFER1(reply.Release(shm_);); + DEFER1(reply.Release();); BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); if (r) { @@ -139,7 +139,7 @@ return sock.Send(&BHTopicCenterAddress(), head, body); } else { MsgI reply; - DEFER1(reply.Release(shm_);); + DEFER1(reply.Release();); BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); @@ -172,7 +172,7 @@ return sock.Send(&BHTopicCenterAddress(), head, body); } else { MsgI reply; - DEFER1(reply.Release(shm_);); + DEFER1(reply.Release();); BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; @@ -366,7 +366,7 @@ head.set_topic(request.topic()); MsgI reply_msg; - DEFER1(reply_msg.Release(shm_);); + DEFER1(reply_msg.Release();); BHMsgHead reply_head; if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && @@ -403,7 +403,7 @@ AddRoute(head, sock.id()); MsgI reply; - DEFER1(reply.Release(shm_)); + DEFER1(reply.Release()); BHMsgHead reply_head; if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { @@ -442,7 +442,7 @@ return sock.Send(&BHTopicBusAddress(), head, pub); } else { MsgI reply; - DEFER1(reply.Release(shm());); + DEFER1(reply.Release();); BHMsgHead reply_head; MsgCommonReply reply_body; return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && @@ -475,7 +475,7 @@ return sock.Send(&BHTopicBusAddress(), head, sub); } else { MsgI reply; - DEFER1(reply.Release(shm());); + DEFER1(reply.Release();); BHMsgHead reply_head; return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && @@ -515,7 +515,7 @@ auto &sock = SockSub(); MsgI msg; - DEFER1(msg.Release(shm());); + DEFER1(msg.Release();); BHMsgHead head; //TODO error msg. if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { diff --git a/utest/api_test.cpp b/utest/api_test.cpp index a91db43..200ae99 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -153,17 +153,50 @@ } void unlock() { mutex_.unlock(); } }; + +namespace +{ +typedef int64_t Offset; +Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } +void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } +} // namespace + BOOST_AUTO_TEST_CASE(MutexTest) { - const std::string shm_name("ShmMutex"); - // ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024 * 10); + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); + + void *base_ptr = shm.get_address(); + auto PrintPtr = [&](void *p) { + printf("addr: %ld, ptr: %p, offset: %ld\n", Addr(p), p, Addr(p) - Addr(base_ptr)); + }; + + printf("base"); + PrintPtr(base_ptr); + + MsgI msg; + msg.Make("string data"); + for (int i = 0; i < 10; ++i) { + int n = msg.AddRef(); + printf("add %d ref: %d\n", i, n); + } + for (int i = 0; i < 10; ++i) { + int n = msg.Release(); + printf("release %d, ref : %d\n", i, n); + } + std::this_thread::sleep_for(1s); + msg.Release(); const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(3s); - + auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); auto pi = shm.find_or_construct<int>(int_name.c_str())(100); + + printf("mutetx "); + PrintPtr(mtx); + printf("int "); + PrintPtr(pi); + typedef std::chrono::steady_clock Clock; auto Now = []() { return Clock::now().time_since_epoch(); }; if (pi) { diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index 817bdac..33c78f5 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -106,9 +106,8 @@ BOOST_AUTO_TEST_CASE(TimedWaitTest) { - const std::string shm_name("shm_wait"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024); + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); ShmMsgQueue q(shm, 64); for (int i = 0; i < 2; ++i) { int ms = i * 100; @@ -122,19 +121,19 @@ BOOST_AUTO_TEST_CASE(RefCountTest) { - const std::string shm_name("ShmRefCount"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024); + SharedMemory &shm = TestShm(); + typedef MsgI Msg; + Msg::BindShm(shm); - MsgI m0(shm, 1000); + Msg m0(1000); BOOST_CHECK(m0.valid()); BOOST_CHECK_EQUAL(m0.Count(), 1); - MsgI m1 = m0; + Msg m1 = m0; BOOST_CHECK(m1.valid()); BOOST_CHECK_EQUAL(m1.AddRef(), 2); BOOST_CHECK_EQUAL(m0.AddRef(), 3); - BOOST_CHECK_EQUAL(m0.Release(shm), 2); - BOOST_CHECK_EQUAL(m0.Release(shm), 1); - BOOST_CHECK_EQUAL(m1.Release(shm), 0); + BOOST_CHECK_EQUAL(m0.Release(), 2); + BOOST_CHECK_EQUAL(m0.Release(), 1); + BOOST_CHECK_EQUAL(m1.Release(), 0); BOOST_CHECK(!m1.valid()); } diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 5de3c93..4615c53 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -22,16 +22,16 @@ BOOST_AUTO_TEST_CASE(SpeedTest) { - const std::string shm_name("ShmSpeed"); - ShmRemover auto_remove(shm_name); const int mem_size = 1024 * 1024 * 50; + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); + MQId id = boost::uuids::random_generator()(); const int timeout = 1000; const uint32_t data_size = 4000; const std::string proc_id = "demo_proc"; auto Writer = [&](int writer_id, uint64_t n) { - SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(shm, 64); std::string str(data_size, 'a'); MsgI msg; @@ -39,22 +39,21 @@ body.set_topic("topic"); body.set_data(str); auto head(InitMsgHead(GetType(body), proc_id)); - msg.Make(shm, head, body); + msg.Make(head, body); assert(msg.valid()); - DEFER1(msg.Release(shm);); + DEFER1(msg.Release();); for (uint64_t i = 0; i < n; ++i) { while (!mq.TrySend(id, msg)) {} } }; auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { - SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(id, shm, 1000); while (*run) { MsgI msg; BHMsgHead head; if (mq.Recv(msg, timeout)) { - DEFER1(msg.Release(shm)); + DEFER1(msg.Release()); // ok } else if (isfork) { exit(0); // for forked quit after 1s. @@ -62,7 +61,6 @@ } }; auto State = [&](std::atomic<bool> *run) { - SharedMemory shm(shm_name, mem_size); auto init = shm.get_free_memory(); printf("shm init : %ld\n", init); while (*run) { @@ -116,8 +114,6 @@ // Send Recv Test BOOST_AUTO_TEST_CASE(SRTest) { - const std::string shm_name("ShmSendRecv"); - ShmRemover auto_remove(shm_name); const int qlen = 64; const size_t msg_length = 100; std::string msg_content(msg_length, 'a'); @@ -125,7 +121,9 @@ const std::string client_proc_id = "client_proc"; const std::string server_proc_id = "server_proc"; - SharedMemory shm(shm_name, 1024 * 1024 * 512); + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); + auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); ShmSocket srv(shm, qlen); @@ -174,7 +172,7 @@ while (!stop) { if (srv.SyncRecv(req, req_head, 10)) { - DEFER1(req.Release(shm)); + DEFER1(req.Release()); if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { auto &mqid = req_head.route()[0].mq_id(); diff --git a/utest/utest.cpp b/utest/utest.cpp index fae22b1..ff5d2ed 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -10,7 +10,14 @@ #include <thread> #include <vector> +using namespace bhome_shm; using namespace bhome_msg; + +SharedMemory &TestShm() +{ + static SharedMemory shm("utest_0", 1024 * 1024 * 512); + return shm; +} template <class A, class B> struct IsSameType { @@ -84,10 +91,9 @@ BOOST_AUTO_TEST_CASE(PubSubTest) { - const std::string shm_name("ShmPubSub"); - ShmRemover auto_remove(shm_name); //remove twice? in case of killed? - SharedMemory shm(shm_name, 1024 * 1024 * 50); - DEFER1(shm.Remove()); + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); + auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); int *flag = shm.find_or_construct<int>("flag")(123); @@ -196,9 +202,8 @@ BOOST_AUTO_TEST_CASE(ReqRepTest) { - const std::string shm_name("ShmReqRep"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024 * 512); + SharedMemory &shm = TestShm(); + MsgI::BindShm(shm); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); diff --git a/utest/util.h b/utest/util.h index 4d960db..61e5b11 100644 --- a/utest/util.h +++ b/utest/util.h @@ -20,6 +20,7 @@ #define UTIL_W8A0OA5U #include "bh_util.h" +#include "shm.h" #include "topic_node.h" #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/noncopyable.hpp> @@ -34,7 +35,6 @@ #include <vector> using namespace boost::posix_time; - using namespace std::chrono_literals; template <class D> @@ -132,4 +132,6 @@ } }; +bhome_shm::SharedMemory &TestShm(); + #endif // end of include guard: UTIL_W8A0OA5U -- Gitblit v1.8.0