| | |
| | | } |
| | | SetProcIndex(reply.proc_index()); |
| | | this->state_ = eStateUnregistered; |
| | | |
| | | ServerStart(ServerAsyncCB(), 1); |
| | | SubscribeStartWorker(SubDataCB(), 1); |
| | | } |
| | | } break; |
| | | default: break; |
| | |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryProcReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | |
| | | |
| | | bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) |
| | | { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | if (acb) { |
| | | auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error server handle msg:" << e.what(); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error server handle msg:" << e.what(); |
| | | } |
| | | }; |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return acb && sock.Start(onRecv, nworker); |
| | | return SockServer().Start(onRecv, nworker); |
| | | } else { |
| | | auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | server_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | return SockServer().Start(onRequest, nworker); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | | BHMsgHead head; |
| | | if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | if (imsg.ParseBody(request)) { |
| | | std::string body; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!server_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (head.type() == kMsgTypeRequestTopic) { |
| | | if (request.ParseFromString(body)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | |
| | | |
| | | bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) |
| | | { |
| | | auto &sock = SockSub(); |
| | | |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub); |
| | | if (tdcb) { |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | return tdcb && sock.Start(AsyncRecvProc, nworker); |
| | | }; |
| | | return SockSub().Start(AsyncRecvProc, nworker); |
| | | } else { |
| | | auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | sub_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | return SockSub().Start(onSub, nworker); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release();); |
| | | BHMsgHead head; |
| | | std::string body; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!sub_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | //TODO error msg. |
| | | if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | | if (msg.ParseBody(pub)) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | if (pub.ParseFromString(body)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | return true; |
| | | } |