| | |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | // Sends `head` + `body_content` to the queue `dest` through the default sender of the |
| | | // node registered in nodes_ under dest.id_ rounded toward zero to a multiple of 10. |
| | | // A route entry holding the sender's id and absolute address is appended to `head` |
| | | // before the message is built, so `head` is modified even if building later fails. |
| | | // Returns false when no node is registered under that key, when the node fails Valid(), |
| | | // or when ShmMsg::Make() fails; otherwise returns the result of the sender's Send(). |
| | | // `cb` is moved into Send(). |
| | | bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | | // NOTE(review): presumably each node owns a block of 10 consecutive queue ids - confirm. |
| | | const auto id_offset = dest.id_ % 10; |
| | | auto session_key = dest.id_ - id_offset; |
| | | LOG_DEBUG() << "prox ssn " << session_key; |
| | | |
| | | auto found = nodes_.find(session_key); |
| | | if (found == nodes_.end()) { |
| | | LOG_ERROR() << "proxy msg, ssn not found."; |
| | | return false; |
| | | } |
| | | |
| | | auto &target = found->second; |
| | | if (!Valid(*target)) { |
| | | return false; |
| | | } |
| | | |
| | | // Record the sending socket in the head's route list. |
| | | ShmSocket &out = DefaultSender(target->shm_); |
| | | auto hop = head.add_route(); |
| | | hop->set_mq_id(out.id()); |
| | | hop->set_abs_addr(out.AbsAddr()); |
| | | |
| | | // Build the message in the target node's shared memory; the release is only |
| | | // scheduled once Make() has succeeded. |
| | | ShmMsg packed(target->shm_); |
| | | if (!packed.Make(head, body_content)) { |
| | | return false; |
| | | } |
| | | DEFER1(packed.Release();); |
| | | |
| | | RecordMsg(packed); |
| | | return out.Send(dest, packed, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |