| | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); |
| | | if (!r) { |
| | | printf("send reply failed.\n"); |
| | | } |
| | |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | | MsgCommonReply reply; |
| | | MsgI pubmsg; |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | // send error reply. |
| | | MakeReplyer(socket, head, center->id())(reply); |
| | | } else if (pubmsg.MakeRC(socket.shm(), msg)) { |
| | | DEFER1(pubmsg.Release(socket.shm())); |
| | | } else { |
| | | MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); |
| | | if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | socket.Send(cli.mq_.data(), pubmsg, 10); |
| | | if (!socket.Send(cli.mq_.data(), msg, 100)) { |
| | | printf("center route publish failed. need resend.\n"); |
| | | } |
| | | } |
| | | } |
| | | } |