| | |
| | | #include "center.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "failed_msg.h" |
| | | #include "shm.h" |
| | | #include <chrono> |
| | | #include <set> |
| | |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | { |
| | | |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner); |
| | | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | auto center_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); |
| | | MsgI msg; |
| | | if (msg.Make(socket.shm(), reply_head, rep_body)) { |
| | | auto &remote = head.route(0).mq_id(); |
| | | bool r = socket.Send(remote.data(), msg, timeout_ms); |
| | | if (!r) { |
| | | printf("send reply failed.\n"); |
| | | failq.Push(remote, msg, 60s); // for later retry. |
| | | } |
| | | //TODO resend failed. |
| | | } |
| | | }; |
| | | }; |
| | | |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | | center_failed_q->TrySend(socket); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q); |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | |
| | | } |
| | | }; |
| | | |
| | | auto OnBusIdle = [](ShmSocket &socket) {}; |
| | | auto bus_failed_q = std::make_shared<FailedMsgQ>(); |
| | | auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | MakeReplyer(socket, head, center->id())(reply); |
| | | replyer(reply); |
| | | } else { |
| | | MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); |
| | | replyer(MakeReply(eSuccess)); |
| | | if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? |
| | | if (clients.empty()) { return; } |
| | | |
| | | for (auto &cli : clients) { |
| | | auto it = clients.begin(); |
| | | do { |
| | | auto &cli = *it; |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | if (!socket.Send(cli.mq_.data(), msg, 100)) { |
| | | printf("center route publish failed. need resend.\n"); |
| | | if (!socket.Send(cli.mq_.data(), msg, 0)) { |
| | | bus_failed_q->Push(cli.mq_, msg, 60s); |
| | | } |
| | | ++it; |
| | | } else { |
| | | it = clients.erase(it); |
| | | } |
| | | } |
| | | } while (it != clients.end()); |
| | | } |
| | | }; |
| | | switch (head.type()) { |
| | |
| | | { |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | sockets_[info.name_]->Start(info.handler_); |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | } |
| | | |
| | | return true; |