From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.
---
box/center.cpp | 160 ++++++++++++++++++++++++++++++++--------------------
1 files changed, 98 insertions(+), 62 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 445d307..9ecd04b 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -98,24 +98,25 @@
if (now < time_to_clean_) {
return;
}
- LOG_FUNCTION;
+ // LOG_FUNCTION;
time_to_clean_ = now + 1;
int64_t limit = std::max(10000ul, msgs_.size() / 10);
int64_t n = 0;
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
ShmMsg msg(it->second);
- if (msg.Count() == 0) {
+ auto Free = [&]() {
msg.Free();
it = msgs_.erase(it);
++n;
- } else if (msg.timestamp() + 10 < NowSec()) {
- msg.Free();
- it = msgs_.erase(it);
- ++n;
- // LOG_DEBUG() << "release timeout msg, someone crashed.";
- } else {
+ };
+ int n = now - msg.timestamp();
+ if (n < 10) {
++it;
+ } else if (msg.Count() == 0) {
+ Free();
+ } else if (n > 60) {
+ Free();
}
}
if (n > 0) {
@@ -181,22 +182,24 @@
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
struct NodeInfo {
- ProcState state_; // state
- std::set<Address> addrs_; // registered mqs
- ProcInfo proc_; //
- AddressTopics services_; // address: topics
- AddressTopics subscriptions_; // address: topics
+ ProcState state_; // state
+ std::map<MQId, int64_t> addrs_; // registered mqs
+ ProcInfo proc_; //
+ AddressTopics services_; // address: topics
+ AddressTopics subscriptions_; // address: topics
};
typedef std::shared_ptr<NodeInfo> Node;
typedef std::weak_ptr<NodeInfo> WeakNode;
struct TopicDest {
- Address mq_;
+ MQId mq_id_;
+ int64_t mq_abs_addr_;
WeakNode weak_node_;
- bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+ bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
};
inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
- inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
+ inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
+ inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
@@ -209,65 +212,65 @@
// center name, no relative to shm.
const std::string &id() const { return id_; }
- void OnNodeInit(ShmSocket &socket, const int64_t val)
+ int64_t OnNodeInit(ShmSocket &socket, const int64_t val)
{
LOG_FUNCTION;
SharedMemory &shm = socket.shm();
- MQId ssn = (val >> 4) & MaskBits(60);
+ MQId ssn = (val >> 4) & MaskBits(56);
+ int reply = EncodeCmd(eCmdNodeInitReply);
+
if (nodes_.find(ssn) != nodes_.end()) {
- return; // ignore in exists.
+ return reply; // ignore if exists.
}
+
auto UpdateRegInfo = [&](Node &node) {
- for (int i = 0; i < 10; ++i) {
- node->addrs_.insert(ssn + i);
- }
node->state_.timestamp_ = NowSec() - offline_time_;
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
// create sockets.
try {
- auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
- // alloc(-1), node, server, sub, request,
- for (int i = 0; i < 4; ++i) {
- CreateSocket(ssn + i);
- node->addrs_.insert(ssn + i);
- }
+ ShmSocket tmp(shm, true, ssn, 16);
+ node->addrs_.emplace(ssn, tmp.AbsAddr());
return true;
} catch (...) {
return false;
}
};
- auto PrepareProcInit = [&]() {
+ auto PrepareProcInit = [&](Node &node) {
bool r = false;
ShmMsg init_msg;
- if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
- // 31bit pointer, 4bit cmd+flag
- int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
- r = SendAllocReply(socket, ssn, reply, init_msg);
- }
- return r;
+ DEFER1(init_msg.Release());
+ MsgProcInit body;
+ auto head = InitMsgHead(GetType(body), id(), ssn);
+ return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
+ init_msg.Fill(ShmMsg::Serialize(head, body)) &&
+ SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
};
Node node(new NodeInfo);
- if (UpdateRegInfo(node) && PrepareProcInit()) {
+ if (UpdateRegInfo(node) && PrepareProcInit(node)) {
+ reply |= (node->addrs_[ssn] << 4);
nodes_[ssn] = node;
LOG_INFO() << "new node ssn (" << ssn << ") init";
} else {
- for (int i = 0; i < 10; ++i) {
- ShmSocket::Remove(shm, ssn + i);
- }
+ ShmSocket::Remove(shm, ssn);
}
+ return reply;
}
- void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
+ void RecordMsg(const MsgI &msg)
+ {
+ msg.reset_managed(true);
+ msgs_.RecordMsg(msg);
+ }
- bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
+ bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{
RecordMsg(msg);
auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
return socket.Send(dest, reply, onExpireFree);
}
- bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
+ bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
{
RecordMsg(msg);
return socket.Send(dest, msg);
@@ -284,7 +287,21 @@
if (proc_rec.proc_.empty()) {
return;
}
- Address dest = proc_rec.ssn_ + socket_index;
+
+ MQInfo dest = {proc_rec.ssn_ + socket_index, 0};
+ auto FindMq = [&]() {
+ auto pos = nodes_.find(proc_rec.ssn_);
+ if (pos != nodes_.end()) {
+ for (auto &&mq : pos->second->addrs_) {
+ if (mq.first == dest.id_) {
+ dest.offset_ = mq.second;
+ return true;
+ }
+ }
+ }
+ return false;
+ };
+ if (!FindMq()) { return; }
auto size = GetAllocSize((val >> 52) & MaskBits(8));
MsgI new_msg;
@@ -309,7 +326,6 @@
assert(IsCmd(val));
int cmd = DecodeCmd(val);
switch (cmd) {
- case eCmdNodeInit: OnNodeInit(socket, val); break;
case eCmdAllocRequest0: OnAlloc(socket, val); break;
case eCmdFree: OnFree(socket, val); break;
default: return false;
@@ -320,10 +336,28 @@
MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
{
LOG_DEBUG() << "center got proc init.";
+ auto pos = nodes_.find(head.ssn_id());
+ if (pos == nodes_.end()) {
+ return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
+ }
auto index = procs_.Put(head.proc_id(), head.ssn_id());
auto reply(MakeReply<MsgProcInitReply>(eSuccess));
reply.set_proc_index(index);
- return reply;
+
+ auto &node = pos->second;
+ try {
+ for (int i = 0; i < msg.extra_mq_num(); ++i) {
+ ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+ node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
+ auto addr = reply.add_extra_mqs();
+ addr->set_mq_id(tmp.id());
+ addr->set_abs_addr(tmp.AbsAddr());
+ }
+ return reply;
+ } catch (...) {
+ LOG_ERROR() << "proc init create mq error";
+ return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
+ }
}
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -337,10 +371,6 @@
// when node restart, ssn will change,
// and old node will be removed after timeout.
auto UpdateRegInfo = [&](Node &node) {
- node->addrs_.insert(SrcAddr(head));
- for (auto &addr : msg.addrs()) {
- node->addrs_.insert(addr.mq_id());
- }
node->proc_.Swap(msg.mutable_proc());
node->state_.timestamp_ = head.timestamp();
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
@@ -420,11 +450,11 @@
auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->services_[src].insert(topics.begin(), topics.end());
- TopicDest dest = {src, node};
+ TopicDest dest = {src, SrcAbsAddr(head), node};
for (auto &topic : topics) {
service_map_[topic].insert(dest);
}
- LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n";
+ LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n";
for (auto &topic : topics) {
LOG_DEBUG() << "\t" << topic;
}
@@ -464,7 +494,8 @@
if (dest_node && Valid(*dest_node)) {
auto node_addr = reply.add_node_address();
node_addr->set_proc_id(dest_node->proc_.proc_id());
- node_addr->mutable_addr()->set_mq_id(dest.mq_);
+ node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+ node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
}
}
return reply;
@@ -482,7 +513,7 @@
auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->subscriptions_[src].insert(topics.begin(), topics.end());
- TopicDest dest = {src, node};
+ TopicDest dest = {src, SrcAbsAddr(head), node};
for (auto &topic : topics) {
subscribe_map_[topic].insert(dest);
}
@@ -505,7 +536,7 @@
};
if (pos != node->subscriptions_.end()) {
- const TopicDest &dest = {src, node};
+ const TopicDest &dest = {src, SrcAbsAddr(head), node};
auto &topics = msg.topics().topic_list();
// clear node sub records;
for (auto &topic : topics) {
@@ -602,7 +633,7 @@
{
auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
for (auto &addr_topics : node_rec) {
- TopicDest dest{addr_topics.first, node};
+ TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
for (auto &topic : addr_topics.second) {
auto pos = rec_map.find(topic);
if (pos != rec_map.end()) {
@@ -626,7 +657,7 @@
}
for (auto &addr : node->addrs_) {
- cleaner_(addr);
+ cleaner_(addr.first);
}
node->addrs_.clear();
@@ -678,7 +709,7 @@
{
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
- auto remote = head.route(0).mq_id();
+ MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
MsgI msg;
if (msg.Make(reply_head, rep_body)) {
DEFER1(msg.Release(););
@@ -698,6 +729,10 @@
// now we can talk.
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
auto ¢er = *center_ptr;
+ auto onInit = [&](const int64_t request) {
+ return center->OnNodeInit(socket, request);
+ };
+ BHCenterHandleInit(onInit);
center->OnTimer();
};
@@ -741,7 +776,7 @@
if (node) {
// should also make sure that mq is not killed before msg expires.
// it would be ok if (kill_time - offline_time) is longer than expire time.
- socket.Send(cli.mq_, msg);
+ socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
++it;
} else {
it = clients.erase(it);
@@ -772,9 +807,9 @@
return rec;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
{
- Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
return true;
}
@@ -787,12 +822,13 @@
}
};
- auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2);
+ auto nsec = seconds(NodeTimeoutSec());
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
AddCenter(center_ptr);
for (auto &kv : Centers()) {
auto &info = kv.second;
- sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
+ sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
}
}
--
Gitblit v1.8.0