From a76a94009d78a6a41654335dcb9202fb31659de0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 25 三月 2021 13:23:16 +0800
Subject: [PATCH] bulid msg, refactor.
---
src/shm_queue.cpp | 46 ++++++++++++++++++++++++++--------------------
1 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 263fd94..1446446 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -45,26 +45,32 @@
Remove();
}
+bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
+{
+ Queue *remote = find(MsgQIdToName(remote_id));
+
+ return remote && remote->Write(msg, timeout_ms);
+
+ if(!remote) {
+ return false;
+ }
+ msg.AddRef();
+ if (remote->Write(msg, timeout_ms)) {
+ return true;
+ } else {
+ msg.RemoveRef();
+ return false;
+ }
+}
+
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
- if (data && size) {
- Queue *remote = find(MsgQIdToName(remote_id));
- if (remote) {
- void *p = shm().allocate(sizeof(MsgMetaV1) + size, std::nothrow);
- bool r = false;
- if (p) {
- MsgMetaV1 meta;
- meta.data_size_ = size;
- memcpy(meta.src_id_, &Id(), sizeof(MQId));
- meta.Pack(p);
-
- memcpy(static_cast<char*>(p) + sizeof(meta), data, size);
- if (remote->Write(p, timeout_ms)) {
- return true;
- } else {
- shm().deallocate(p);
- }
- }
+ Msg msg;
+ if (msg.Build(shm(), Id(), data, size, false)) {
+ if (Send(remote_id, msg, timeout_ms)) {
+ return true;
+ } else {
+ msg.FreeFrom(shm());
}
}
return false;
@@ -76,10 +82,10 @@
if (Read(msg, timeout_ms)) {
auto ptr = msg.get<char>();
if (ptr) {
- DEFER1(shm().deallocate(ptr););
+ DEFER1(shm().Dealloc(ptr););
MsgMetaV1 meta;
meta.Parse(ptr);
- memcpy(&source_id, meta.src_id_, sizeof(MQId));
+ source_id = meta.src_id_;
size = meta.data_size_;
data = malloc(size);
if (data) {
--
Gitblit v1.8.0