From 708ff9e8af731e2799767ed8bfca7df3b74fc26a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 16 四月 2021 19:20:21 +0800
Subject: [PATCH] sendq use less shm, copy data.
---
src/socket.h | 25 +++----
utest/api_test.cpp | 19 +++++-
src/msg.h | 33 ++++++++++
src/sendq.cpp | 40 +++++++++----
src/sendq.h | 40 ++++++-------
5 files changed, 104 insertions(+), 53 deletions(-)
diff --git a/src/msg.h b/src/msg.h
index 10ad0d2..c239956 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -82,6 +82,15 @@
uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
}
+ void *Pack(SharedMemory &shm, const std::string &content)
+ {
+ void *addr = shm.Alloc(content.size());
+ if (addr) {
+ memcpy(addr, content.data(), content.size());
+ }
+ return addr;
+ }
+
bool MakeRC(SharedMemory &shm, void *addr);
bool Make(SharedMemory &shm, void *addr);
@@ -111,7 +120,6 @@
}
bool EnableRefCount(SharedMemory &shm);
-
template <class Body>
inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
{
@@ -119,6 +127,29 @@
auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
}
+ template <class Body>
+ static inline std::string Serialize(const BHMsgHead &head, const Body &body)
+ {
+ uint32_t head_len = head.ByteSizeLong();
+ uint32_t body_len = body.ByteSizeLong();
+ std::string s(4 + head_len + 4 + body_len, '\0');
+ size_t pos = 0;
+ auto add1 = [&](auto &&msg, auto &&size) {
+ Put32(&s[pos], size);
+ pos += 4;
+ msg.SerializeToArray(&s[pos], size);
+ pos += size;
+ };
+ add1(head, head_len);
+ add1(body, body_len);
+ assert(pos == s.size());
+ return s;
+ }
+ inline bool Make(SharedMemory &shm, const std::string &content)
+ {
+ void *p = Pack(shm, content);
+ return Make(shm, p);
+ }
bool ParseHead(BHMsgHead &head) const;
template <class Body>
diff --git a/src/sendq.cpp b/src/sendq.cpp
index ad293c3..4be24f1 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,11 +19,6 @@
#include "shm_queue.h"
#include <chrono>
-//TODO change to save head, body, instead of MsgI.
-// as MsgI which is in shm, but head, body are in current process.
-// Then if node crashes, shm will not be affected by msgs in sendq.
-// but pulishing ref-counted msg need some work.
-
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
auto FirstNotExpired = [](Array &l) {
@@ -35,22 +30,41 @@
for (auto it = arr.begin(); it != pos; ++it) {
auto &info = it->data();
if (info.on_expire_) {
- info.on_expire_(info.msg_);
+ info.on_expire_(info.data_);
}
- info.msg_.Release(mq.shm());
+ if (info.data_.index() == 0) {
+ boost::variant2::get<0>(info.data_).Release(mq.shm());
+ }
}
- int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end()));
- for (int i = 0; i < n; ++i) {
- auto &msg = pos->data().msg_;
- if (msg.IsCounted()) {
- msg.Release(mq.shm());
+ auto SendData = [&](Data &d) {
+ bool r = false;
+ if (d.index() == 0) {
+ auto &msg = boost::variant2::get<0>(pos->data().data_);
+ r = mq.TrySend(*(MQId *) remote.data(), msg);
+ if (r && msg.IsCounted()) {
+ msg.Release(mq.shm());
+ }
+ } else {
+ auto &content = boost::variant2::get<1>(pos->data().data_);
+ MsgI msg;
+ if (msg.Make(mq.shm(), content)) {
+ r = mq.TrySend(*(MQId *) remote.data(), msg);
+ if (!r || msg.IsCounted()) {
+ msg.Release(mq.shm());
+ }
+ }
}
+ return r;
+ };
+
+ while (pos != arr.end() && SendData(pos->data().data_)) {
++pos;
}
+ int nprocessed = std::distance(arr.begin(), pos);
arr.erase(arr.begin(), pos);
- return n;
+ return nprocessed;
}
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
diff --git a/src/sendq.h b/src/sendq.h
index b4f3821..bba44af 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -21,6 +21,7 @@
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
+#include <boost/variant2/variant.hpp>
#include <deque>
#include <functional>
#include <list>
@@ -38,36 +39,43 @@
public:
typedef std::string Remote;
typedef bhome_msg::MsgI MsgI;
- typedef std::function<void(const MsgI &msg)> OnMsgEvent;
+ typedef std::string Content;
+ typedef boost::variant2::variant<MsgI, Content> Data;
+ typedef std::function<void(const Data &)> OnMsgEvent;
struct MsgInfo {
- MsgI msg_;
+ Data data_;
OnMsgEvent on_expire_;
- // OnMsgEvent on_send_;
};
typedef TimedData<MsgInfo> TimedMsg;
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
- void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
+ template <class... Rest>
+ void Append(const MQId &id, Rest &&...rest)
{
- Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
+ Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
}
+
void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
{
- using namespace std::chrono_literals;
- Append(addr, msg, Now() + 60s, onExpire);
+ msg.AddRef();
+ AppendData(addr, Data(msg), DefaultExpire(), onExpire);
+ }
+ void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+ {
+ AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
}
bool TrySend(bhome_shm::ShmMsgQueue &mq);
// bool empty() const { return store_.empty(); }
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
- void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
+ static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
+ void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
{
//TODO simple queue, organize later ?
- msg.AddRef();
- TimedMsg tmp(expire, MsgInfo{msg, onExpire});
+ TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)});
std::unique_lock<std::mutex> lock(mutex_in_);
auto &al = in_[addr];
if (!al.empty()) {
@@ -82,18 +90,6 @@
int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
-
- class MsgIter
- {
- Array::iterator iter_;
-
- public:
- MsgIter(Array::iterator iter) :
- iter_(iter) {}
- MsgIter &operator++() { return ++iter_, *this; }
- bool operator==(const MsgIter &a) { return iter_ == a.iter_; }
- MsgI &operator*() { return iter_->data().msg_; }
- };
std::mutex mutex_in_;
std::mutex mutex_out_;
diff --git a/src/socket.h b/src/socket.h
index dbb161c..db64b36 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,11 +36,10 @@
class ShmSocket : private boost::noncopyable
{
- bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+ template <class... T>
+ bool SendImpl(const void *valid_remote, T &&...rest)
{
- // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
- send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
- // }
+ send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
return true;
}
@@ -69,24 +68,22 @@
template <class Body>
bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
{
- MsgI msg;
- if (msg.Make(shm(), head, body)) {
- DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
- std::string msg_id(head.msg_id());
+ try {
if (!cb) {
- return SendImpl(valid_remote, msg);
+ return SendImpl(valid_remote, MsgI::Serialize(head, body));
} else {
+ std::string msg_id(head.msg_id());
per_msg_cbs_->Add(msg_id, cb);
- auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
+ auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
RecvCB cb_no_use;
per_msg_cbs_->Find(msg_id, cb_no_use);
};
- return SendImpl(valid_remote, msg, onExpireRemoveCB);
+ return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
}
- } else {
- SetLastError(ENOMEM, "Out of mem");
+ } catch (...) {
+ SetLastError(eError, "Send internal error.");
+ return false;
}
- return false;
}
bool Send(const void *valid_remote, const MsgI &imsg)
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 58c73c6..da51044 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -138,6 +138,19 @@
}
}
+namespace
+{
+struct CCC {
+};
+void F(CCC &&c) {}
+
+template <class... T>
+void Pass(T &&...t)
+{
+ F(std::forward<decltype(t)>(t)...);
+}
+
+} // namespace
BOOST_AUTO_TEST_CASE(ApiTest)
{
auto max_time = std::chrono::steady_clock::time_point::max();
@@ -241,7 +254,7 @@
MsgStatus last;
while (*run) {
auto &st = Status();
- std::this_thread::sleep_for(1s);
+ Sleep(1s, false);
printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
st.nrequest_.load(), st.nrequest_ - last.nrequest_,
st.nfailed_.load(),
@@ -270,8 +283,8 @@
int same = 0;
int64_t last = 0;
- while (last < nreq * ncli && same < 3) {
- Sleep(1s);
+ while (last < nreq * ncli && same < 2) {
+ Sleep(1s, false);
auto cur = Status().nreply_.load();
if (last == cur) {
++same;
--
Gitblit v1.8.0