From ea2ee85202f7b16d7b713bc7a7dcd1fa63bc6213 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 26 三月 2021 11:54:50 +0800
Subject: [PATCH] change refcount to AddRef,Release interface.

---
 src/msg.h         |   13 ++----
 src/shm_queue.h   |    6 ++
 utest/utest.cpp   |   34 +++++++++++------
 src/shm_queue.cpp |   22 ++--------
 src/msg.cpp       |   11 +++++
 5 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/src/msg.cpp b/src/msg.cpp
index 4ddb726..66eec4b 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -59,9 +59,18 @@
 
 }
 
-void Msg::FreeFrom(SharedMemory &shm)
+int Msg::Release(SharedMemory &shm) const
 {
+    if (IsCounted()) {
+        const int n = count_->Dec();
+        if (n != 0) {
+            return n;
+        }
+    }
+    // free data
     shm.Dealloc(ptr_);
     shm.Delete(count_);
+    return 0;
 }
+
 } // namespace bhome_shm
diff --git a/src/msg.h b/src/msg.h
index 44c961f..910efa5 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -69,20 +69,15 @@
 public:
     Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
     void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
-
-    // ~Msg() { RemoveRef(); }
-    // Msg(const Msg &a):ptr_(a.ptr_), count_(a.count_) { AddRef(); }
-    // Msg(Msg &&a):ptr_(a.ptr_), count_(a.count_) { a.ptr_ = 0; a.count_ = 0; }
-    // Msg & operator = (const Msg &a) { Msg(a).swap(*this); }
-    // Msg & operator = (Msg &&a) { Msg(std::move(a)).swap(*this); }
-
     template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
+
+    // AddRef and Release works for both counted and not counted msg.
     int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
-    int RemoveRef()  const{ return IsCounted() ? count_->Dec() : 0; }
+    int Release(SharedMemory &shm) const;
+
     int Count()  const{ return IsCounted() ? count_->Get() : 1; }
     bool IsCounted() const { return static_cast<bool>(count_); }
     bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount);
-    void FreeFrom(SharedMemory &shm);
 };
 
 inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 77add97..5e67d1f 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -48,17 +48,7 @@
 bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
 {
     Queue *remote = find(MsgQIdToName(remote_id));
-    
-    if(!remote) {
-        return false;
-    }
-    msg.AddRef();
-    if (remote->Write(msg, timeout_ms)) {
-        return true;
-    } else {
-        msg.RemoveRef();
-        return false;
-    }
+    return remote && remote->Write(msg, timeout_ms, [&](){msg.AddRef();});
 }
 
 bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
@@ -69,13 +59,11 @@
     // 1 is about 50% faster than 2, maybe cache related.
 
     Msg msg;
-    if (msg.Build(shm(), Id(), data, size, false)) {
-        if (Send(remote_id, msg, timeout_ms)) {
+    if(msg.Build(shm(), Id(), data, size, false)) {
+        if(Send(remote_id, msg, timeout_ms)) {
             return true;
         } else {
-            if (msg.RemoveRef() == 0) { // works for both refcounted and not counted.
-                msg.FreeFrom(shm());
-            }
+            msg.Release(shm());
         }
     }
     return false;
@@ -85,7 +73,7 @@
 {
     Msg msg;
     if (Read(msg, timeout_ms)) {
-        DEFER1(if (msg.RemoveRef() == 0) { msg.FreeFrom(shm()); });
+        DEFER1(msg.Release(shm()););
 
         auto ptr = msg.get<char>();
         if (ptr) {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 023b2d1..14f43c0 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -53,9 +53,11 @@
     using Super::size;
     using Super::capacity;
     const MQId &Id() const { return id_; }
-    bool Write(const D &buf, const int timeout_ms) {
+    template <class OnWrite>
+    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
         Guard lock(mutex());
         if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) {
+            onWrite();
             this->push_back(buf);
             cond_read_.notify_one();
             return true;
@@ -63,6 +65,7 @@
             return false;
         }
     }
+    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](){}); }
 
     bool Read(D &buf, const int timeout_ms){
         Guard lock(mutex());
@@ -92,6 +95,7 @@
     bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
     const MQId &Id() const { return data()->Id(); }
     bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
+    bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
 };
 
 } // namespace bhome_shm
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 6994cbd..2462ecc 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -10,6 +10,7 @@
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/date_time/microsec_time_clock.hpp>
 #include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
 #include "shm_queue.h"
 #include "bh_util.h"
 
@@ -151,10 +152,10 @@
     BOOST_CHECK(m1.IsCounted());
     BOOST_CHECK_EQUAL(m1.AddRef(), 2);
     BOOST_CHECK_EQUAL(m0.AddRef(), 3);
-    BOOST_CHECK_EQUAL(m0.RemoveRef(), 2);
-    BOOST_CHECK_EQUAL(m0.RemoveRef(), 1);
-    BOOST_CHECK_EQUAL(m1.RemoveRef(), 0);
-    BOOST_CHECK_EQUAL(m1.Count(), 0);
+    BOOST_CHECK_EQUAL(m0.Release(shm), 2);
+    BOOST_CHECK_EQUAL(m0.Release(shm), 1);
+    BOOST_CHECK_EQUAL(m1.Release(shm), 0);
+    BOOST_CHECK_THROW(m1.Count(), std::exception);
 }
 
 BOOST_AUTO_TEST_CASE(MsgHeaderTest)
@@ -209,7 +210,7 @@
     auto Client = [&](int tid, int nmsg){
         for (int i = 0; i < nmsg; ++i) {
             auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
-            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
+            // auto SendRefCounted = [&]() { return cli.Send(srv.Id(), request, 1000); };
 
             if (!Send()) {
                 printf("********** client send error.\n");
@@ -246,9 +247,9 @@
             if (srv.Recv(src_id, data, size, 100)) {
                 DEFER1(free(data));
                 auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
-                auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
+                // auto SendRefCounted = [&](){ return srv.Send(src_id, reply, 100); };
 
-                if (SendRefCounted()) {
+                if (Send()) {
                     if (size != msg_content.size()) {
                         BOOST_TEST(false, "server msg size error");
                     }
@@ -263,7 +264,7 @@
     ThreadManager clients, servers;
     for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
     int ncli = 100*1;
-    uint64_t nmsg = 100*100;
+    uint64_t nmsg = 100*100*10;
     printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
     for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
     clients.WaitAll();
@@ -274,18 +275,27 @@
     BOOST_CHECK_EQUAL(request.Count(), 1);
     BOOST_CHECK(reply.IsCounted());
     BOOST_CHECK_EQUAL(reply.Count(), 1);
-    if (request.RemoveRef() == 0) {
-        BOOST_CHECK_EQUAL(reply.Count(), 0);
-        request.FreeFrom(shm);
-    }
+    request.Release(shm);
+    BOOST_CHECK_THROW(request.Count(), std::exception);
+    BOOST_CHECK_THROW(reply.Count(), std::exception);
+    // BOOST_CHECK_THROW(reply.Count(), int);
 }
 
+inline int MyMin(int a, int b) {
+    printf("MyMin\n");
+    return a < b ? a : b;
+}
 int test_main(int argc, char *argv[])
 {
     printf("test main\n");
     int a = 0;
     int b = 0;
     BOOST_CHECK_EQUAL(a, b);
+    int n = MyMin(4,6);
+    for (int i = 0; i < n; ++i) {
+        printf("i = %d\n", i);
+    }
+
     return 0;
 }
 

--
Gitblit v1.8.0