From cf05ea3d9f43e4e84d621e1f9d54cbef552b6e2b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 18 五月 2021 16:53:28 +0800 Subject: [PATCH] fix center init mutex. --- src/topic_node.cpp | 73 +++++++++++++----------------------- 1 files changed, 27 insertions(+), 46 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 51a0ab7..43d748f 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -70,69 +70,50 @@ } LOG_DEBUG() << "Node Init, id " << ssn_id_; auto NodeInit = [&]() { - auto SendInitCmd = [&]() { - int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); - auto end_time = steady_clock::now() + 3s; - bool r = false; - do { - r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd); - } while (!r && steady_clock::now() < end_time); - return r; - }; - if (SendInitCmd()) { - LOG_DEBUG() << "node send init ok"; - auto end_time = steady_clock::now() + 3s; - do { - try { - //TODO recv offset, avoid query. - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); - } - break; - } catch (...) { - sockets_.clear(); - std::this_thread::sleep_for(100ms); - } - } while (steady_clock::now() < end_time); + int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); + int64_t reply = 0; + if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) { + int64_t abs_addr = reply >> 4; + sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_)); + LOG_DEBUG() << "node init ok"; + } else { + LOG_ERROR() << "Node Init Error"; } }; if (sockets_.empty()) { NodeInit(); } if (!sockets_.empty()) { - auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) { - LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val); - switch (DecodeCmd(val)) { - case eCmdNodeInitReply: { - MsgI msg(val >> 4); - DEFER1(msg.Release()); + auto onMsg = [this](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + LOG_DEBUG() << "node recv type: " << head.type(); + switch (head.type()) { + case kMsgTypeProcInit: { + // reuse msg to send proc init. MsgProcInit body; + body.set_extra_mq_num(eSockEnd - eSockStart - 1); auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); AddRoute(head, socket); - if (msg.Fill(head, body)) { - socket.Send(BHTopicCenterAddress(), msg); + if (imsg.Fill(head, body)) { + socket.Send(BHTopicCenterAddress(), imsg); } } break; - default: - break; - } - return true; - }; - - // recv msgs to avoid memory leak. - auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - LOG_DEBUG() << "node recv type: " << head.type(); - if (head.type() == kMsgTypeProcInitReply) { + case kMsgTypeProcInitReply: { LOG_DEBUG() << "got proc init reply"; MsgProcInitReply reply; - if (imsg.ParseBody(reply)) { + if (imsg.ParseBody(reply) && IsSuccess(reply.errmsg().errcode())) { + for (auto &addr : reply.extra_mqs()) { + LOG_DEBUG() << "add socket " << addr.abs_addr() << ", id:" << addr.mq_id(); + sockets_.emplace_back(new ShmSocket(addr.abs_addr(), shm(), addr.mq_id())); + } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; } + } break; + default: break; } return true; }; - SockNode().Start(1, onMsg, onNodeCmd); + SockNode().Start(1, onMsg); return true; } return false; @@ -174,10 +155,10 @@ } auto end_time = steady_clock::now() + milliseconds(timeout_ms); - while (state_ != eStateUnregistered && steady_clock::now() < end_time) { + while (!Valid() && steady_clock::now() < end_time) { std::this_thread::yield(); } - if (state_ != eStateUnregistered) { + if (!Valid()) { SetLastError(eError, kErrMsgNotInit); return false; } -- Gitblit v1.8.0