SendQ: use less shm by queuing a copy of the message data.
| | |
| | | uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | } |
| | | |
| | | // Copies the bytes of `content` into a buffer obtained from shm.Alloc(). |
| | | // Returns that buffer; when Alloc() yields a null pointer nothing is copied |
| | | // and the null pointer is returned as-is. |
| | | void *Pack(SharedMemory &shm, const std::string &content) |
| | | { |
| | | const auto nbytes = content.size(); |
| | | void *buf = shm.Alloc(nbytes); |
| | | if (!buf) { |
| | | return buf; |
| | | } |
| | | memcpy(buf, content.data(), nbytes); |
| | | return buf; |
| | | } |
| | | |
| | | bool MakeRC(SharedMemory &shm, void *addr); |
| | | bool Make(SharedMemory &shm, void *addr); |
| | | |
| | |
| | | } |
| | | |
| | | bool EnableRefCount(SharedMemory &shm); |
| | | |
| | | template <class Body> |
| | | inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | |
| | | auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; }; |
| | | return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p); |
| | | } |
| | | // Encodes `head` and `body` back to back into a single string. Each part is |
| | | // preceded by its length written with Put32() (4 bytes are reserved for it), |
| | | // followed by the bytes produced by SerializeToArray(). |
| | | template <class Body> |
| | | static inline std::string Serialize(const BHMsgHead &head, const Body &body) |
| | | { |
| | | const uint32_t head_len = head.ByteSizeLong(); |
| | | const uint32_t body_len = body.ByteSizeLong(); |
| | | std::string out(4 + head_len + 4 + body_len, '\0'); |
| | | size_t offset = 0; |
| | | // Writes one length-prefixed section at `offset` and advances past it. |
| | | auto append_section = [&out, &offset](const auto &msg, const uint32_t len) { |
| | | Put32(&out[offset], len); |
| | | offset += 4; |
| | | msg.SerializeToArray(&out[offset], len); |
| | | offset += len; |
| | | }; |
| | | append_section(head, head_len); |
| | | append_section(body, body_len); |
| | | // Both sections together must fill the pre-sized string exactly. |
| | | assert(offset == out.size()); |
| | | return out; |
| | | } |
| | | // Packs `content` with Pack() and passes the resulting buffer to |
| | | // Make(shm, void *); returns whatever that overload returns. |
| | | inline bool Make(SharedMemory &shm, const std::string &content) |
| | | { |
| | | return Make(shm, Pack(shm, content)); |
| | | } |
| | | |
| | | bool ParseHead(BHMsgHead &head) const; |
| | | template <class Body> |
| | |
| | | #include "shm_queue.h" |
| | | #include <chrono> |
| | | |
| | | // TODO: change to save head and body instead of MsgI, |
| | | // because MsgI lives in shm while head and body live in the current process. |
| | | // Then, if a node crashes, shm will not be affected by msgs in sendq. |
| | | // However, publishing ref-counted msgs needs some more work. |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | |
| | | for (auto it = arr.begin(); it != pos; ++it) { |
| | | auto &info = it->data(); |
| | | if (info.on_expire_) { |
| | | info.on_expire_(info.msg_); |
| | | info.on_expire_(info.data_); |
| | | } |
| | | info.msg_.Release(mq.shm()); |
| | | if (info.data_.index() == 0) { |
| | | boost::variant2::get<0>(info.data_).Release(mq.shm()); |
| | | } |
| | | } |
| | | |
| | | int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(arr.end())); |
| | | for (int i = 0; i < n; ++i) { |
| | | auto &msg = pos->data().msg_; |
| | | if (msg.IsCounted()) { |
| | | msg.Release(mq.shm()); |
| | | auto SendData = [&](Data &d) { |
| | | bool r = false; |
| | | if (d.index() == 0) { |
| | | auto &msg = boost::variant2::get<0>(pos->data().data_); |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | if (r && msg.IsCounted()) { |
| | | msg.Release(mq.shm()); |
| | | } |
| | | } else { |
| | | auto &content = boost::variant2::get<1>(pos->data().data_); |
| | | MsgI msg; |
| | | if (msg.Make(mq.shm(), content)) { |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | if (!r || msg.IsCounted()) { |
| | | msg.Release(mq.shm()); |
| | | } |
| | | } |
| | | } |
| | | return r; |
| | | }; |
| | | |
| | | while (pos != arr.end() && SendData(pos->data().data_)) { |
| | | ++pos; |
| | | } |
| | | |
| | | int nprocessed = std::distance(arr.begin(), pos); |
| | | arr.erase(arr.begin(), pos); |
| | | return n; |
| | | return nprocessed; |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al) |
| | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "timed_queue.h" |
| | | #include <boost/variant2/variant.hpp> |
| | | #include <deque> |
| | | #include <functional> |
| | | #include <list> |
| | |
| | | public: |
| | | typedef std::string Remote; |
| | | typedef bhome_msg::MsgI MsgI; |
| | | typedef std::function<void(const MsgI &msg)> OnMsgEvent; |
| | | typedef std::string Content; |
| | | typedef boost::variant2::variant<MsgI, Content> Data; |
| | | typedef std::function<void(const Data &)> OnMsgEvent; |
| | | struct MsgInfo { |
| | | MsgI msg_; |
| | | Data data_; |
| | | OnMsgEvent on_expire_; |
| | | // OnMsgEvent on_send_; |
| | | }; |
| | | typedef TimedData<MsgInfo> TimedMsg; |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | template <class... Rest> |
| | | void Append(const MQId &id, Rest &&...rest) |
| | | { |
| | | Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); |
| | | Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); |
| | | } |
| | | |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | using namespace std::chrono_literals; |
| | | Append(addr, msg, Now() + 60s, onExpire); |
| | | msg.AddRef(); |
| | | AppendData(addr, Data(msg), DefaultExpire(), onExpire); |
| | | } |
| | | // Queues `content` for `addr`, expiring at DefaultExpire(). |
| | | // `content` is moved into the queued Data, so the caller's string is consumed. |
| | | // `onExpire` is passed along to AppendData together with the data. |
| | | void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | // onExpire is already our own by-value copy; move it on rather than copying the std::function again. |
| | | AppendData(addr, Data(std::move(content)), DefaultExpire(), std::move(onExpire)); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) |
| | | static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } |
| | | void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | |
| | | msg.AddRef(); |
| | | TimedMsg tmp(expire, MsgInfo{msg, onExpire}); |
| | | TimedMsg tmp(expire, MsgInfo{std::move(data), std::move(onExpire)}); |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | | auto &al = in_[addr]; |
| | | if (!al.empty()) { |
| | |
| | | |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr); |
| | | |
| | | // Thin wrapper over Array::iterator whose dereference yields the element's msg_ directly. |
| | | class MsgIter |
| | | { |
| | | Array::iterator iter_; |
| | | |
| | | public: |
| | | MsgIter(Array::iterator iter) : |
| | | iter_(iter) {} |
| | | // Pre-increment: advances the wrapped iterator and returns *this. |
| | | MsgIter &operator++() { return ++iter_, *this; } |
| | | // const-qualified: comparing does not modify either side, and this lets const |
| | | // MsgIter objects (and either operand order) be compared. |
| | | bool operator==(const MsgIter &a) const { return iter_ == a.iter_; } |
| | | // Returns the msg_ member of the element the wrapped iterator points at. |
| | | MsgI &operator*() { return iter_->data().msg_; } |
| | | }; |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
| | |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) |
| | | template <class... T> |
| | | bool SendImpl(const void *valid_remote, T &&...rest) |
| | | { |
| | | // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) { |
| | | send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); |
| | | // } |
| | | send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...); |
| | | return true; |
| | | } |
| | | |
| | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); |
| | | std::string msg_id(head.msg_id()); |
| | | try { |
| | | if (!cb) { |
| | | return SendImpl(valid_remote, msg); |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body)); |
| | | } else { |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, msg, onExpireRemoveCB); |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB); |
| | | } |
| | | } else { |
| | | SetLastError(ENOMEM, "Out of mem"); |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool Send(const void *valid_remote, const MsgI &imsg) |
| | |
| | | } |
| | | } |
| | | |
| | | namespace |
| | | { |
| | | // Empty placeholder type. |
| | | struct CCC { |
| | | }; |
| | | // Accepts only an rvalue CCC and does nothing with it. |
| | | void F(CCC &&) {} |
| | | |
| | | // Perfect-forwards all of its arguments to F. |
| | | template <class... Args> |
| | | void Pass(Args &&...args) |
| | | { |
| | | F(std::forward<Args>(args)...); |
| | | } |
| | | |
| | | } // namespace |
| | | BOOST_AUTO_TEST_CASE(ApiTest) |
| | | { |
| | | auto max_time = std::chrono::steady_clock::time_point::max(); |
| | |
| | | MsgStatus last; |
| | | while (*run) { |
| | | auto &st = Status(); |
| | | std::this_thread::sleep_for(1s); |
| | | Sleep(1s, false); |
| | | printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n", |
| | | st.nrequest_.load(), st.nrequest_ - last.nrequest_, |
| | | st.nfailed_.load(), |
| | |
| | | |
| | | int same = 0; |
| | | int64_t last = 0; |
| | | while (last < nreq * ncli && same < 3) { |
| | | Sleep(1s); |
| | | while (last < nreq * ncli && same < 2) { |
| | | Sleep(1s, false); |
| | | auto cur = Status().nreply_.load(); |
| | | if (last == cur) { |
| | | ++same; |