add SendQ TrySend() TryRecv(); handle re-register.
| | |
| | | #include "center.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "failed_msg.h" |
| | | #include "shm.h" |
| | | #include <chrono> |
| | | #include <set> |
| | |
| | | }; |
| | | |
| | | struct ProcState { |
| | | int64_t timestamp_; |
| | | int64_t timestamp_ = 0; |
| | | uint32_t flag_ = 0; // reserved |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | |
| | | } |
| | | |
| | | try { |
| | | Node node(new NodeInfo); |
| | | node->addrs_.insert(SrcAddr(head)); |
| | | for (auto &addr : msg.addrs()) { |
| | | node->addrs_.insert(addr.mq_id()); |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->addrs_.insert(SrcAddr(head)); |
| | | for (auto &addr : msg.addrs()) { |
| | | node->addrs_.insert(addr.mq_id()); |
| | | } |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | }; |
| | | |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | if (pos == nodes_.end()) { // new client |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | } else { |
| | | Node &node = pos->second; |
| | | if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { |
| | | // node restarted, release old mq. |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | | } |
| | | node->addrs_.clear(); |
| | | } |
| | | UpdateRegInfo(node); |
| | | } |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | | return MakeReply(eError, "register node error."); |
| | |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotRegistered, "Node is not registered."); |
| | | } else { |
| | | auto node = pos->second; |
| | | auto &node = pos->second; |
| | | if (!MatchAddr(node->addrs_, SrcAddr(head))) { |
| | | return MakeReply<Reply>(eAddressNotMatch, "Node address error."); |
| | | } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { |
| | |
| | | auto node = weak.lock(); |
| | | return node && Valid(*node); |
| | | } |
| | | void CheckAllNodes(); //TODO, call it in timer. |
| | | std::string id_; // center proc id; |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); |
| | | auto center_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | MsgI msg; |
| | | if (msg.Make(socket.shm(), reply_head, rep_body)) { |
| | | auto &remote = head.route(0).mq_id(); |
| | | bool r = socket.Send(remote.data(), msg, timeout_ms); |
| | | if (!r) { |
| | | failq.Push(remote, msg, 60s); // for later retry. |
| | | } |
| | | bool r = socket.Send(remote.data(), msg); |
| | | } |
| | | }; |
| | | }; |
| | | |
| | | auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | | center_failed_q->TrySend(socket); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | |
| | | } |
| | | }; |
| | | |
| | | auto bus_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | |
| | | auto &cli = *it; |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | if (!socket.Send(cli.mq_.data(), msg, 0)) { |
| | | bus_failed_q->Push(cli.mq_, msg, 60s); |
| | | } |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send(cli.mq_.data(), msg); |
| | | ++it; |
| | | } else { |
| | | it = clients.erase(it); |
| | |
| | | |
| | | bool BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len, |
| | | const int timeout_ms) |
| | | const int reply_len) |
| | | { |
| | | MsgRequestTopicReply rep; |
| | | if (!rep.ParseFromArray(reply, reply_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | return ProcNode().ServerSendReply(src, rep, timeout_ms); |
| | | return ProcNode().ServerSendReply(src, rep); |
| | | } |
| | | |
| | | int BHCleanUp() |
| | |
| | | |
| | | bool BHSendReply(BHSrcInfo *src, |
| | | const void *reply, |
| | | const int reply_len, |
| | | const int timeout_ms); |
| | | const int reply_len); |
| | | |
| | | // int BHCleanUp(); |
| | | |
| | |
| | | return [remote, msg](void *valid_sock) { |
| | | assert(valid_sock); |
| | | ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock); |
| | | bool r = sock.Send(remote.data(), msg, 0); |
| | | bool r = sock.Send(remote.data(), msg); |
| | | //TODO check remote removed. |
| | | if (r && msg.IsCounted()) { |
| | | auto tmp = msg; // Release() is not const, but it's safe to release. |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: sendq.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月14日 09时22分50秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "sendq.h" |
| | | #include "shm_queue.h" |
| | | #include <chrono> |
| | | |
| | | bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq) |
| | | { |
| | | auto FirstNotExpired = [](MsgList &l) { |
| | | auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; |
| | | return std::lower_bound(l.begin(), l.end(), Now(), Less); |
| | | }; |
| | | |
| | | auto SendOneRemote = [&](const Remote &remote, MsgList &msg_list) { |
| | | auto pos = FirstNotExpired(msg_list); |
| | | for (auto it = msg_list.begin(); it != pos; ++it) { |
| | | auto &info = it->data(); |
| | | if (info.on_expire_) { |
| | | info.on_expire_(info.msg_); |
| | | } |
| | | info.msg_.Release(mq.shm()); |
| | | } |
| | | |
| | | //TODO maybe use TrySendAll ? |
| | | while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) { |
| | | auto &msg = pos->data().msg_; |
| | | if (msg.IsCounted()) { |
| | | msg.Release(mq.shm()); |
| | | } |
| | | ++pos; |
| | | } |
| | | msg_list.erase(msg_list.begin(), pos); |
| | | }; |
| | | |
| | | if (!store_.empty()) { |
| | | auto rec = store_.begin(); |
| | | do { |
| | | SendOneRemote(rec->first, rec->second); |
| | | if (rec->second.empty()) { |
| | | rec = store_.erase(rec); |
| | | } else { |
| | | ++rec; |
| | | } |
| | | } while (rec != store_.end()); |
| | | } |
| | | return !store_.empty(); |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: sendq.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月14日 09时22分59秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef SENDQ_IWKMSK7M |
| | | #define SENDQ_IWKMSK7M |
| | | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "timed_queue.h" |
| | | #include <deque> |
| | | #include <functional> |
| | | #include <string> |
| | | #include <unordered_map> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | class ShmMsgQueue; |
| | | } // namespace bhome_shm |
| | | |
| | | class SendQ |
| | | { |
| | | public: |
| | | typedef std::string Remote; |
| | | typedef bhome_msg::MsgI MsgI; |
| | | typedef std::function<void(const MsgI &msg)> OnMsgEvent; |
| | | struct MsgInfo { |
| | | MsgI msg_; |
| | | OnMsgEvent on_expire_; |
| | | // OnMsgEvent on_send_; |
| | | }; |
| | | typedef TimedData<MsgInfo> TimedMsg; |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | Append(std::string((const char *) &id, sizeof(id)), msg, onExpire); |
| | | } |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | using namespace std::chrono_literals; |
| | | Append(addr, msg, Now() + 60s, onExpire); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | msg.AddRef(); |
| | | store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire})); |
| | | } |
| | | typedef std::deque<TimedMsg> MsgList; |
| | | typedef std::unordered_map<Remote, MsgList> Store; |
| | | |
| | | Store store_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SENDQ_IWKMSK7M |
| | |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | if (remote) { |
| | | return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | if (onsend) { |
| | | return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return false; |
| | | } |
| | | } |
| | | bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | if (remote) { |
| | | return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | return false; |
| | |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | // bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms) |
| | | // { |
| | | // if (Read(imsg, timeout_ms)) { |
| | | // // DEFER1(imsg.Release(shm());); |
| | | // return imsg.ParseHead(head); |
| | | // } else { |
| | | // return false; |
| | | // } |
| | | // } |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | return cur + millisec(ms); |
| | | } |
| | | |
| | | auto TimedReadPred(const int timeout_ms) |
| | | { |
| | | auto endtime = MSFromNow(timeout_ms); |
| | | return [this, endtime](Guard &lock) { |
| | | return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })); |
| | | }; |
| | | } |
| | | auto TryReadPred() |
| | | { |
| | | return [this](Guard &lock) { return !this->empty(); }; |
| | | } |
| | | |
| | | template <class Pred, class OnData> |
| | | int ReadAllOnCond(Pred const &pred, OnData const &onData) |
| | | { |
| | | Guard lock(this->mutex()); |
| | | int n = 0; |
| | | while (pred(lock)) { |
| | | ++n; |
| | | onData(this->front()); |
| | | this->pop_front(); |
| | | this->cond_write_.notify_one(); |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | template <class Pred> |
| | | bool ReadOnCond(D &buf, Pred const &pred) |
| | | { |
| | | int flag = 0; |
| | | auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); }; |
| | | auto onData = [&buf](D &d) { |
| | | using std::swap; |
| | | swap(buf, d); |
| | | }; |
| | | return ReadAllOnCond(only_once, onData); |
| | | } |
| | | |
| | | template <class Iter, class Pred, class OnWrite> |
| | | int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite) |
| | | { |
| | | if (begin == end) { return 0; } |
| | | |
| | | int n = 0; |
| | | Guard lock(mutex()); |
| | | while (pred(lock)) { |
| | | onWrite(*begin); |
| | | this->push_back(*begin); |
| | | ++n; |
| | | cond_read_.notify_one(); |
| | | if (++begin == end) { |
| | | break; |
| | | } |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | public: |
| | | SharedQueue(const uint32_t len, Allocator<D> const &alloc) : |
| | | Super(len, alloc) {} |
| | |
| | | template <class Iter, class OnWrite> |
| | | int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) |
| | | { |
| | | int n = 0; |
| | | if (begin != end) { |
| | | auto endtime = MSFromNow(timeout_ms); |
| | | Guard lock(mutex()); |
| | | while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) { |
| | | onWrite(*begin); |
| | | this->push_back(*begin); |
| | | ++n; |
| | | cond_read_.notify_one(); |
| | | if (++begin == end) { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | return n; |
| | | auto endtime = MSFromNow(timeout_ms); |
| | | auto timedWritePred = [this, endtime](Guard &lock) { |
| | | return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })); |
| | | }; |
| | | return WriteAllOnCond(begin, end, timedWritePred, onWrite); |
| | | } |
| | | |
| | | template <class OnWrite> |
| | | bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) |
| | | { |
| | | return Write(&buf, (&buf) + 1, timeout_ms, onWrite); |
| | | } |
| | | bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); } |
| | | bool Write(const D &buf, const int timeout_ms) |
| | | { |
| | | return Write(buf, timeout_ms, [](const D &buf) {}); |
| | | } |
| | | |
| | | template <class OnData> |
| | | bool Read(const int timeout_ms, OnData onData) |
| | | template <class Iter, class OnWrite> |
| | | int TryWrite(Iter begin, Iter end, const OnWrite &onWrite) |
| | | { |
| | | int n = 0; |
| | | auto endtime = MSFromNow(timeout_ms); |
| | | Guard lock(mutex()); |
| | | while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) { |
| | | const bool more = onData(this->front()); |
| | | this->pop_front(); |
| | | cond_write_.notify_one(); |
| | | ++n; |
| | | if (!more) { |
| | | break; |
| | | } |
| | | } |
| | | return n; |
| | | auto tryWritePred = [this](Guard &lock) { return !this->full(); }; |
| | | return WriteAllOnCond(begin, end, tryWritePred, onWrite); |
| | | } |
| | | |
| | | bool Read(D &buf, const int timeout_ms) |
| | | template <class OnWrite> |
| | | bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); } |
| | | |
| | | bool TryWrite(const D &buf) |
| | | { |
| | | auto read1 = [&](D &d) { |
| | | using std::swap; |
| | | swap(buf, d); |
| | | return false; |
| | | }; |
| | | return Read(timeout_ms, read1) == 1; |
| | | return TryWrite(buf, [](const D &buf) {}); |
| | | } |
| | | |
| | | template <class OnData> |
| | | int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); } |
| | | template <class OnData> |
| | | int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); } |
| | | |
| | | bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); } |
| | | bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); } |
| | | }; |
| | | |
| | | using namespace bhome_msg; |
| | |
| | | typedef ShmObject<SharedQueue<MsgI>> Super; |
| | | typedef Super::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } |
| | | bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } |
| | | MQId id_; |
| | | |
| | | protected: |
| | |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(SharedMemory &shm, const MQId &id); |
| | | const MQId &Id() const { return id_; } |
| | | using Super::shm; |
| | | |
| | | // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms); |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return data()->TryRead(msg); } |
| | | template <class OnData> |
| | | int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); } |
| | | |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend()); |
| | | static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | |
| | | template <class... Rest> |
| | | bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | |
| | | size_t Pending() const { return data()->size(); } |
| | | }; |
| | | |
| | |
| | | } // namespace |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : |
| | | shm_(shm), run_(false), mq_(id, shm, len) |
| | | run_(false), mq_(id, shm, len) |
| | | { |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | shm_(shm), run_(false), mq_(shm, len) {} |
| | | run_(false), mq_(shm, len) {} |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (per_msg_cbs_->Find(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | | } else { // else ignored, or dropped |
| | | } |
| | | }; |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_->TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (per_msg_cbs_->Find(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | | } |
| | | }; |
| | | |
| | | auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { |
| | | try { |
| | | MsgI imsg; |
| | | if (mq().Recv(imsg, 10)) { |
| | | // do not recv if no cb is set. |
| | | if (!onData && per_msg_cbs_->empty()) { |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release(shm())); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | | } |
| | | } |
| | | if (onIdle) { |
| | | onIdle(*this); |
| | | }; |
| | | return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs. |
| | | }; |
| | | |
| | | try { |
| | | bool more_to_send = DoSend(); |
| | | bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | std::this_thread::yield(); |
| | | } |
| | | } catch (...) { |
| | | } |
| | |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } }); |
| | | workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | return false; |
| | | } |
| | | |
| | | //maybe reimplment, using async cbs? |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | auto Recv = [&]() { |
| | | if (mq().Recv(msg, timeout_ms)) { |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. |
| | | bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); |
| | | if (got) { |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | return false; |
| | | }; |
| | | return !RunningNoLock() && Recv(); |
| | | } |
| | | return false; |
| | | } |
| | |
| | | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "sendq.h" |
| | | #include "shm_queue.h" |
| | | #include <atomic> |
| | | #include <boost/noncopyable.hpp> |
| | |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | template <class DoSend> |
| | | inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) |
| | | bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) |
| | | { |
| | | bool r = false; |
| | | DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); |
| | | r = doSend(msg); |
| | | return r; |
| | | send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); |
| | | return true; |
| | | } |
| | | |
| | | protected: |
| | |
| | | ~ShmSocket(); |
| | | static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } |
| | | const MQId &id() const { return mq().Id(); } |
| | | Shm &shm() { return shm_; } |
| | | // start recv. |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } |
| | |
| | | bool Stop(); |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) |
| | | { |
| | | assert(valid_remote); |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg); |
| | | } |
| | | //TODO reimplment, using async. |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb) |
| | | { |
| | | //TODO send_buffer_ need flag, and remove callback on expire. |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, msg, onExpireRemoveCB); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool Send(const void *valid_remote, const MsgI &imsg) |
| | | { |
| | | return SendImpl(valid_remote, imsg); |
| | | } |
| | | |
| | | bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) |
| | | { |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) |
| | | { |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | |
| | | }; |
| | | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | bool sendok = Send(remote, head, body, timeout_ms, OnRecv); |
| | | bool sendok = Send(remote, head, body, OnRecv); |
| | | if (!sendok) { |
| | | printf("send timeout\n"); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | Shm &shm() const { return mq().shm(); } |
| | | |
| | | protected: |
| | | const Shm &shm() const { return shm_; } |
| | | Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid. |
| | | const Queue &mq() const { return mq_; } |
| | | std::mutex &mutex() { return mutex_; } |
| | |
| | | bool StopNoLock(); |
| | | bool RunningNoLock() { return !workers_.empty(); } |
| | | |
| | | Shm &shm_; |
| | | std::vector<std::thread> workers_; |
| | | std::mutex mutex_; |
| | | std::atomic<bool> run_; |
| | |
| | | std::unordered_map<std::string, RecvCB> store_; |
| | | |
| | | public: |
| | | bool empty() const { return store_.empty(); } |
| | | bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } |
| | | bool Find(const std::string &id, RecvCB &cb) |
| | | { |
| | |
| | | }; |
| | | |
| | | Synced<AsyncCBs> per_msg_cbs_; |
| | | Synced<SendQ> send_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: failed_msg.h |
| | | * Filename: timed_queue.h |
| | | * |
| | | * Description: |
| | | * |
| | |
| | | expire_(expire), data_(data) {} |
| | | TimedData(const TimePoint &expire, Data &&data) : |
| | | expire_(expire), data_(std::move(data)) {} |
| | | bool Expired() { return Clock::now() > expire_; } |
| | | bool Expired() const { return Clock::now() > expire_; } |
| | | const TimePoint &expire() const { return expire_; } |
| | | Data &data() { return data_; } |
| | | Data const &data() const { return data_; } |
| | | |
| | |
| | | */ |
| | | #include "topic_node.h" |
| | | #include "bh_util.h" |
| | | #include "failed_msg.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | typedef FailedMsgQ ServerFailedQ; |
| | | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | { |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | // TODO update proc info |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | return (r && IsSuccess(reply_body.errmsg().errcode())); |
| | | } |
| | | return r; |
| | | } |
| | | bool TopicNode::Heartbeat(const int timeout_ms) |
| | | { |
| | | ProcInfo proc; |
| | | proc.set_proc_id(proc_id()); |
| | | MsgCommonReply reply_body; |
| | | return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode()); |
| | | return Heartbeat(proc, reply_body, timeout_ms); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | auto head(InitMsgHead(GetType(body), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | { |
| | | auto failed_q = std::make_shared<ServerFailedQ>(); |
| | | auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | |
| | | auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { |
| | | MsgRequestTopic req; |
| | | if (imsg.ParseBody(req)) { |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | if (!sock.Send(remote.data(), msg, 10)) { |
| | | failed_q->Push(remote, msg, 10s); |
| | | } |
| | | } |
| | | } |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), msg); |
| | | } |
| | | } |
| | | |
| | | onIdle(sock); |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return rcb && sock.Start(onRecv, onIdle, nworker); |
| | | return rcb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); |
| | | return sock.Send(p->route.back().mq_id().data(), head, body); |
| | | } |
| | | |
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) |
| | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(remote, head, req, timeout_ms, onRecv); |
| | | return sock.Send(remote, head, req, onRecv); |
| | | } else { |
| | | return sock.Send(remote, head, req, timeout_ms); |
| | | return sock.Send(remote, head, req); |
| | | } |
| | | }; |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { |
| | | if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, pub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | } |
| | | } catch (...) { |
| | | } |
| | | return false; |
| | |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, sub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | } |
| | | // TODO wait for result? |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | bool ServerStart(ServerCB const &cb, const int nworker = 2); |
| | | bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); |
| | | bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); |
| | | |
| | | // topic client |
| | | typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; |
| | | bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); |
| | | bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); |
| | | |
| | | // publish |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | return cli.Send(&srv.id(), req_head, req_body, 100); |
| | | return cli.Send(&srv.id(), req_head, req_body); |
| | | }; |
| | | auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); }; |
| | | auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); }; |
| | | |
| | | if (!ReqRC()) { |
| | | printf("********** client send error.\n"); |
| | |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | return srv.Send(&src_id, reply_head, reply_body, 100); |
| | | return srv.Send(&src_id, reply_head, reply_body); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); }; |
| | | auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); }; |
| | | |
| | | if (ReplyRC()) { |
| | | } |
| | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "failed_msg.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | |
| | | struct IsSameType<A, A> { |
| | | static const bool value = true; |
| | | }; |
| | | |
| | | typedef FailedMsgQ ServerFailedQ; |
| | | |
| | | BOOST_AUTO_TEST_CASE(Temp) |
| | | { |
| | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::to_string(i)); |
| | | if (!client.ClientAsyncRequest(req, 1000)) { |
| | | if (!client.ClientAsyncRequest(req)) { |
| | | printf("client request failed\n"); |
| | | ++count; |
| | | } |