From 6c07fe29a5185835f28059f627a1d30e462da28b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 29 六月 2021 14:01:19 +0800 Subject: [PATCH] add notify node change. --- box/tcp_connection.cpp | 33 +++++++++++++++++++-------------- 1 files changed, 19 insertions(+), 14 deletions(-) diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp index 02001bb..6506369 100644 --- a/box/tcp_connection.cpp +++ b/box/tcp_connection.cpp @@ -173,21 +173,26 @@ if (recv_done) { LOG_TRACE() << "tcp server recv request data, size: " << size; MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()}; - if (remote.id_ && remote.offset_) { - auto self(shared_from_this()); - auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - send_buffer_ = imsg.content(); - async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); - }; - auto &scenter = *pscenter_; - if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { - return; - } - } else { - LOG_DEBUG() << "no address"; - } - Close(); + auto self(shared_from_this()); + auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + send_buffer_ = imsg.content(); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + }; + auto &scenter = *pscenter_; + if (head.type() == kMsgTypePublish) { + auto reply = MakeReply(eSuccess); + auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id()); + send_buffer_ = MsgI::Serialize(rep_head, reply); + async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); })); + + scenter->RemotePublish(head, body_content); + return; + } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) { + return; + } else { + Close(); + } } else { // not complete, read again LOG_TRACE() << "not complete, read again " << recv_buffer_.size(); socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); })); -- Gitblit v1.8.0