From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.
---
box/center.cpp | 71 +++++++++++++++++++++++------------
1 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 2f244b4..9ecd04b 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -212,30 +212,27 @@
// center name, no relative to shm.
const std::string &id() const { return id_; }
- void OnNodeInit(ShmSocket &socket, const int64_t val)
+ int64_t OnNodeInit(ShmSocket &socket, const int64_t val)
{
LOG_FUNCTION;
SharedMemory &shm = socket.shm();
- MQId ssn = (val >> 4) & MaskBits(60);
+ MQId ssn = (val >> 4) & MaskBits(56);
+ int reply = EncodeCmd(eCmdNodeInitReply);
+
if (nodes_.find(ssn) != nodes_.end()) {
- return; // ignore in exists.
+ return reply; // ignore if exists.
}
+
auto UpdateRegInfo = [&](Node &node) {
node->state_.timestamp_ = NowSec() - offline_time_;
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
// create sockets.
- const int nsocks = 4;
try {
- for (int i = 0; i < nsocks; ++i) {
- ShmSocket tmp(shm, true, ssn + i, 16);
- node->addrs_.emplace(ssn + i, tmp.AbsAddr());
- }
+ ShmSocket tmp(shm, true, ssn, 16);
+ node->addrs_.emplace(ssn, tmp.AbsAddr());
return true;
} catch (...) {
- for (int i = 0; i < nsocks; ++i) {
- ShmSocket::Remove(shm, ssn + i);
- }
return false;
}
};
@@ -243,25 +240,29 @@
auto PrepareProcInit = [&](Node &node) {
bool r = false;
ShmMsg init_msg;
- if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
- // 31bit pointer, 4bit cmd+flag
- int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
- r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
- }
- return r;
+ DEFER1(init_msg.Release());
+ MsgProcInit body;
+ auto head = InitMsgHead(GetType(body), id(), ssn);
+ return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
+ init_msg.Fill(ShmMsg::Serialize(head, body)) &&
+ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
};
Node node(new NodeInfo);
if (UpdateRegInfo(node) && PrepareProcInit(node)) {
+ reply |= (node->addrs_[ssn] << 4);
nodes_[ssn] = node;
LOG_INFO() << "new node ssn (" << ssn << ") init";
} else {
- for (int i = 0; i < 10; ++i) {
- ShmSocket::Remove(shm, ssn + i);
- }
+ ShmSocket::Remove(shm, ssn);
}
+ return reply;
}
- void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
+ void RecordMsg(const MsgI &msg)
+ {
+ msg.reset_managed(true);
+ msgs_.RecordMsg(msg);
+ }
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{
@@ -325,7 +326,6 @@
assert(IsCmd(val));
int cmd = DecodeCmd(val);
switch (cmd) {
- case eCmdNodeInit: OnNodeInit(socket, val); break;
case eCmdAllocRequest0: OnAlloc(socket, val); break;
case eCmdFree: OnFree(socket, val); break;
default: return false;
@@ -336,10 +336,28 @@
MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
{
LOG_DEBUG() << "center got proc init.";
+ auto pos = nodes_.find(head.ssn_id());
+ if (pos == nodes_.end()) {
+ return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
+ }
auto index = procs_.Put(head.proc_id(), head.ssn_id());
auto reply(MakeReply<MsgProcInitReply>(eSuccess));
reply.set_proc_index(index);
- return reply;
+
+ auto &node = pos->second;
+ try {
+ for (int i = 0; i < msg.extra_mq_num(); ++i) {
+ ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+ node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
+ auto addr = reply.add_extra_mqs();
+ addr->set_mq_id(tmp.id());
+ addr->set_abs_addr(tmp.AbsAddr());
+ }
+ return reply;
+ } catch (...) {
+ LOG_ERROR() << "proc init create mq error";
+ return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
+ }
}
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -711,6 +729,10 @@
// now we can talk.
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
auto ¢er = *center_ptr;
+ auto onInit = [&](const int64_t request) {
+ return center->OnNodeInit(socket, request);
+ };
+ BHCenterHandleInit(onInit);
center->OnTimer();
};
@@ -800,7 +822,8 @@
}
};
- auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2);
+ auto nsec = seconds(NodeTimeoutSec());
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
AddCenter(center_ptr);
for (auto &kv : Centers()) {
--
Gitblit v1.8.0