From aa1542b6d6a4680088ac715c4ce40f97ada554fb Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 14 四月 2021 17:52:31 +0800 Subject: [PATCH] add SendQ TrySend() TryRecv(); handle re-register. --- src/socket.h | 66 +++-- box/center.cpp | 65 +++-- src/socket.cpp | 69 +++-- utest/utest.cpp | 5 src/sendq.cpp | 62 +++++ src/shm_queue.cpp | 25 +- src/topic_node.cpp | 150 ++++++----- utest/speed_test.cpp | 8 src/shm_queue.h | 138 +++++++--- src/topic_node.h | 4 src/bh_api.h | 3 src/failed_msg.cpp | 2 src/sendq.h | 74 ++++++ src/timed_queue.h | 5 src/bh_api.cpp | 5 15 files changed, 446 insertions(+), 235 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index f9044d4..0dd4ed4 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -18,7 +18,6 @@ #include "center.h" #include "bh_util.h" #include "defs.h" -#include "failed_msg.h" #include "shm.h" #include <chrono> #include <set> @@ -52,7 +51,7 @@ }; struct ProcState { - int64_t timestamp_; + int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { @@ -111,15 +110,32 @@ } try { - Node node(new NodeInfo); - node->addrs_.insert(SrcAddr(head)); - for (auto &addr : msg.addrs()) { - node->addrs_.insert(addr.mq_id()); + auto UpdateRegInfo = [&](Node &node) { + node->addrs_.insert(SrcAddr(head)); + for (auto &addr : msg.addrs()) { + node->addrs_.insert(addr.mq_id()); + } + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = head.timestamp(); + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + }; + + auto pos = nodes_.find(head.proc_id()); + if (pos == nodes_.end()) { // new client + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[node->proc_.proc_id()] = node; + } else { + Node &node = pos->second; + if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { + // node restarted, release old mq. + for (auto &addr : node->addrs_) { + cleaner_(addr); + } + node->addrs_.clear(); + } + UpdateRegInfo(node); } - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - nodes_[node->proc_.proc_id()] = node; return MakeReply(eSuccess); } catch (...) { return MakeReply(eError, "register node error."); @@ -134,7 +150,7 @@ if (pos == nodes_.end()) { return MakeReply<Reply>(eNotRegistered, "Node is not registered."); } else { - auto node = pos->second; + auto &node = pos->second; if (!MatchAddr(node->addrs_, SrcAddr(head))) { return MakeReply<Reply>(eAddressNotMatch, "Node address error."); } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { @@ -342,8 +358,7 @@ auto node = weak.lock(); return node && Valid(*node); } - void CheckAllNodes(); //TODO, call it in timer. - std::string id_; // center proc id; + std::string id_; // center proc id; std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; @@ -385,30 +400,25 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); - auto center_failed_q = std::make_shared<FailedMsgQ>(); - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { + auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); MsgI msg; if (msg.Make(socket.shm(), reply_head, rep_body)) { auto &remote = head.route(0).mq_id(); - bool r = socket.Send(remote.data(), msg, timeout_ms); - if (!r) { - failq.Push(remote, msg, 60s); // for later retry. - } + bool r = socket.Send(remote.data(), msg); } }; }; - auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { + auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; - center_failed_q->TrySend(socket); center->OnTimer(); }; auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); + auto replyer = MakeReplyer(socket, head, center->id()); switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); @@ -419,11 +429,10 @@ } }; - auto bus_failed_q = std::make_shared<FailedMsgQ>(); - auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; + auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); + auto replyer = MakeReplyer(socket, head, center->id()); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -442,9 +451,9 @@ auto &cli = *it; auto node = cli.weak_node_.lock(); if (node) { - if (!socket.Send(cli.mq_.data(), msg, 0)) { - bus_failed_q->Push(cli.mq_, msg, 60s); - } + // should also make sure that mq is not killed before msg expires. + // it would be ok if (kill_time - offline_time) is longer than expire time. + socket.Send(cli.mq_.data(), msg); ++it; } else { it = clients.erase(it); diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 39e38d3..78b8a59 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -186,15 +186,14 @@ bool BHSendReply(void *src, const void *reply, - const int reply_len, - const int timeout_ms) + const int reply_len) { MsgRequestTopicReply rep; if (!rep.ParseFromArray(reply, reply_len)) { SetLastError(eInvalidInput, "invalid input."); return false; } - return ProcNode().ServerSendReply(src, rep, timeout_ms); + return ProcNode().ServerSendReply(src, rep); } int BHCleanUp() diff --git a/src/bh_api.h b/src/bh_api.h index 1b351ca..1023ba4 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -64,8 +64,7 @@ bool BHSendReply(BHSrcInfo *src, const void *reply, - const int reply_len, - const int timeout_ms); + const int reply_len); // int BHCleanUp(); diff --git a/src/failed_msg.cpp b/src/failed_msg.cpp index 0b4c443..f128499 100644 --- a/src/failed_msg.cpp +++ b/src/failed_msg.cpp @@ -23,7 +23,7 @@ return [remote, msg](void *valid_sock) { assert(valid_sock); ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); - bool r = sock.Send(remote.data(), msg, 0); + bool r = sock.Send(remote.data(), msg); //TODO check remote removed. if (r && msg.IsCounted()) { auto tmp = msg; // Release() is not const, but it's safe to release. diff --git a/src/sendq.cpp b/src/sendq.cpp new file mode 100644 index 0000000..290e9d1 --- /dev/null +++ b/src/sendq.cpp @@ -0,0 +1,62 @@ +/* + * ===================================================================================== + * + * Filename: sendq.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�14鏃� 09鏃�22鍒�50绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "sendq.h" +#include "shm_queue.h" +#include <chrono> + +bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) +{ + auto FirstNotExpired = [](MsgList &l) { + auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; + return std::lower_bound(l.begin(), l.end(), Now(), Less); + }; + + auto SendOneRemote = [&](const Remote &remote, MsgList &msg_list) { + auto pos = FirstNotExpired(msg_list); + for (auto it = msg_list.begin(); it != pos; ++it) { + auto &info = it->data(); + if (info.on_expire_) { + info.on_expire_(info.msg_); + } + info.msg_.Release(mq.shm()); + } + + //TODO maybe use TrySendAll ? + while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) { + auto &msg = pos->data().msg_; + if (msg.IsCounted()) { + msg.Release(mq.shm()); + } + ++pos; + } + msg_list.erase(msg_list.begin(), pos); + }; + + if (!store_.empty()) { + auto rec = store_.begin(); + do { + SendOneRemote(rec->first, rec->second); + if (rec->second.empty()) { + rec = store_.erase(rec); + } else { + ++rec; + } + } while (rec != store_.end()); + } + return !store_.empty(); +} \ No newline at end of file diff --git a/src/sendq.h b/src/sendq.h new file mode 100644 index 0000000..c6f270b --- /dev/null +++ b/src/sendq.h @@ -0,0 +1,74 @@ +/* + * ===================================================================================== + * + * Filename: sendq.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�14鏃� 09鏃�22鍒�59绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef SENDQ_IWKMSK7M +#define SENDQ_IWKMSK7M + +#include "defs.h" +#include "msg.h" +#include "timed_queue.h" +#include <deque> +#include <functional> +#include <string> +#include <unordered_map> + +namespace bhome_shm +{ +class ShmMsgQueue; +} // namespace bhome_shm + +class SendQ +{ +public: + typedef std::string Remote; + typedef bhome_msg::MsgI MsgI; + typedef std::function<void(const MsgI &msg)> OnMsgEvent; + struct MsgInfo { + MsgI msg_; + OnMsgEvent on_expire_; + // OnMsgEvent on_send_; + }; + typedef TimedData<MsgInfo> TimedMsg; + typedef TimedMsg::TimePoint TimePoint; + typedef TimedMsg::Duration Duration; + + void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + { + Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); + } + void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + { + using namespace std::chrono_literals; + Append(addr, msg, Now() + 60s, onExpire); + } + bool TrySend(bhome_shm::ShmMsgQueue &mq); + // bool empty() const { return store_.empty(); } + +private: + static TimePoint Now() { return TimedMsg::Clock::now(); } + void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) + { + msg.AddRef(); + store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire})); + } + typedef std::deque<TimedMsg> MsgList; + typedef std::unordered_map<Remote, MsgList> Store; + + Store store_; +}; + +#endif // end of include guard: SENDQ_IWKMSK7M diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index 8e4e56e..2006496 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -73,17 +73,26 @@ { Queue *remote = Find(shm, MsgQIdToName(remote_id)); if (remote) { - return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + if (onsend) { + return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + } else { + return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); + } } else { // SetLestError(eNotFound); return false; } } -bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) + +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend) { Queue *remote = Find(shm, MsgQIdToName(remote_id)); if (remote) { - return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); + if (onsend) { + return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); + } else { + return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); + } } else { // SetLestError(eNotFound); return false; @@ -94,15 +103,5 @@ // 1) build msg first, then find remote queue; // 2) find remote queue first, then build msg; // 1 is about 50% faster than 2, maybe cache related. - -// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms) -// { -// if (Read(imsg, timeout_ms)) { -// // DEFER1(imsg.Release(shm());); -// return imsg.ParseHead(head); -// } else { -// return false; -// } -// } } // namespace bhome_shm diff --git a/src/shm_queue.h b/src/shm_queue.h index 20ff3dc..2d5ec3a 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -48,6 +48,63 @@ return cur + millisec(ms); } + auto TimedReadPred(const int timeout_ms) + { + auto endtime = MSFromNow(timeout_ms); + return [this, endtime](Guard &lock) { + return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })); + }; + } + auto TryReadPred() + { + return [this](Guard &lock) { return !this->empty(); }; + } + + template <class Pred, class OnData> + int ReadAllOnCond(Pred const &pred, OnData const &onData) + { + Guard lock(this->mutex()); + int n = 0; + while (pred(lock)) { + ++n; + onData(this->front()); + this->pop_front(); + this->cond_write_.notify_one(); + } + return n; + } + + template <class Pred> + bool ReadOnCond(D &buf, Pred const &pred) + { + int flag = 0; + auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); }; + auto onData = [&buf](D &d) { + using std::swap; + swap(buf, d); + }; + return ReadAllOnCond(only_once, onData); + } + + template <class Iter, class Pred, class OnWrite> + int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite) + { + if (begin == end) { return 0; } + + int n = 0; + Guard lock(mutex()); + while (pred(lock)) { + onWrite(*begin); + this->push_back(*begin); + ++n; + cond_read_.notify_one(); + if (++begin == end) { + break; + } + } + return n; + } + public: SharedQueue(const uint32_t len, Allocator<D> const &alloc) : Super(len, alloc) {} @@ -56,60 +113,42 @@ template <class Iter, class OnWrite> int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) { - int n = 0; - if (begin != end) { - auto endtime = MSFromNow(timeout_ms); - Guard lock(mutex()); - while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) { - onWrite(*begin); - this->push_back(*begin); - ++n; - cond_read_.notify_one(); - if (++begin == end) { - break; - } - } - } - return n; + auto endtime = MSFromNow(timeout_ms); + auto timedWritePred = [this, endtime](Guard &lock) { + return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })); + }; + return WriteAllOnCond(begin, end, timedWritePred, onWrite); } template <class OnWrite> - bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) - { - return Write(&buf, (&buf) + 1, timeout_ms, onWrite); - } + bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); } bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf) {}); } - template <class OnData> - bool Read(const int timeout_ms, OnData onData) + template <class Iter, class OnWrite> + int TryWrite(Iter begin, Iter end, const OnWrite &onWrite) { - int n = 0; - auto endtime = MSFromNow(timeout_ms); - Guard lock(mutex()); - while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) { - const bool more = onData(this->front()); - this->pop_front(); - cond_write_.notify_one(); - ++n; - if (!more) { - break; - } - } - return n; + auto tryWritePred = [this](Guard &lock) { return !this->full(); }; + return WriteAllOnCond(begin, end, tryWritePred, onWrite); } - bool Read(D &buf, const int timeout_ms) + template <class OnWrite> + bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); } + + bool TryWrite(const D &buf) { - auto read1 = [&](D &d) { - using std::swap; - swap(buf, d); - return false; - }; - return Read(timeout_ms, read1) == 1; + return TryWrite(buf, [](const D &buf) {}); } + + template <class OnData> + int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); } + template <class OnData> + int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); } + + bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); } + bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); } }; using namespace bhome_msg; @@ -119,8 +158,6 @@ typedef ShmObject<SharedQueue<MsgI>> Super; typedef Super::Data Queue; typedef std::function<void()> OnSend; - bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } - bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } MQId id_; protected: @@ -131,14 +168,21 @@ ~ShmMsgQueue(); static bool Remove(SharedMemory &shm, const MQId &id); const MQId &Id() const { return id_; } + using Super::shm; - // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms); - bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } - static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); - static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); + bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); } + bool TryRecv(MsgI &msg) { return data()->TryRead(msg); } + template <class OnData> + int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); } + + static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend()); + static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); template <class... Rest> bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } + template <class... Rest> + bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } + size_t Pending() const { return data()->size(); } }; diff --git a/src/socket.cpp b/src/socket.cpp index 2c55665..b7ef4f3 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -30,11 +30,11 @@ } // namespace ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : - shm_(shm), run_(false), mq_(id, shm, len) + run_(false), mq_(id, shm, len) { } ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : - shm_(shm), run_(false), mq_(shm, len) {} + run_(false), mq_(shm, len) {} ShmSocket::~ShmSocket() { @@ -43,28 +43,38 @@ bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) { - auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { - RecvCB cb; - if (per_msg_cbs_->Find(head.msg_id(), cb)) { - cb(socket, imsg, head); - } else if (onData) { - onData(socket, imsg, head); - } else { // else ignored, or dropped - } - }; + auto ioProc = [this, onData, onIdle]() { + auto DoSend = [this]() { return send_buffer_->TrySend(mq()); }; + auto DoRecv = [=] { + auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + RecvCB cb; + if (per_msg_cbs_->Find(head.msg_id(), cb)) { + cb(socket, imsg, head); + } else if (onData) { + onData(socket, imsg, head); + } + }; - auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { - try { - MsgI imsg; - if (mq().Recv(imsg, 10)) { + // do not recv if no cb is set. + if (!onData && per_msg_cbs_->empty()) { + return false; + } + auto onMsg = [&](MsgI &imsg) { DEFER1(imsg.Release(shm())); BHMsgHead head; if (imsg.ParseHead(head)) { onRecvWithPerMsgCB(*this, imsg, head); } - } - if (onIdle) { - onIdle(*this); + }; + return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs. + }; + + try { + bool more_to_send = DoSend(); + bool more_to_recv = DoRecv(); + if (onIdle) { onIdle(*this); } + if (!more_to_send && !more_to_recv) { + std::this_thread::yield(); } } catch (...) { } @@ -75,7 +85,7 @@ run_.store(true); for (int i = 0; i < nworker; ++i) { - workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } }); + workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); } return true; } @@ -100,18 +110,17 @@ return false; } +//maybe reimplment, using async cbs? bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) { - std::lock_guard<std::mutex> lock(mutex_); - auto Recv = [&]() { - if (mq().Recv(msg, timeout_ms)) { - if (msg.ParseHead(head)) { - return true; - } else { - msg.Release(shm()); - } + // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. + bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); + if (got) { + if (msg.ParseHead(head)) { + return true; + } else { + msg.Release(shm()); } - return false; - }; - return !RunningNoLock() && Recv(); + } + return false; } diff --git a/src/socket.h b/src/socket.h index 5973ab6..9b47e42 100644 --- a/src/socket.h +++ b/src/socket.h @@ -21,6 +21,7 @@ #include "bh_util.h" #include "defs.h" +#include "sendq.h" #include "shm_queue.h" #include <atomic> #include <boost/noncopyable.hpp> @@ -35,13 +36,10 @@ class ShmSocket : private boost::noncopyable { - template <class DoSend> - inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) + bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) { - bool r = false; - DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); - r = doSend(msg); - return r; + send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); + return true; } protected: @@ -58,7 +56,6 @@ ~ShmSocket(); static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } const MQId &id() const { return mq().Id(); } - Shm &shm() { return shm_; } // start recv. bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } @@ -66,29 +63,36 @@ bool Stop(); size_t Pending() const { return mq().Pending(); } - bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) { - assert(valid_remote); - return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg); } - //TODO reimplment, using async. + + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb) + { + //TODO send_buffer_ need flag, and remove callback on expire. + MsgI msg; + if (msg.Make(shm(), head, body)) { + std::string msg_id(head.msg_id()); + per_msg_cbs_->Add(msg_id, cb); + auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Find(msg_id, cb_no_use); + }; + return SendImpl(valid_remote, msg, onExpireRemoveCB); + } + return false; + } + + bool Send(const void *valid_remote, const MsgI &imsg) + { + return SendImpl(valid_remote, imsg); + } + bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); - - template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) - { - auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); }; - MsgI msg; - return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); - } - - template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) - { - auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; - MsgI msg; - return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); - } template <class Body> bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) @@ -114,7 +118,7 @@ }; std::unique_lock<std::mutex> lk(st->mutex); - bool sendok = Send(remote, head, body, timeout_ms, OnRecv); + bool sendok = Send(remote, head, body, OnRecv); if (!sendok) { printf("send timeout\n"); } @@ -129,8 +133,9 @@ } } + Shm &shm() const { return mq().shm(); } + protected: - const Shm &shm() const { return shm_; } Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid. const Queue &mq() const { return mq_; } std::mutex &mutex() { return mutex_; } @@ -139,7 +144,6 @@ bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } - Shm &shm_; std::vector<std::thread> workers_; std::mutex mutex_; std::atomic<bool> run_; @@ -150,6 +154,7 @@ std::unordered_map<std::string, RecvCB> store_; public: + bool empty() const { return store_.empty(); } bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } bool Find(const std::string &id, RecvCB &cb) { @@ -165,6 +170,7 @@ }; Synced<AsyncCBs> per_msg_cbs_; + Synced<SendQ> send_buffer_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/timed_queue.h b/src/timed_queue.h index b2a941b..af77b11 100644 --- a/src/timed_queue.h +++ b/src/timed_queue.h @@ -1,7 +1,7 @@ /* * ===================================================================================== * - * Filename: failed_msg.h + * Filename: timed_queue.h * * Description: * @@ -35,7 +35,8 @@ expire_(expire), data_(data) {} TimedData(const TimePoint &expire, Data &&data) : expire_(expire), data_(std::move(data)) {} - bool Expired() { return Clock::now() > expire_; } + bool Expired() const { return Clock::now() > expire_; } + const TimePoint &expire() const { return expire_; } Data &data() { return data_; } Data const &data() const { return data_; } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 8f039de..4ce2c97 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -17,7 +17,6 @@ */ #include "topic_node.h" #include "bh_util.h" -#include "failed_msg.h" #include <chrono> #include <list> @@ -33,9 +32,8 @@ std::string msg_id; }; -typedef FailedMsgQ ServerFailedQ; - } // namespace + TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { @@ -76,15 +74,20 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - info_ = body; + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); + if (r && IsSuccess(reply_body.errmsg().errcode())) { + info_ = body; + return true; + } + return false; } - return r; } bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) @@ -96,22 +99,23 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - // TODO update proc info + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); + return (r && IsSuccess(reply_body.errmsg().errcode())); } - return r; } bool TopicNode::Heartbeat(const int timeout_ms) { ProcInfo proc; proc.set_proc_id(proc_id()); MsgCommonReply reply_body; - return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode()); + return Heartbeat(proc, reply_body, timeout_ms); } bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) @@ -124,50 +128,43 @@ auto head(InitMsgHead(GetType(body), proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); - BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply; - r = r && reply.ParseBody(reply_body); - return r; + if (timeout_ms == 0) { + return sock.Send(&BHTopicCenterAddress(), head, body); + } else { + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply; + r = r && reply.ParseBody(reply_body); + return r; + } } bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) { - auto failed_q = std::make_shared<ServerFailedQ>(); + auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } - auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; + MsgRequestTopicReply reply_body; + if (rcb(head.proc_id(), req, reply_body)) { + BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); - auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { - MsgRequestTopic req; - if (imsg.ParseBody(req)) { - MsgRequestTopicReply reply_body; - if (rcb(head.proc_id(), req, reply_body)) { - BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); - - for (int i = 0; i < head.route_size() - 1; ++i) { - reply_head.add_route()->Swap(head.mutable_route(i)); - } - MsgI msg; - if (msg.Make(sock.shm(), reply_head, reply_body)) { - auto &remote = head.route().rbegin()->mq_id(); - if (!sock.Send(remote.data(), msg, 10)) { - failed_q->Push(remote, msg, 10s); - } - } - } + for (int i = 0; i < head.route_size() - 1; ++i) { + reply_head.add_route()->Swap(head.mutable_route(i)); } - } else { - // ignored, or dropped + MsgI msg; + if (msg.Make(sock.shm(), reply_head, reply_body)) { + auto &remote = head.route().rbegin()->mq_id(); + sock.Send(remote.data(), msg); + } } - - onIdle(sock); }; auto &sock = SockServer(); - return rcb && sock.Start(onRecv, onIdle, nworker); + return rcb && sock.Start(onRecv, nworker); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -189,7 +186,7 @@ return false; } -bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) +bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) { auto &sock = SockServer(); @@ -202,7 +199,7 @@ for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } - return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); + return sock.Send(p->route.back().mq_id().data(), head, body); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -222,7 +219,7 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); @@ -239,15 +236,15 @@ } } }; - return sock.Send(remote, head, req, timeout_ms, onRecv); + return sock.Send(remote, head, req, onRecv); } else { - return sock.Send(remote, head, req, timeout_ms); + return sock.Send(remote, head, req); } }; try { BHAddress addr; - if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { + if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { return Call(addr.mq_id().data()); } else { SetLastError(eNotFound, "remote not found."); @@ -333,14 +330,18 @@ BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm());); - BHMsgHead reply_head; - MsgCommonReply reply_body; - return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && - reply_head.type() == kMsgTypeCommonReply && - reply.ParseBody(reply_body) && - IsSuccess(reply_body.errmsg().errcode()); + if (timeout_ms == 0) { + return sock.Send(&BHTopicBusAddress(), head, pub); + } else { + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } } catch (...) { } return false; @@ -357,8 +358,19 @@ BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); AddRoute(head, sock.id()); - - return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); + if (timeout_ms == 0) { + return sock.Send(&BHTopicBusAddress(), head, sub); + } else { + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } + // TODO wait for result? } catch (...) { return false; } diff --git a/src/topic_node.h b/src/topic_node.h index d671026..0627930 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -48,12 +48,12 @@ bool ServerStart(ServerCB const &cb, const int nworker = 2); bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); - bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms); + bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); // topic client typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB()); bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 7f77b02..0c40f14 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -161,9 +161,9 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); - return cli.Send(&srv.id(), req_head, req_body, 100); + return cli.Send(&srv.id(), req_head, req_body); }; - auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); }; + auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); }; if (!ReqRC()) { printf("********** client send error.\n"); @@ -204,9 +204,9 @@ reply_body.set_topic("topic"); reply_body.set_data(msg_content); auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); - return srv.Send(&src_id, reply_head, reply_body, 100); + return srv.Send(&src_id, reply_head, reply_body); }; - auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); }; + auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); }; if (ReplyRC()) { } diff --git a/utest/utest.cpp b/utest/utest.cpp index 5a04842..817cbaf 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,6 +1,5 @@ #include "center.h" #include "defs.h" -#include "failed_msg.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -21,8 +20,6 @@ struct IsSameType<A, A> { static const bool value = true; }; - -typedef FailedMsgQ ServerFailedQ; BOOST_AUTO_TEST_CASE(Temp) { @@ -232,7 +229,7 @@ MsgRequestTopic req; req.set_topic(topic); req.set_data("data " + std::to_string(i)); - if (!client.ClientAsyncRequest(req, 1000)) { + if (!client.ClientAsyncRequest(req)) { printf("client request failed\n"); ++count; } -- Gitblit v1.8.0