From dc86ace85e437ecb8a2e728e4dce36d02bbb8a6e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 12:59:50 +0800 Subject: [PATCH] move ref count into msg meta, only 1 poinetr now. --- utest/speed_test.cpp | 6 +- src/failed_msg.h | 2 box/status_main.cc | 16 +++++ utest/simple_tests.cpp | 8 +- box/center.cpp | 1 src/msg.h | 45 ++++++-------- src/failed_msg.cpp | 13 +--- src/sendq.cpp | 6 - src/msg.cpp | 64 +++++++++------------ 9 files changed, 75 insertions(+), 86 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 3059e90..0f547e9 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -454,7 +454,6 @@ replyer(reply); } else { replyer(MakeReply(eSuccess)); - if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? if (clients.empty()) { return; } auto it = clients.begin(); diff --git a/box/status_main.cc b/box/status_main.cc index 3a0288b..a435c2f 100644 --- a/box/status_main.cc +++ b/box/status_main.cc @@ -31,8 +31,22 @@ int status_main(int argc, char const *argv[]) { - auto &shm = BHomeShm(); + AppArg args(argc, argv); + auto shm_name = args.Get("shm", BHomeShm().name()); + auto shm_size = std::atol(args.Get("size", "").c_str()); + if (shm_size <= 0 || shm_size > 512) { + shm_size = 50; + } + auto DisplayName = [&]() -> std::string { + if (shm_name == BHomeShm().name()) { + return "[bhome shm]"; + } else { + return shm_name; + } + }; + printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size); + SharedMemory shm(shm_name, 1024 * 1024 * shm_size); std::atomic<bool> run(true); auto Now = []() { return steady_clock::now(); }; diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp index f128499..0b3ee42 100644 --- a/src/failed_msg.cpp +++ b/src/failed_msg.cpp @@ -17,18 +17,13 @@ */ #include "failed_msg.h" -FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg) +FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg msg) { msg.AddRef(); - return [remote, msg](void *valid_sock) { + return [remote, msg](void *valid_sock) mutable { assert(valid_sock); ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); - bool r = sock.Send(remote.data(), msg); - //TODO check remote removed. - if (r && msg.IsCounted()) { - auto tmp = msg; // Release() is not const, but it's safe to release. - tmp.Release(sock.shm()); - } - return r; + DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release. + return sock.Send(remote.data(), msg); }; } \ No newline at end of file diff --git a/src/failed_msg.h b/src/failed_msg.h index 73030ba..8a810c7 100644 --- a/src/failed_msg.h +++ b/src/failed_msg.h @@ -40,7 +40,7 @@ } private: - Func PrepareSender(const std::string &remote, Msg const &msg); + Func PrepareSender(const std::string &remote, Msg msg); TimedFuncQ queue_; }; diff --git a/src/msg.cpp b/src/msg.cpp index 06b817e..ba844da 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -26,11 +26,28 @@ //*/ const uint32_t kMsgTag = 0xf1e2d3c4; +void *MsgI::Alloc(SharedMemory &shm, const size_t size) +{ + void *p = shm.Alloc(sizeof(Meta) + size); + if (p) { + auto pmeta = new (p) Meta; + p = pmeta + 1; + } + return p; +} +void MsgI::Free(SharedMemory &shm) +{ + assert(valid()); + shm.Dealloc(meta()); + ptr_ = nullptr; + assert(!valid()); +} + void *MsgI::Pack(SharedMemory &shm, const uint32_t head_len, const ToArray &headToArray, const uint32_t body_len, const ToArray &bodyToArray) { - void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); + void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len); if (addr) { auto p = static_cast<char *>(addr); auto Pack1 = [&p](auto len, auto &writer) { @@ -47,26 +64,11 @@ bool MsgI::ParseHead(BHMsgHead &head) const { - auto p = static_cast<char *>(ptr_.get()); + auto p = get<char>(); assert(p); uint32_t msg_size = Get32(p); p += 4; return head.ParseFromArray(p, msg_size); -} - -// with ref count; -bool MsgI::MakeRC(SharedMemory &shm, void *p) -{ - if (!p) { - return false; - } - RefCount *rc = shm.New<RefCount>(); - if (!rc) { - shm.Dealloc(p); - return false; - } - MsgI(p, rc).swap(*this); - return true; } bool MsgI::Make(SharedMemory &shm, void *p) @@ -74,32 +76,20 @@ if (!p) { return false; } - MsgI(p, 0).swap(*this); + MsgI(p).swap(*this); return true; -} - -bool MsgI::EnableRefCount(SharedMemory &shm) -{ - if (!IsCounted()) { - count_ = shm.New<RefCount>(); - } - return IsCounted(); } int MsgI::Release(SharedMemory &shm) { - if (IsCounted()) { - const int n = count_->Dec(); - if (n != 0) { - return n; - } + if (!valid()) { + return 0; } - // free data - shm.Dealloc(ptr_); - ptr_ = 0; - shm.Delete(count_); - count_ = 0; - return 0; + auto n = meta()->count_.Dec(); + if (n == 0) { + Free(shm); + } + return n; } } // namespace bhome_msg diff --git a/src/msg.h b/src/msg.h index feab5ec..452567e 100644 --- a/src/msg.h +++ b/src/msg.h @@ -53,8 +53,13 @@ class MsgI { private: + struct Meta { + RefCount count_; + }; offset_ptr<void> ptr_; - offset_ptr<RefCount> count_; + void *Alloc(SharedMemory &shm, const size_t size); + void Free(SharedMemory &shm); + Meta *meta() const { return get<Meta>() - 1; } typedef std::function<void(void *p, int len)> ToArray; void *Pack(SharedMemory &shm, @@ -72,48 +77,36 @@ void *Pack(SharedMemory &shm, const std::string &content) { - void *addr = shm.Alloc(content.size()); + void *addr = Alloc(shm, content.size()); if (addr) { memcpy(addr, content.data(), content.size()); } return addr; } - bool MakeRC(SharedMemory &shm, void *addr); bool Make(SharedMemory &shm, void *addr); + MsgI(void *p) : + ptr_(p) {} public: - MsgI(void *p = 0, RefCount *c = 0) : - ptr_(p), count_(c) {} - - void swap(MsgI &a) - { - std::swap(ptr_, a.ptr_); - std::swap(count_, a.count_); - } + MsgI() : + MsgI(nullptr) {} + MsgI(SharedMemory &shm, const size_t size) : + MsgI(Alloc(shm, size)) {} + void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } + bool valid() const { return static_cast<bool>(ptr_); } template <class T = void> - T *get() { return static_cast<T *>(ptr_.get()); } + T *get() const { return static_cast<T *>(ptr_.get()); } // AddRef and Release works for both counted and not counted msg. - int AddRef() const { return IsCounted() ? count_->Inc() : 1; } + int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } int Release(SharedMemory &shm); + int Count() const { return valid() ? meta()->count_.Get() : 1; } - int Count() const { return IsCounted() ? count_->Get() : 1; } - bool IsCounted() const { return static_cast<bool>(count_); } - - template <class Body> - inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) - { - return MakeRC(shm, Pack(shm, head, body)); - } - - bool EnableRefCount(SharedMemory &shm); template <class Body> inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) { - void *p = Pack(shm, head, body); - auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; }; - return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p); + return Make(shm, Pack(shm, head, body)); } template <class Body> static inline std::string Serialize(const BHMsgHead &head, const Body &body) diff --git a/src/sendq.cpp b/src/sendq.cpp index 4be24f1..8aa7214 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -42,17 +42,15 @@ if (d.index() == 0) { auto &msg = boost::variant2::get<0>(pos->data().data_); r = mq.TrySend(*(MQId *) remote.data(), msg); - if (r && msg.IsCounted()) { + if (r) { msg.Release(mq.shm()); } } else { auto &content = boost::variant2::get<1>(pos->data().data_); MsgI msg; if (msg.Make(mq.shm(), content)) { + DEFER1(msg.Release(mq.shm());); r = mq.TrySend(*(MQId *) remote.data(), msg); - if (!r || msg.IsCounted()) { - msg.Release(mq.shm()); - } } } return r; diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index cbbcc2a..817bdac 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -126,15 +126,15 @@ ShmRemover auto_remove(shm_name); SharedMemory shm(shm_name, 1024 * 1024); - MsgI m0(shm.Alloc(1000), shm.New<RefCount>()); - BOOST_CHECK(m0.IsCounted()); + MsgI m0(shm, 1000); + BOOST_CHECK(m0.valid()); BOOST_CHECK_EQUAL(m0.Count(), 1); MsgI m1 = m0; - BOOST_CHECK(m1.IsCounted()); + BOOST_CHECK(m1.valid()); BOOST_CHECK_EQUAL(m1.AddRef(), 2); BOOST_CHECK_EQUAL(m0.AddRef(), 3); BOOST_CHECK_EQUAL(m0.Release(shm), 2); BOOST_CHECK_EQUAL(m0.Release(shm), 1); BOOST_CHECK_EQUAL(m1.Release(shm), 0); - BOOST_CHECK(!m1.IsCounted()); + BOOST_CHECK(!m1.valid()); } diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 86367b9..5de3c93 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -39,12 +39,12 @@ body.set_topic("topic"); body.set_data(str); auto head(InitMsgHead(GetType(body), proc_id)); - msg.MakeRC(shm, head, body); - assert(msg.IsCounted()); + msg.Make(shm, head, body); + assert(msg.valid()); DEFER1(msg.Release(shm);); for (uint64_t i = 0; i < n; ++i) { - mq.Send(id, msg, timeout); + while (!mq.TrySend(id, msg)) {} } }; auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { -- Gitblit v1.8.0