From f51636c193d032723c47343e39ff8296db350200 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 18:04:00 +0800
Subject: [PATCH] change msg to use protobuf, add more msg type.
---
src/msg.h | 46 +++----
src/shm_queue.h | 16 +-
proto/source/bhome_msg.proto | 3
src/pubsub.cpp | 7
src/CMakeLists.txt | 2
utest/utest.cpp | 100 +++++----------
src/shm_queue.cpp | 64 +++++-----
src/msg.cpp | 121 +++++++++++++++----
8 files changed, 196 insertions(+), 163 deletions(-)
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 01498fa..8365d2f 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -1,5 +1,7 @@
syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
package bhome.msg;
message BHAddress {
@@ -22,6 +24,7 @@
kMsgTypeReply = 2;
kMsgTypePublish = 3;
kMsgTypeSubscribe = 4;
+ kMsgTypeUnsubscribe = 5;
}
message DataPub {
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ac5f05f..1db0086 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,4 +5,4 @@
add_library(${Target} ${sources})
-target_link_libraries(${Target} pthread rt)
+target_link_libraries(${Target} bhome_msg pthread rt)
diff --git a/src/msg.cpp b/src/msg.cpp
index bb193ec..ed8adba 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -16,50 +16,115 @@
* =====================================================================================
*/
#include "msg.h"
+#include "bh_util.h"
namespace bhome_msg {
-
-bool MsgMetaV1::Parse(const void *p)
+const uint32_t kMsgTag = 0xf1e2d3c4;
+const uint32_t kMsgPrefixLen = 4;
+
+BHMsg InitMsg(MsgType type)
{
- assert(p);
- *this = *static_cast<const MsgMetaV1*>(p);
- return tag_ == kMsgMetaTag;
+ BHMsg msg;
+ msg.set_type(type);
+ time_t tm = 0;
+ msg.set_timestamp(time(&tm));
+ return msg;
}
-void MsgMetaV1::Pack(void *p)
+BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
- *static_cast<MsgMetaV1*>(p) = *this;
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypeRequest));
+ msg.set_body(data, size);
+ BHAddress addr;
+ msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
+ return msg;
}
-bool Msg::Build(SharedMemory &shm, const MQId &src_id, const void *data, const size_t size, const bool refcount)
+BHMsg MakeReply(const void *data, const size_t size)
{
- if (!data || !size) {
- return false;
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypeReply));
+ msg.set_body(data, size);
+ return msg;
+}
+
+BHMsg MakeSubUnsub(const std::vector<std::string> &topics, const MsgType sub_unsub)
+{
+ assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
+ BHMsg msg(InitMsg(sub_unsub));
+ DataSub subs;
+ for (auto &t : topics) {
+ subs.add_topics(t);
}
- void *p = shm.Alloc(sizeof(MsgMetaV1) + size);
- if (!p) {
- return false;
- }
- RefCount *rc = 0;
- if (refcount) {
- rc = shm.New<RefCount>();
- if (!rc) {
+ msg.set_body(subs.SerializeAsString());
+ return msg;
+}
+
+BHMsg MakeSub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeSubscribe); }
+BHMsg MakeUnsub(const std::vector<std::string> &topics) { return MakeSubUnsub(topics, kMsgTypeUnsubscribe); }
+
+BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
+{
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypePublish));
+ DataPub pub;
+ pub.set_topic(topic);
+ pub.set_data(data, size);
+ msg.set_body(pub.SerializeAsString());
+ return msg;
+}
+
+void *Pack(SharedMemory &shm, const BHMsg &msg)
+{
+ uint32_t msg_size = msg.ByteSizeLong();
+ void *p = shm.Alloc(4 + msg_size);
+ if(p) {
+ Put32(p, msg_size);
+ if (!msg.SerializeToArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size)) {
shm.Dealloc(p);
- return false;
+ p = 0;
}
}
- MsgMetaV1 meta;
- meta.data_size_ = size;
- meta.src_id_ = src_id;
- meta.Pack(p);
- memcpy(static_cast<char *>(p) + sizeof(meta), data, size);
- Msg(p, rc).swap(*this);
- return true;
-
+ return p;
}
-int Msg::Release(SharedMemory &shm)
+bool MsgI::Unpack(BHMsg &msg) const
+{
+ void *p = ptr_.get();
+ assert(p);
+ uint32_t msg_size = Get32(p);
+ return msg.ParseFromArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size);
+}
+
+// with ref count;
+bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
+{
+ void *p = Pack(shm, msg);
+ if(!p) {
+ return false;
+ }
+ RefCount *rc = shm.New<RefCount>();
+ if (!rc) {
+ shm.Dealloc(p);
+ return false;
+ }
+ MsgI(p, rc).swap(*this);
+ return true;
+}
+
+bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
+{
+ void *p = Pack(shm, msg);
+ if(!p) {
+ return false;
+ }
+ MsgI(p, 0).swap(*this);
+ return true;
+}
+
+int MsgI::Release(SharedMemory &shm)
{
if (IsCounted()) {
const int n = count_->Dec();
diff --git a/src/msg.h b/src/msg.h
index 99fb2e2..5cb0ce4 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -26,32 +26,12 @@
namespace bhome_msg {
using namespace bhome_shm;
- using namespace bhome::msg; // for serialized data in Msg
+ using namespace bhome::msg; // for serialized data in MsgI
-// msg is safe to be stored in shared memory, so POD data or offset_ptr is required.
+// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
-enum MsgType {
- kMsgTypeNull = 0,
- kMsgTypeNormal = 1,
- kMsgTypeMaxValue
-};
typedef boost::uuids::uuid MQId;
-
-const uint32_t kMsgMetaTag = 0xf1e2d3c4;
-
-struct MsgMetaV1 {
- uint16_t self_size_ = sizeof(MsgMetaV1); // sizeof(*this)
- uint16_t type_ = kMsgTypeNormal; // msg type.
- uint32_t tag_ = kMsgMetaTag;
- uint32_t data_size_ = 0;
- MQId src_id_;
- // more fields add at end, must not change
-
- MsgMetaV1():src_id_(boost::uuids::nil_uuid()){}
- bool Parse(const void *p);
- void Pack(void *p);
-};
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
@@ -65,13 +45,21 @@
int num_ = 1;
};
-class Msg {
+BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
+BHMsg MakeReply(const void *data, const size_t size);
+BHMsg MakeSub(const std::vector<std::string> &topics);
+BHMsg MakeUnsub(const std::vector<std::string> &topics);
+BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
+
+class MsgI {
private:
offset_ptr<void> ptr_;
offset_ptr<RefCount> count_;
+
+ bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
public:
- Msg(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
- void swap(Msg &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
+ MsgI(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
+ void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
// AddRef and Release works for both counted and not counted msg.
@@ -79,11 +67,15 @@
int Release(SharedMemory &shm);
int Count() const{ return IsCounted() ? count_->Get() : 1; }
+
bool IsCounted() const { return static_cast<bool>(count_); }
- bool Build(SharedMemory &shm, const MQId &src_id, const void *p, const size_t size, const bool refcount);
+
+ bool Make(SharedMemory &shm, const BHMsg &msg);
+ bool MakeRC(SharedMemory &shm, const BHMsg &msg);
+ bool Unpack(BHMsg &msg) const;
};
-inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
+inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
} // namespace bhome_msg
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index e38c445..ee2614a 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -44,7 +44,7 @@
while (this->run_) {
std::this_thread::sleep_for(100ms);
BusManager &self = *this;
- Msg msg;
+ BHMsg msg;
const int timeout_ms = 100;
if (!self.busq_.Recv(msg, timeout_ms)) {
continue;
@@ -59,12 +59,13 @@
for (int i = 0; i < n; ++i) {
workers_.emplace_back(Worker);
}
+ return true;
}
bool BusManager::Stop()
{
std::lock_guard<std::mutex> guard(mutex_);
- StopNoLock();
+ return StopNoLock();
}
bool BusManager::StopNoLock()
@@ -75,7 +76,9 @@
w.join();
}
}
+ return true;
}
+ return false;
}
} // namespace bhome_shm
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 8d90083..ffc7c21 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -15,6 +15,7 @@
*
* =====================================================================================
*/
+
#include "shm_queue.h"
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
@@ -47,10 +48,10 @@
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len):
Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
id_(id)
-{
-}
+{}
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):ShmMsgQueue(NewId(), segment, len)
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):
+ShmMsgQueue(NewId(), segment, len)
{}
ShmMsgQueue::~ShmMsgQueue()
@@ -58,21 +59,16 @@
Remove();
}
-bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
+bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
Queue *remote = find(MsgQIdToName(remote_id));
- return remote && remote->Write(msg, timeout_ms, [](const Msg&msg){msg.AddRef();});
+ return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
}
-bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
+bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
{
- // Test shows that in the 2 cases:
- // 1) build msg first, then find remote queue;
- // 2) find remote queue first, then build msg;
- // 1 is about 50% faster than 2, maybe cache related.
-
- Msg msg;
- if(msg.Build(shm(), Id(), data, size, false)) {
+ MsgI msg;
+ if(msg.Make(shm(), data)) {
if(Send(remote_id, msg, timeout_ms)) {
return true;
} else {
@@ -82,30 +78,34 @@
return false;
}
-bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms)
+/*
+bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
- Msg msg;
- if (Read(msg, timeout_ms)) {
- DEFER1(msg.Release(shm()););
+ // Test shows that in the 2 cases:
+ // 1) build msg first, then find remote queue;
+ // 2) find remote queue first, then build msg;
+ // 1 is about 50% faster than 2, maybe cache related.
- auto ptr = msg.get<char>();
- if (ptr) {
- MsgMetaV1 meta;
- meta.Parse(ptr);
- source_id = meta.src_id_;
- size = meta.data_size_;
- data = malloc(size);
- if (data) {
- memcpy(data, ptr + meta.self_size_, size);
- return true;
- }
+ MsgI msg;
+ if(msg.BuildRequest(shm(), Id(), data, size)) {
+ if(Send(remote_id, msg, timeout_ms)) {
+ return true;
+ } else {
+ msg.Release(shm());
}
}
- source_id = EmptyId();
- data = 0;
- size = 0;
return false;
+}
+//*/
+bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
+{
+ MsgI imsg;
+ if (Read(imsg, timeout_ms)) {
+ DEFER1(imsg.Release(shm()););
+ return imsg.Unpack(msg);
+ } else {
+ return false;
+ }
}
} // namespace bhome_shm
-
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 5b30380..9d08016 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -102,12 +102,12 @@
using namespace bhome_msg;
-class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> >
{
- typedef ShmObject<SharedQueue<Msg> > Super;
+ typedef ShmObject<SharedQueue<MsgI> > Super;
typedef Super::Data Queue;
- bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
- bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+ bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
+ bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
MQId id_;
protected:
ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
@@ -115,11 +115,11 @@
ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
- bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms);
- bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms);
+ // bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); // request
+ bool Recv(BHMsg &msg, const int timeout_ms);
+ bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
const MQId &Id() const { return id_; }
- bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms);
- bool Recv(Msg &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
+ bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
};
} // namespace bhome_shm
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 23af0b8..6e41116 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -178,10 +178,8 @@
int ms = i * 100;
printf("Timeout Test %4d: ", ms);
boost::timer::auto_cpu_timer timer;
- MQId id;
- void *data;
- size_t size;
- bool r = q.Recv(id, data, size, ms);
+ BHMsg msg;
+ bool r = q.Recv(msg, ms);
BOOST_CHECK(!r);
}
}
@@ -192,10 +190,10 @@
ShmRemover auto_remove(shm_name);
SharedMemory shm(shm_name, 1024*1024);
- Msg m0(shm.Alloc(1000), shm.New<RefCount>());
+ MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
BOOST_CHECK(m0.IsCounted());
BOOST_CHECK_EQUAL(m0.Count(), 1);
- Msg m1 = m0;
+ MsgI m1 = m0;
BOOST_CHECK(m1.IsCounted());
BOOST_CHECK_EQUAL(m1.AddRef(), 2);
BOOST_CHECK_EQUAL(m0.AddRef(), 3);
@@ -205,25 +203,7 @@
BOOST_CHECK(!m1.IsCounted());
}
-BOOST_AUTO_TEST_CASE(MsgHeaderTest)
-{
- MsgMetaV1 head;
- BOOST_CHECK_EQUAL(head.self_size_, sizeof(head));
- BOOST_CHECK_EQUAL(head.type_, kMsgTypeNormal);
- BOOST_CHECK_EQUAL(head.tag_, kMsgMetaTag);
- BOOST_CHECK_EQUAL(head.data_size_, 0);
- BOOST_CHECK(head.src_id_ == boost::uuids::nil_uuid());
- head.data_size_ = 100;
- head.src_id_ = boost::uuids::random_generator()();
- head.type_ = 123;
-
- char buf[100] = {0};
- head.Pack(buf);
- MsgMetaV1 result;
- result.Parse(buf);
- BOOST_CHECK_EQUAL(memcmp(&head, &result, sizeof(head)), 0);
-}
BOOST_AUTO_TEST_CASE(SpeedTest)
{
@@ -238,10 +218,10 @@
SharedMemory shm(shm_name, mem_size);
ShmMsgQueue mq(shm, 64);
std::string str(data_size, 'a');
- Msg msg;
+ MsgI msg;
DEFER1(msg.Release(shm););
- msg.Build(shm, mq.Id(), str.data(), str.size(), true);
- for (int i = 0; i < n; ++i) {
+ msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+ for (uint64_t i = 0; i < n; ++i) {
// mq.Send(id, str.data(), str.size(), timeout);
mq.Send(id, msg, timeout);
}
@@ -250,16 +230,9 @@
SharedMemory shm(shm_name, mem_size);
ShmMsgQueue mq(id, shm, 1000);
while(*run) {
- Msg msg;
+ BHMsg msg;
if (mq.Recv(msg, timeout)) {
- MsgMetaV1 header;
- if (!header.Parse(msg.get())) {
- BOOST_CHECK(false);
- }
- if (header.data_size_ != data_size) {
- BOOST_CHECK(false);
- }
- msg.Release(shm);
+ // ok
} else if (isfork) {
exit(0); // for forked quit after 1s.
}
@@ -332,8 +305,10 @@
ShmMsgQueue srv(shm, qlen);
ShmMsgQueue cli(shm, qlen);
- Msg ref_counted_msg;
- ref_counted_msg.Build(shm, cli.Id(), msg_content.data(), msg_content.size(), true);
+ MsgI request_rc;
+ request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+ MsgI reply_rc;
+ reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
std::atomic<uint64_t> count(0);
@@ -342,29 +317,25 @@
auto Client = [&](int cli_id, int nmsg){
for (int i = 0; i < nmsg; ++i) {
- auto Send = [&]() { return cli.Send(srv.Id(), msg_content.data(), msg_content.size(), 1000); };
- auto SendRefCounted = [&]() { return cli.Send(srv.Id(), ref_counted_msg, 1000); };
+ auto Req = [&]() {
+ return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+ };
+ auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
- if (!SendRefCounted()) {
+ if (!ReqRC()) {
printf("********** client send error.\n");
continue;
}
- MQId id;
- void *data = 0;
- size_t size = 0;
- if (!cli.Recv(id, data, size, 1000)) {
+ BHMsg msg;
+ if (!cli.Recv(msg, 1000)) {
printf("********** client recv error.\n");
} else {
- DEFER1(free(data));
- if(size != msg_length) {
- BOOST_CHECK(false);
- }
++count;
auto cur = Now();
if (last_time.exchange(cur) != cur) {
std::cout << "time: " << Now();
printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
- count.load(), count - last_count.exchange(count), init_avail - Avail(), ref_counted_msg.Count());
+ count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
last_time = cur;
}
@@ -374,19 +345,18 @@
std::atomic<bool> stop(false);
auto Server = [&](){
- void *data = 0;
- size_t size = 0;
- MQId src_id;
+ BHMsg req;
while (!stop) {
- if (srv.Recv(src_id, data, size, 100)) {
- DEFER1(free(data));
- auto Send = [&](){ return srv.Send(src_id, data, size, 100); };
- auto SendRefCounted = [&](){ return srv.Send(src_id, ref_counted_msg, 100); };
+ if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+ auto &mqid = req.route()[0].mq_id();
+ MQId src_id;
+ memcpy(&src_id, mqid.data(), sizeof(src_id));
+ auto Reply = [&]() {
+ return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+ };
+ auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
- if (SendRefCounted()) {
- if (size != msg_content.size()) {
- BOOST_TEST(false, "server msg size error");
- }
+ if (ReplyRC()) {
}
}
}
@@ -405,10 +375,10 @@
printf("request ok: %ld\n", count.load());
stop = true;
servers.WaitAll();
- BOOST_CHECK(ref_counted_msg.IsCounted());
- BOOST_CHECK_EQUAL(ref_counted_msg.Count(), 1);
- ref_counted_msg.Release(shm);
- BOOST_CHECK(!ref_counted_msg.IsCounted());
+ BOOST_CHECK(request_rc.IsCounted());
+ BOOST_CHECK_EQUAL(request_rc.Count(), 1);
+ request_rc.Release(shm);
+ BOOST_CHECK(!request_rc.IsCounted());
// BOOST_CHECK_THROW(reply.Count(), int);
}
--
Gitblit v1.8.0