From dc86ace85e437ecb8a2e728e4dce36d02bbb8a6e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 12:59:50 +0800
Subject: [PATCH] move ref count into msg meta, only 1 poinetr now.
---
utest/speed_test.cpp | 6 +-
src/failed_msg.h | 2
box/status_main.cc | 16 +++++
utest/simple_tests.cpp | 8 +-
box/center.cpp | 1
src/msg.h | 45 ++++++--------
src/failed_msg.cpp | 13 +---
src/sendq.cpp | 6 -
src/msg.cpp | 64 +++++++++------------
9 files changed, 75 insertions(+), 86 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 3059e90..0f547e9 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -454,7 +454,6 @@
replyer(reply);
} else {
replyer(MakeReply(eSuccess));
- if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
if (clients.empty()) { return; }
auto it = clients.begin();
diff --git a/box/status_main.cc b/box/status_main.cc
index 3a0288b..a435c2f 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -31,8 +31,22 @@
int status_main(int argc, char const *argv[])
{
- auto &shm = BHomeShm();
+ AppArg args(argc, argv);
+ auto shm_name = args.Get("shm", BHomeShm().name());
+ auto shm_size = std::atol(args.Get("size", "").c_str());
+ if (shm_size <= 0 || shm_size > 512) {
+ shm_size = 50;
+ }
+ auto DisplayName = [&]() -> std::string {
+ if (shm_name == BHomeShm().name()) {
+ return "[bhome shm]";
+ } else {
+ return shm_name;
+ }
+ };
+ printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
+ SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
std::atomic<bool> run(true);
auto Now = []() { return steady_clock::now(); };
diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp
index f128499..0b3ee42 100644
--- a/src/failed_msg.cpp
+++ b/src/failed_msg.cpp
@@ -17,18 +17,13 @@
*/
#include "failed_msg.h"
-FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg const &msg)
+FailedMsgQ::Func FailedMsgQ::PrepareSender(const std::string &remote, Msg msg)
{
msg.AddRef();
- return [remote, msg](void *valid_sock) {
+ return [remote, msg](void *valid_sock) mutable {
assert(valid_sock);
ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
- bool r = sock.Send(remote.data(), msg);
- //TODO check remote removed.
- if (r && msg.IsCounted()) {
- auto tmp = msg; // Release() is not const, but it's safe to release.
- tmp.Release(sock.shm());
- }
- return r;
+ DEFER1(msg.Release(sock.shm())); // Release() is not const, but it's safe to release.
+ return sock.Send(remote.data(), msg);
};
}
\ No newline at end of file
diff --git a/src/failed_msg.h b/src/failed_msg.h
index 73030ba..8a810c7 100644
--- a/src/failed_msg.h
+++ b/src/failed_msg.h
@@ -40,7 +40,7 @@
}
private:
- Func PrepareSender(const std::string &remote, Msg const &msg);
+ Func PrepareSender(const std::string &remote, Msg msg);
TimedFuncQ queue_;
};
diff --git a/src/msg.cpp b/src/msg.cpp
index 06b817e..ba844da 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -26,11 +26,28 @@
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
+void *MsgI::Alloc(SharedMemory &shm, const size_t size)
+{
+ void *p = shm.Alloc(sizeof(Meta) + size);
+ if (p) {
+ auto pmeta = new (p) Meta;
+ p = pmeta + 1;
+ }
+ return p;
+}
+void MsgI::Free(SharedMemory &shm)
+{
+ assert(valid());
+ shm.Dealloc(meta());
+ ptr_ = nullptr;
+ assert(!valid());
+}
+
void *MsgI::Pack(SharedMemory &shm,
const uint32_t head_len, const ToArray &headToArray,
const uint32_t body_len, const ToArray &bodyToArray)
{
- void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+ void *addr = Alloc(shm, sizeof(head_len) + head_len + sizeof(body_len) + body_len);
if (addr) {
auto p = static_cast<char *>(addr);
auto Pack1 = [&p](auto len, auto &writer) {
@@ -47,26 +64,11 @@
bool MsgI::ParseHead(BHMsgHead &head) const
{
- auto p = static_cast<char *>(ptr_.get());
+ auto p = get<char>();
assert(p);
uint32_t msg_size = Get32(p);
p += 4;
return head.ParseFromArray(p, msg_size);
-}
-
-// with ref count;
-bool MsgI::MakeRC(SharedMemory &shm, void *p)
-{
- if (!p) {
- return false;
- }
- RefCount *rc = shm.New<RefCount>();
- if (!rc) {
- shm.Dealloc(p);
- return false;
- }
- MsgI(p, rc).swap(*this);
- return true;
}
bool MsgI::Make(SharedMemory &shm, void *p)
@@ -74,32 +76,20 @@
if (!p) {
return false;
}
- MsgI(p, 0).swap(*this);
+ MsgI(p).swap(*this);
return true;
-}
-
-bool MsgI::EnableRefCount(SharedMemory &shm)
-{
- if (!IsCounted()) {
- count_ = shm.New<RefCount>();
- }
- return IsCounted();
}
int MsgI::Release(SharedMemory &shm)
{
- if (IsCounted()) {
- const int n = count_->Dec();
- if (n != 0) {
- return n;
- }
+ if (!valid()) {
+ return 0;
}
- // free data
- shm.Dealloc(ptr_);
- ptr_ = 0;
- shm.Delete(count_);
- count_ = 0;
- return 0;
+ auto n = meta()->count_.Dec();
+ if (n == 0) {
+ Free(shm);
+ }
+ return n;
}
} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index feab5ec..452567e 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -53,8 +53,13 @@
class MsgI
{
private:
+ struct Meta {
+ RefCount count_;
+ };
offset_ptr<void> ptr_;
- offset_ptr<RefCount> count_;
+ void *Alloc(SharedMemory &shm, const size_t size);
+ void Free(SharedMemory &shm);
+ Meta *meta() const { return get<Meta>() - 1; }
typedef std::function<void(void *p, int len)> ToArray;
void *Pack(SharedMemory &shm,
@@ -72,48 +77,36 @@
void *Pack(SharedMemory &shm, const std::string &content)
{
- void *addr = shm.Alloc(content.size());
+ void *addr = Alloc(shm, content.size());
if (addr) {
memcpy(addr, content.data(), content.size());
}
return addr;
}
- bool MakeRC(SharedMemory &shm, void *addr);
bool Make(SharedMemory &shm, void *addr);
+ MsgI(void *p) :
+ ptr_(p) {}
public:
- MsgI(void *p = 0, RefCount *c = 0) :
- ptr_(p), count_(c) {}
-
- void swap(MsgI &a)
- {
- std::swap(ptr_, a.ptr_);
- std::swap(count_, a.count_);
- }
+ MsgI() :
+ MsgI(nullptr) {}
+ MsgI(SharedMemory &shm, const size_t size) :
+ MsgI(Alloc(shm, size)) {}
+ void swap(MsgI &a) { std::swap(ptr_, a.ptr_); }
+ bool valid() const { return static_cast<bool>(ptr_); }
template <class T = void>
- T *get() { return static_cast<T *>(ptr_.get()); }
+ T *get() const { return static_cast<T *>(ptr_.get()); }
// AddRef and Release works for both counted and not counted msg.
- int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
+ int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
int Release(SharedMemory &shm);
+ int Count() const { return valid() ? meta()->count_.Get() : 1; }
- int Count() const { return IsCounted() ? count_->Get() : 1; }
- bool IsCounted() const { return static_cast<bool>(count_); }
-
- template <class Body>
- inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
- {
- return MakeRC(shm, Pack(shm, head, body));
- }
-
- bool EnableRefCount(SharedMemory &shm);
template <class Body>
inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
{
- void *p = Pack(shm, head, body);
- auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
- return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
+ return Make(shm, Pack(shm, head, body));
}
template <class Body>
static inline std::string Serialize(const BHMsgHead &head, const Body &body)
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 4be24f1..8aa7214 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -42,17 +42,15 @@
if (d.index() == 0) {
auto &msg = boost::variant2::get<0>(pos->data().data_);
r = mq.TrySend(*(MQId *) remote.data(), msg);
- if (r && msg.IsCounted()) {
+ if (r) {
msg.Release(mq.shm());
}
} else {
auto &content = boost::variant2::get<1>(pos->data().data_);
MsgI msg;
if (msg.Make(mq.shm(), content)) {
+ DEFER1(msg.Release(mq.shm()););
r = mq.TrySend(*(MQId *) remote.data(), msg);
- if (!r || msg.IsCounted()) {
- msg.Release(mq.shm());
- }
}
}
return r;
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index cbbcc2a..817bdac 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -126,15 +126,15 @@
ShmRemover auto_remove(shm_name);
SharedMemory shm(shm_name, 1024 * 1024);
- MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
- BOOST_CHECK(m0.IsCounted());
+ MsgI m0(shm, 1000);
+ BOOST_CHECK(m0.valid());
BOOST_CHECK_EQUAL(m0.Count(), 1);
MsgI m1 = m0;
- BOOST_CHECK(m1.IsCounted());
+ BOOST_CHECK(m1.valid());
BOOST_CHECK_EQUAL(m1.AddRef(), 2);
BOOST_CHECK_EQUAL(m0.AddRef(), 3);
BOOST_CHECK_EQUAL(m0.Release(shm), 2);
BOOST_CHECK_EQUAL(m0.Release(shm), 1);
BOOST_CHECK_EQUAL(m1.Release(shm), 0);
- BOOST_CHECK(!m1.IsCounted());
+ BOOST_CHECK(!m1.valid());
}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 86367b9..5de3c93 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -39,12 +39,12 @@
body.set_topic("topic");
body.set_data(str);
auto head(InitMsgHead(GetType(body), proc_id));
- msg.MakeRC(shm, head, body);
- assert(msg.IsCounted());
+ msg.Make(shm, head, body);
+ assert(msg.valid());
DEFER1(msg.Release(shm););
for (uint64_t i = 0; i < n; ++i) {
- mq.Send(id, msg, timeout);
+ while (!mq.TrySend(id, msg)) {}
}
};
auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
--
Gitblit v1.8.0