Use two buffers to speed up SendQ; sockets now start automatically on construction.
| | |
| | | "program": "${workspaceFolder}/debug/bin/utest", |
| | | "args": [ |
| | | "-t", |
| | | "ApiTest" |
| | | "SRTest" |
| | | ], |
| | | "stopAtEntry": false, |
| | | "cwd": "${workspaceFolder}", |
| | |
| | | |
| | | bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) |
| | | { |
| | | if (out_.empty()) { |
| | | std::unique_lock<std::mutex> lock(mutex_); |
| | | out_.swap(in_); |
| | | } |
| | | |
| | | auto FirstNotExpired = [](MsgList &l) { |
| | | auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; |
| | | return std::lower_bound(l.begin(), l.end(), Now(), Less); |
| | |
| | | info.msg_.Release(mq.shm()); |
| | | } |
| | | |
| | | //TODO maybe use TrySendAll ? |
| | | while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) { |
| | | int n = mq.TrySendAll(*(MQId *) remote.data(), MsgIter(pos), MsgIter(msg_list.end())); |
| | | for (int i = 0; i < n; ++i) { |
| | | auto &msg = pos->data().msg_; |
| | | if (msg.IsCounted()) { |
| | | msg.Release(mq.shm()); |
| | | } |
| | | ++pos; |
| | | } |
| | | |
| | | msg_list.erase(msg_list.begin(), pos); |
| | | }; |
| | | |
| | | if (!store_.empty()) { |
| | | auto rec = store_.begin(); |
| | | if (!out_.empty()) { |
| | | auto rec = out_.begin(); |
| | | do { |
| | | SendOneRemote(rec->first, rec->second); |
| | | if (rec->second.empty()) { |
| | | rec = store_.erase(rec); |
| | | rec = out_.erase(rec); |
| | | } else { |
| | | ++rec; |
| | | } |
| | | } while (rec != store_.end()); |
| | | } while (rec != out_.end()); |
| | | } |
| | | return !store_.empty(); |
| | | return !out_.empty(); |
| | | } |
| | |
| | | #include "timed_queue.h" |
| | | #include <deque> |
| | | #include <functional> |
| | | #include <mutex> |
| | | #include <string> |
| | | #include <unordered_map> |
| | | |
| | |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | |
| | | msg.AddRef(); |
| | | store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire})); |
| | | TimedMsg tmp(expire, MsgInfo{msg, onExpire}); |
| | | std::unique_lock<std::mutex> lock(mutex_); |
| | | in_[addr].emplace_back(std::move(tmp)); |
| | | } |
| | | typedef std::deque<TimedMsg> MsgList; |
| | | typedef std::unordered_map<Remote, MsgList> Store; |
| | | class MsgIter |
| | | { |
| | | MsgList::iterator iter_; |
| | | |
| | | Store store_; |
| | | public: |
| | | MsgIter(MsgList::iterator iter) : |
| | | iter_(iter) {} |
| | | MsgIter &operator++() { return ++iter_, *this; } |
| | | bool operator==(const MsgIter &a) { return iter_ == a.iter_; } |
| | | MsgI &operator*() { return iter_->data().msg_; } |
| | | }; |
| | | std::mutex mutex_; |
| | | Store in_; |
| | | Store out_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SENDQ_IWKMSK7M |
| | |
| | | return Super::Remove(shm, MsgQIdToName(id)); |
| | | } |
| | | |
| | | ShmMsgQueue::Queue *ShmMsgQueue::FindRemote(SharedMemory &shm, const MQId &remote_id) |
| | | { |
| | | return Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | Queue *remote = FindRemote(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | Queue *remote = FindRemote(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | |
| | | bool TryRecv(MsgI &msg) { return data()->TryRead(msg); } |
| | | template <class OnData> |
| | | int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); } |
| | | |
| | | static Queue *FindRemote(SharedMemory &shm, const MQId &remote_id); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend()); |
| | | static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | template <class Iter> |
| | | static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) |
| | | { |
| | | Queue *remote = FindRemote(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | template <class... Rest> |
| | | bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } |
| | | |
| | | size_t Pending() const { return data()->size(); } |
| | | }; |
| | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : |
| | | run_(false), mq_(id, shm, len) |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | run_(false), mq_(shm, len) {} |
| | | run_(false), mq_(shm, len) |
| | | { |
| | | Start(); |
| | | } |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_->TrySend(mq()); }; |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | |
| | | { |
| | | bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) |
| | | { |
| | | send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); |
| | | // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) { |
| | | send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); |
| | | // } |
| | | return true; |
| | | } |
| | | |
| | |
| | | }; |
| | | |
| | | Synced<AsyncCBs> per_msg_cbs_; |
| | | Synced<SendQ> send_buffer_; |
| | | SendQ send_buffer_; |
| | | // Synced<SendQ> send_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| | |
| | | } |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (!cli.SyncRecv(msg, head, 100)) { |
| | | if (!cli.SyncRecv(msg, head, 1000)) { |
| | | printf("********** client recv error.\n"); |
| | | } else { |
| | | DEFER1(msg.Release(shm)); |