From ea2ee85202f7b16d7b713bc7a7dcd1fa63bc6213 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 26 三月 2021 11:54:50 +0800
Subject: [PATCH] change refcount to AddRef,Release interface.
---
src/msg.h | 13 ++----
src/shm_queue.h | 6 ++
utest/utest.cpp | 34 +++++++++++------
src/shm_queue.cpp | 22 ++--------
src/msg.cpp | 11 +++++
5 files changed, 46 insertions(+), 40 deletions(-)
diff --git a/src/msg.cpp b/src/msg.cpp
index 4ddb726..66eec4b 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -59,9 +59,18 @@
}
-void Msg::FreeFrom(SharedMemory &shm)
+int Msg::Release(SharedMemory &shm) const
{
+ if (IsCounted()) {
+ const int n = count_->Dec();
+ if (n != 0) {
+ return n;
+ }
+ }
+ // free data
shm.Dealloc(ptr_);
shm.Delete(count_);
+ return 0;
}
+
} // namespace bhome_shm
diff --git a/src/msg.h b/src/msg.h
index 44c961f..910efa5 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -69,20 +69,15 @@
public:
Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
-
- // ~Msg() { RemoveRef(); }
- // Msg(const Msg &a):ptr_(a.ptr_), count_(a.count_) { AddRef(); }
- // Msg(Msg &&a):ptr_(a.ptr_), count_(a.count_) { a.ptr_ = 0; a.count_ = 0; }
- // Msg & operator = (const Msg &a) { Msg(a).swap(*this); }
- // Msg & operator = (Msg &&a) { Msg(std::move(a)).swap(*this); }
-
template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
+
+ // AddRef and Release works for both counted and not counted msg.
int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
- int RemoveRef() const{ return IsCounted() ? count_->Dec() : 0; }
+ int Release(SharedMemory &shm) const;
+
int Count() const{ return IsCounted() ? count_->Get() : 1; }
bool IsCounted() const { return static_cast<bool>(count_); }
bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount);
- void FreeFrom(SharedMemory &shm);
};
inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 77add97..5e67d1f 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -48,17 +48,7 @@
bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
{
Queue *remote = find(MsgQIdToName(remote_id));
-
- if(!remote) {
- return false;
- }
- msg.AddRef();
- if (remote->Write(msg, timeout_ms)) {
- return true;
- } else {
- msg.RemoveRef();
- return false;
- }
+ return remote && remote->Write(msg, timeout_ms, [&](){msg.AddRef();});
}
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
@@ -69,13 +59,11 @@
// 1 is about 50% faster than 2, maybe cache related.
Msg msg;
- if (msg.Build(shm(), Id(), data, size, false)) {
- if (Send(remote_id, msg, timeout_ms)) {
+ if(msg.Build(shm(), Id(), data, size, false)) {
+ if(Send(remote_id, msg, timeout_ms)) {
return true;
} else {
- if (msg.RemoveRef() == 0) { // works for both refcounted and not counted.
- msg.FreeFrom(shm());
- }
+ msg.Release(shm());
}
}
return false;
@@ -85,7 +73,7 @@
{
Msg msg;
if (Read(msg, timeout_ms)) {
- DEFER1(if (msg.RemoveRef() == 0) { msg.FreeFrom(shm()); });
+ DEFER1(msg.Release(shm()););
auto ptr = msg.get<char>();
if (ptr) {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 023b2d1..14f43c0 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -53,9 +53,11 @@
using Super::size;
using Super::capacity;
const MQId &Id() const { return id_; }
- bool Write(const D &buf, const int timeout_ms) {
+ template <class OnWrite>
+ bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
Guard lock(mutex());
if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
+ onWrite();
this->push_back(buf);
cond_read_.notify_one();
return true;
@@ -63,6 +65,7 @@
return false;
}
}
+ bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); }
bool Read(D &buf, const int timeout_ms){
Guard lock(mutex());
@@ -92,6 +95,7 @@
bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
const MQId &Id() const { return data()->Id(); }
bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
+ bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
};
} // namespace bhome_shm
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 6994cbd..2462ecc 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -10,6 +10,7 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
#include "shm_queue.h"
#include "bh_util.h"
@@ -151,10 +152,10 @@
BOOST_CHECK(m1.IsCounted());
BOOST_CHECK_EQUAL(m1.AddRef(), 2);
BOOST_CHECK_EQUAL(m0.AddRef(), 3);
- BOOST_CHECK_EQUAL(m0.RemoveRef(), 2);
- BOOST_CHECK_EQUAL(m0.RemoveRef(), 1);
- BOOST_CHECK_EQUAL(m1.RemoveRef(), 0);
- BOOST_CHECK_EQUAL(m1.Count(), 0);
+ BOOST_CHECK_EQUAL(m0.Release(shm), 2);
+ BOOST_CHECK_EQUAL(m0.Release(shm), 1);
+ BOOST_CHECK_EQUAL(m1.Release(shm), 0);
+ BOOST_CHECK_THROW(m1.Count(), std::exception);
}
BOOST_AUTO_TEST_CASE(MsgHeaderTest)
@@ -209,7 +210,7 @@
auto Client = [&](int tid, int nmsg){
for (int i = 0; i < nmsg; ++i) {
auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
- auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
+ // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
if (!Send()) {
printf("********** client send error.\n");
@@ -246,9 +247,9 @@
if (srv.Recv(src_id, data, size, 100)) {
DEFER1(free(data));
auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
- auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
+ // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
- if (SendRefCounted()) {
+ if (Send()) {
if (size != msg_content.size()) {
BOOST_TEST(false, "server msg size error");
}
@@ -263,7 +264,7 @@
ThreadManager clients, servers;
for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
int ncli = 100*1;
- uint64_t nmsg = 100*100;
+ uint64_t nmsg = 100*100*10;
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
clients.WaitAll();
@@ -274,18 +275,27 @@
BOOST_CHECK_EQUAL(request.Count(), 1);
BOOST_CHECK(reply.IsCounted());
BOOST_CHECK_EQUAL(reply.Count(), 1);
- if (request.RemoveRef() == 0) {
- BOOST_CHECK_EQUAL(reply.Count(), 0);
- request.FreeFrom(shm);
- }
+ request.Release(shm);
+ BOOST_CHECK_THROW(request.Count(), std::exception);
+ BOOST_CHECK_THROW(reply.Count(), std::exception);
+ // BOOST_CHECK_THROW(reply.Count(), int);
}
+inline int MyMin(int a, int b) {
+ printf("MyMin\n");
+ return a < b ? a : b;
+}
int test_main(int argc, char *argv[])
{
printf("test main\n");
int a = 0;
int b = 0;
BOOST_CHECK_EQUAL(a, b);
+ int n = MyMin(4,6);
+ for (int i = 0; i < n; ++i) {
+ printf("i = %d\n", i);
+ }
+
return 0;
}
--
Gitblit v1.8.0