remove sync recv, node cache msgs for sync recv.
| | |
| | | p += 4; |
| | | return head.ParseFromArray(p, msg_size); |
| | | } |
| | | std::string body() const |
| | | { |
| | | auto p = get<char>(); |
| | | assert(p); |
| | | uint32_t size = Get32(p); |
| | | p += 4; |
| | | p += size; |
| | | size = Get32(p); |
| | | p += 4; |
| | | return std::string(p, size); |
| | | } |
| | | template <class Body> |
| | | bool ParseBody(Body &body) const |
| | | { |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms) |
| | | { |
| | | return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms); |
| | | } |
| | | //maybe reimplment, using async cbs? |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | | // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. |
| | | bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); |
| | | if (got) { |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) |
| | | { |
| | | size_t size = content.size(); |
| | |
| | | { |
| | | return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); |
| | | } |
| | | bool SyncRecv(int64_t &cmd, const int timeout_ms); |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | |
| | | |
| | | Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; |
| | | Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; |
| | | |
| | | SendQ send_buffer_; |
| | | |
| | | // node request center alloc memory. |
| | | int node_proc_index_ = -1; |
| | | int socket_index_ = -1; |
| | |
| | | } |
| | | SetProcIndex(reply.proc_index()); |
| | | this->state_ = eStateUnregistered; |
| | | auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | server_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | SockServer().Start(onRequest); |
| | | auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | sub_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | SockSub().Start(onSub); |
| | | } |
| | | } break; |
| | | default: break; |
| | |
| | | |
| | | bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) |
| | | { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | if (acb) { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error server handle msg:" << e.what(); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error server handle msg:" << e.what(); |
| | | } |
| | | }; |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return acb && sock.Start(onRecv, nworker); |
| | | return SockServer().Start(onRecv, nworker); |
| | | } else { |
| | | auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | server_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | return SockServer().Start(onRequest, nworker); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | | BHMsgHead head; |
| | | if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | if (imsg.ParseBody(request)) { |
| | | std::string body; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!server_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | if (request.ParseFromString(body)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | |
| | | |
| | | bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) |
| | | { |
| | | auto &sock = SockSub(); |
| | | |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub); |
| | | if (tdcb) { |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | return tdcb && sock.Start(AsyncRecvProc, nworker); |
| | | }; |
| | | return SockSub().Start(AsyncRecvProc, nworker); |
| | | } else { |
| | | auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | sub_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | return SockSub().Start(onSub, nworker); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release();); |
| | | BHMsgHead head; |
| | | std::string body; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!sub_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | //TODO error msg. |
| | | if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | | if (msg.ParseBody(pub)) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | if (pub.ParseFromString(body)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | return true; |
| | | } |
| | |
| | | int proc_index_ = -1; |
| | | |
| | | TopicQueryCache topic_query_cache_; |
| | | |
| | | class RecvQ |
| | | { |
| | | public: |
| | | void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); } |
| | | bool Read(BHMsgHead &head, std::string &body) |
| | | { |
| | | if (q_.empty()) { |
| | | return false; |
| | | } else { |
| | | head = std::move(q_.front().head); |
| | | body = std::move(q_.front().body); |
| | | q_.pop_front(); |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | private: |
| | | struct MsgData { |
| | | BHMsgHead head; |
| | | std::string body; |
| | | }; |
| | | std::deque<MsgData> q_; |
| | | }; |
| | | Synced<RecvQ> server_buffer_; |
| | | Synced<RecvQ> sub_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: TOPIC_NODE_YVKWA6TF |
| | |
| | | reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); |
| | | if (reg) { |
| | | printf("register ok\n"); |
| | | // bool r = BHUnregister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); |
| | | // printf("unregister %s\n", r ? "ok" : "failed"); |
| | | // reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); |
| | | // if (!reg) { |
| | | // int ec = 0; |
| | | // std::string msg; |
| | | // GetLastError(ec, msg); |
| | | // printf("reg error: %s\n", msg.c_str()); |
| | | // } |
| | | } else { |
| | | int ec = 0; |
| | | std::string msg; |
| | |
| | | |
| | | auto SyncRequest = [&](int idx) { // SyncRequest |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic_ + std::to_string(idx)); |
| | | req.set_topic(topic_ + std::to_string(0)); |
| | | req.set_data("request_data_" + std::to_string(idx)); |
| | | std::string s(req.SerializeAsString()); |
| | | // Sleep(10ms, false); |
| | |
| | | |
| | | std::atomic<bool> run(true); |
| | | |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | ThreadManager threads; |
| | | |
| | | #if 0 |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | #else |
| | | BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); |
| | | threads.Launch(ServerLoop, &run); |
| | | #endif |
| | | |
| | | boost::timer::auto_cpu_timer timer; |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const int64_t nreq = 1000 * 100; |
| | | |
| | | #if 1 |
| | | for (int i = 0; i < ncli; ++i) { |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | // for (int i = 0; i < 100; ++i) { |
| | | // SyncRequest(0); |
| | | // } |
| | | #else |
| | | for (int i = 0; i < 100; ++i) { |
| | | SyncRequest(i); |
| | | } |
| | | #endif |
| | | |
| | | int same = 0; |
| | | uint64_t last = 0; |
| | |
| | | Req(); |
| | | } |
| | | }; |
| | | auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()}; |
| | | |
| | | std::atomic<bool> stop(false); |
| | | auto Server = [&]() { |
| | | MsgI req; |
| | | BHMsgHead req_head; |
| | | |
| | | while (!stop) { |
| | | if (srv.SyncRecv(req, req_head, 10)) { |
| | | DEFER1(req.Release()); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()}; |
| | | auto Reply = [&]() { |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); |
| | | return srv.Send(src_mq, reply_head, reply_body); |
| | | }; |
| | | Reply(); |
| | | } |
| | | } |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id())); |
| | | srv.Send(src_mq, reply_head, reply_body); |
| | | } |
| | | }; |
| | | srv.Start(onRequest); |
| | | |
| | | boost::timer::auto_cpu_timer timer; |
| | | DEFER1(printf("Request Reply Test:");); |
| | | |
| | | ThreadManager clients, servers; |
| | | for (int i = 0; i < 2; ++i) { servers.Launch(Server); } |
| | | ThreadManager clients; |
| | | |
| | | printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); |
| | | for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } |
| | | clients.WaitAll(); |
| | |
| | | std::this_thread::sleep_for(100ms); |
| | | } while (count.load() < ncli * nmsg); |
| | | PrintStatus(NowSec()); |
| | | stop = true; |
| | | servers.WaitAll(); |
| | | srv.Stop(); |
| | | // BOOST_CHECK_THROW(reply.Count(), int); |
| | | } |