| | |
| | | return; |
| | | } |
| | | // LOG_FUNCTION; |
| | | const size_t total = msgs_.size(); |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t limit = std::max(10000ul, total / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | |
| | | ++n; |
| | | }; |
| | | int n = now - msg.timestamp(); |
| | | if (n < 10) { |
| | | if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > NodeTimeoutSec()) { |
| | | Free(); |
| | | } else { |
| | | ++it; |
| | | } else if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > 60) { |
| | | Free(); |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; |
| | | } |
| | | } |
| | | |
| | |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | |
| | | NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) |
| | | { |
| | | auto ssn = dest.id_ - (dest.id_ % 10); |
| | | LOG_DEBUG() << "prox ssn " << ssn; |
| | | Node node; |
| | | auto ssn = mq_id - (mq_id % 10); |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos == nodes_.end()) { |
| | | LOG_ERROR() << "proxy msg, ssn not found."; |
| | | if (pos != nodes_.end()) { |
| | | node = pos->second; |
| | | } |
| | | return node; |
| | | } |
| | | |
// Passes a request to the local queue `dest` and registers `cb` with the send.
//
// Steps visible in this block:
//   1. Resolve the node for dest.id_ via GetNode(); if the node is null or
//      Valid(*node) is false, log an error and return false.
//   2. Obtain the sender socket for the node's shared memory (DefaultSender).
//   3. Add a route entry to `head`.
//   4. Send `msg` to `dest`, keyed by head.msg_id(), moving `cb` into the call.
//
// Returns false on the lookup/validity failure above, otherwise whatever
// sender.Send() returns.
//
// NOTE(review): `msg` is not declared in the visible lines, and nothing visible
// fills in `route`; both presumably happen in the collapsed region just before
// the final return -- confirm against the full file.
| | | bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
// Find the destination node; reject when it is missing or no longer valid.
| | | Node node(GetNode(dest.id_)); |
| | | if (!node || !Valid(*node)) { |
| | | LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | | return false; |
| | | } |
// NOTE(review): the next two lines use `pos`, which is not declared in this
// function, and declare `node` a second time. They look like removed-side
// lines left over from a diff rendering -- verify and drop if so.
| | | auto &node = pos->second; |
| | | if (!Valid(*node)) { return false; } |
| | | |
// Sender bound to the destination node's shared memory.
| | | ShmSocket &sender(DefaultSender(node->shm_)); |
// New route entry appended to the message head.
| | | auto route = head.add_route(); |
| | |
// `cb` is handed to Send() together with the message id from `head`.
| | | return sender.Send(dest, msg, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) |
| | | { |
| | | Node node(GetNode(dest.id_)); |
| | | if (!node) { |
| | | LOG_ERROR() << id() << " pass remote reply , ssn not found."; |
| | | return false; |
| | | } |
| | | auto offset = node->addrs_[dest.id_]; |
| | | if (offset != dest.offset_) { |
| | | LOG_ERROR() << id() << " pass remote reply, dest address not match"; |
| | | return false; |
| | | } |
| | | |
| | | ShmMsg msg(node->shm_); |
| | | if (!msg.Make(head, body_content)) { return false; } |
| | | DEFER1(msg.Release();); |
| | | RecordMsg(msg); |
| | | return DefaultSender(node->shm_).Send(dest, msg); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |
| | | // LOG_FUNCTION; |