| | |
| | | */ |
| | | #include "topic_node.h" |
| | | #include "bh_util.h" |
| | | #include "failed_msg.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | typedef FailedMsgQ ServerFailedQ; |
| | | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | { |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | // TODO update proc info |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | return (r && IsSuccess(reply_body.errmsg().errcode())); |
| | | } |
| | | return r; |
| | | } |
| | | bool TopicNode::Heartbeat(const int timeout_ms) |
| | | { |
| | | ProcInfo proc; |
| | | proc.set_proc_id(proc_id()); |
| | | MsgCommonReply reply_body; |
| | | return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode()); |
| | | return Heartbeat(proc, reply_body, timeout_ms); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | auto head(InitMsgHead(GetType(body), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | { |
| | | auto failed_q = std::make_shared<ServerFailedQ>(); |
| | | auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | |
| | | auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { |
| | | MsgRequestTopic req; |
| | | if (imsg.ParseBody(req)) { |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | if (!sock.Send(remote.data(), msg, 10)) { |
| | | failed_q->Push(remote, msg, 10s); |
| | | } |
| | | } |
| | | } |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), msg); |
| | | } |
| | | } |
| | | |
| | | onIdle(sock); |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | | return rcb && sock.Start(onRecv, onIdle, nworker); |
| | | return rcb && sock.Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); |
| | | return sock.Send(p->route.back().mq_id().data(), head, body); |
| | | } |
| | | |
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) |
| | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(remote, head, req, timeout_ms, onRecv); |
| | | return sock.Send(remote, head, req, onRecv); |
| | | } else { |
| | | return sock.Send(remote, head, req, timeout_ms); |
| | | return sock.Send(remote, head, req); |
| | | } |
| | | }; |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { |
| | | if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, pub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | } |
| | | } catch (...) { |
| | | } |
| | | return false; |
| | |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, sub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | } |
| | | // TODO wait for result? |
| | | } catch (...) { |
| | | return false; |
| | | } |