From f51636c193d032723c47343e39ff8296db350200 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 18:04:00 +0800
Subject: [PATCH] change msg to use protobuf, add more msg type.

---
 utest/utest.cpp |  100 +++++++++++++++++--------------------------------
 1 files changed, 35 insertions(+), 65 deletions(-)

diff --git a/utest/utest.cpp b/utest/utest.cpp
index 23af0b8..6e41116 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -178,10 +178,8 @@
         int ms = i * 100;
         printf("Timeout Test %4d: ", ms);
         boost::timer::auto_cpu_timer timer;
-        MQId id;
-        void *data;
-        size_t size;
-        bool r = q.Recv(id, data, size, ms);
+        BHMsg msg;
+        bool r = q.Recv(msg, ms);
         BOOST_CHECK(!r);
     }
 }
@@ -192,10 +190,10 @@
     ShmRemover auto_remove(shm_name);
     SharedMemory shm(shm_name, 1024*1024);
 
-    Msg m0(shm.Alloc(1000), shm.New<RefCount>());
+    MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
     BOOST_CHECK(m0.IsCounted());
     BOOST_CHECK_EQUAL(m0.Count(), 1);
-    Msg m1 = m0;
+    MsgI m1 = m0;
     BOOST_CHECK(m1.IsCounted());
     BOOST_CHECK_EQUAL(m1.AddRef(), 2);
     BOOST_CHECK_EQUAL(m0.AddRef(), 3);
@@ -205,25 +203,7 @@
     BOOST_CHECK(!m1.IsCounted());
 }
 
-BOOST_AUTO_TEST_CASE(MsgHeaderTest)
-{
-    MsgMetaV1 head;
-    BOOST_CHECK_EQUAL(head.self_size_, sizeof(head));
-    BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
-    BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
-    BOOST_CHECK_EQUAL(head.data_size_, 0);
-    BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid());
 
-    head.data_size_ = 100;
-    head.src_id_ = boost::uuids::random_generator()();
-    head.type_ = 123;
-
-    char buf[100] = {0};
-    head.Pack(buf);
-    MsgMetaV1 result;
-    result.Parse(buf);
-    BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
-}
 
 BOOST_AUTO_TEST_CASE(SpeedTest)
 {
@@ -238,10 +218,10 @@
         SharedMemory shm(shm_name, mem_size);
         ShmMsgQueue mq(shm, 64);
         std::string str(data_size, 'a');
-        Msg msg;
+        MsgI msg;
         DEFER1(msg.Release(shm););
-        msg.Build(shm, mq.Id(), str.data(), str.size(), true);
-        for (int i = 0; i < n; ++i) {
+        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+        for (uint64_t i = 0; i < n; ++i) {
             // mq.Send(id, str.data(), str.size(), timeout);
             mq.Send(id, msg, timeout);
         }
@@ -250,16 +230,9 @@
         SharedMemory shm(shm_name, mem_size);
         ShmMsgQueue mq(id, shm, 1000);
         while(*run) {
-            Msg msg;
+            BHMsg msg;
             if (mq.Recv(msg, timeout)) {
-                MsgMetaV1 header;
-                if (!header.Parse(msg.get())) {
-                    BOOST_CHECK(false);
-                }
-                if (header.data_size_ != data_size) {
-                    BOOST_CHECK(false);
-                }
-                msg.Release(shm);
+                // ok
             } else if (isfork) {
                 exit(0); // for forked quit after 1s.
             }
@@ -332,8 +305,10 @@
     ShmMsgQueue srv(shm, qlen);
     ShmMsgQueue cli(shm, qlen);
 
-    Msg ref_counted_msg;
-    ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
+    MsgI request_rc;
+    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+    MsgI reply_rc;
+    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
 
     std::atomic<uint64_t> count(0);
 
@@ -342,29 +317,25 @@
 
     auto Client = [&](int cli_id, int nmsg){
         for (int i = 0; i < nmsg; ++i) {
-            auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
-            auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
+            auto Req = [&]() {
+                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+            };
+            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
 
-            if (!SendRefCounted()) {
+            if (!ReqRC()) {
                 printf("********** client send error.\n");
                 continue;
             }
-            MQId id;
-            void *data = 0;
-            size_t size = 0;
-            if (!cli.Recv(id, data, size, 1000)) {
+            BHMsg msg;
+            if (!cli.Recv(msg, 1000)) {
                 printf("********** client recv error.\n");
             } else {
-                DEFER1(free(data));
-                if(size != msg_length) {
-                    BOOST_CHECK(false);
-                }
                 ++count;
                 auto cur = Now();
                 if (last_time.exchange(cur) != cur) {
                     std::cout << "time: " << Now();
                     printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
+                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
                     last_time = cur;
                 }
 
@@ -374,19 +345,18 @@
 
     std::atomic<bool> stop(false);
     auto Server = [&](){
-        void *data = 0;
-        size_t size = 0;
-        MQId src_id;
+        BHMsg req;
         while (!stop) {
-            if (srv.Recv(src_id, data, size, 100)) {
-                DEFER1(free(data));
-                auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
-                auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
+            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+                auto &mqid = req.route()[0].mq_id();
+                MQId src_id;
+                memcpy(&src_id, mqid.data(), sizeof(src_id));
+                auto Reply = [&]() {
+                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+                };
+                auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
 
-                if (SendRefCounted()) {
-                    if (size != msg_content.size()) {
-                        BOOST_TEST(false, "server msg size error");
-                    }
+                if (ReplyRC()) {
                 }
             }
         }
@@ -405,10 +375,10 @@
     printf("request ok: %ld\n", count.load());
     stop = true;
     servers.WaitAll();
-    BOOST_CHECK(ref_counted_msg.IsCounted());
-    BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
-    ref_counted_msg.Release(shm);
-    BOOST_CHECK(!ref_counted_msg.IsCounted());
+    BOOST_CHECK(request_rc.IsCounted());
+    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
+    request_rc.Release(shm);
+    BOOST_CHECK(!request_rc.IsCounted());
     // BOOST_CHECK_THROW(reply.Count(), int);
 }
 

--
Gitblit v1.8.0