| | |
| | | } |
| | | // Receive (and ignore) incoming messages by default, to avoid a memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | for (auto &p : sockets_) { |
| | | p->Start(default_ignore_msg); |
| | | } |
| | | SockNode().Start(default_ignore_msg); |
| | | // for (auto &p : sockets_) { |
| | | // p->Start(default_ignore_msg); |
| | | // } |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | |
| | | AddId(SockSub().id()); |
| | | AddId(SockPub().id()); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | |
| | | MsgUnregister body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | |
| | | MsgHeartbeat body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | |
| | | MsgRegisterRPC body; |
| | | body.mutable_topics()->Swap(&topics); |
| | | |
| | | auto head(InitMsgHead(GetType(body), proc_id())); |
| | | auto head(InitMsgHead(GetType(body), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id())); |
| | | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); |
| | | BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id)); |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | |
| | | |
| | | auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(req.topic()); |
| | | |
| | |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id())); |
| | | printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id()); |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(request.topic()); |
| | | |
| | |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | MsgSubscribe sub; |
| | | sub.mutable_topics()->Swap(&topics); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, sub); |