/*
|
* =====================================================================================
|
*
|
* Filename: node_center.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-05-20 11:32:55
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "node_center.h"
|
#include "json.h"
|
#include "log.h"
|
|
using ssjson::Json;
|
|
namespace
|
{
|
std::string Join(const std::string &parent, const std::string &child)
|
{
|
return parent + kTopicSep + child;
|
}
|
const std::string kTopicCenterRoot = "#center";
|
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
|
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
|
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
|
const std::string kTopicNodeService = Join(kTopicNode, "service");
|
const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
|
const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
|
} // namespace
|
|
// Records proc_id with its session id and returns the index of the record.
//  - A proc that is already known keeps its index; only its ssn is updated.
//    This is checked first so that a known proc can still refresh its session
//    when the table is full.
//  - A new proc is appended, unless kMaxProcs records already exist, in which
//    case -1 is returned and nothing is stored.
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
{
    auto known = proc_index_.find(proc_id);
    if (known != proc_index_.end()) {
        procs_[known->second].ssn_ = ssn; // update ssn
        return known->second;
    }

    if (procs_.size() >= kMaxProcs) {
        return -1;
    }

    // The new record goes to the end of procs_, so its index is the current size.
    auto inserted = proc_index_.emplace(proc_id, procs_.size());
    procs_.emplace_back(ProcRec{proc_id, ssn});
    return inserted.first->second;
}
|
// Returns the record stored at index, or a shared default-constructed record
// when index is not below the number of stored records.
const ProcRecords::ProcRec &ProcRecords::Get(const ProcIndex index) const
{
    static ProcRec empty_rec;
    if (index < procs_.size()) {
        return procs_[index];
    }
    return empty_rec;
}
|
|
// Frees the recorded message with the given id and drops its record.
// An unknown id is only logged.
void MsgRecords::FreeMsg(MsgId id)
{
    auto found = msgs_.find(id);
    if (found == msgs_.end()) {
        LOG_TRACE() << "ignore late free request.";
        return;
    }
    found->second.Free();
    msgs_.erase(found);
}
|
void MsgRecords::AutoRemove()
|
{
|
auto now = NowSec();
|
if (now < time_to_clean_) {
|
return;
|
}
|
// LOG_FUNCTION;
|
const size_t total = msgs_.size();
|
time_to_clean_ = now + 1;
|
int64_t limit = std::max(10000ul, total / 10);
|
int64_t n = 0;
|
auto it = msgs_.begin();
|
while (it != msgs_.end() && --limit > 0) {
|
ShmMsg msg(it->second);
|
auto Free = [&]() {
|
msg.Free();
|
it = msgs_.erase(it);
|
++n;
|
};
|
int n = now - msg.timestamp();
|
if (msg.Count() == 0) {
|
Free();
|
} else if (n > NodeTimeoutSec()) {
|
Free();
|
} else {
|
++it;
|
}
|
}
|
if (n > 0) {
|
LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
|
}
|
}
|
|
void MsgRecords::DebugPrint() const
|
{
|
LOG_TRACE() << "msgs : " << size();
|
int i = 0;
|
int total_count = 0;
|
for (auto &kv : msgs_) {
|
auto &msg = kv.second;
|
total_count += msg.Count();
|
LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
|
}
|
LOG_TRACE() << "total count: " << total_count;
|
}
|
|
// NodeCenter::NodeInfo
|
void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time)
|
{
|
state_.timestamp_ = NowSec() - offline_time;
|
state_.flag_ = kStateOffline;
|
center_.Notify(kTopicNodeOffline, *this);
|
}
|
|
void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
|
{
|
if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
|
Json json;
|
json.put("proc_id", node.proc_.proc_id());
|
Publish(node.shm_, topic, json.dump());
|
}
|
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
|
{
|
auto old = state_.flag_;
|
auto diff = now - state_.timestamp_;
|
|
LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
|
if (diff < offline_time) {
|
state_.flag_ = kStateNormal;
|
if (old != state_.flag_) {
|
center_.Notify(kTopicNodeOnline, *this);
|
}
|
} else if (diff < kill_time) {
|
state_.flag_ = kStateOffline;
|
if (old != state_.flag_) {
|
center_.Notify(kTopicNodeOffline, *this);
|
}
|
} else {
|
state_.flag_ = kStateKillme;
|
}
|
}
|
|
// Handles a node-init command.
// val carries the session id (ssn) in the 56 bits above the low 4 bits.
// For an unknown ssn a queue is created, a MsgProcInit message is sent to it
// and the node is stored in nodes_; if any step fails the queue is removed.
// Returns the eCmdNodeInitReply command. When the node has just been created
// the address of its queue is OR-ed in, shifted left by 4. For an already
// known ssn, or on failure, only the bare command is returned.
int64_t NodeCenter::OnNodeInit(ShmSocket &socket, const int64_t val)
{
    LOG_FUNCTION;
    SharedMemory &shm = socket.shm();
    MQId ssn = (val >> 4) & MaskBits(56);
    // 64 bits wide: the reply also carries (address << 4), which an int would truncate.
    int64_t reply = EncodeCmd(eCmdNodeInitReply);

    if (nodes_.find(ssn) != nodes_.end()) {
        return reply; // ignore if exists.
    }

    auto UpdateRegInfo = [&](Node &node) {
        // Back-date the timestamp by offline_time_ before evaluating the state.
        node->state_.timestamp_ = NowSec() - offline_time_;
        node->UpdateState(NowSec(), offline_time_, kill_time_);

        // create sockets.
        try {
            ShmSocket tmp(shm, ssn, eCreate);
            node->addrs_.emplace(ssn, tmp.AbsAddr());
            return true;
        } catch (...) {
            return false;
        }
    };

    auto PrepareProcInit = [&](Node &node) {
        ShmMsg init_msg(shm);
        DEFER1(init_msg.Release());
        MsgProcInit body;
        auto head = InitMsgHead(GetType(body), id(), ssn);
        return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
               init_msg.Fill(ShmMsg::Serialize(head, body)) &&
               SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
    };

    Node node(new NodeInfo(*this, shm));
    if (UpdateRegInfo(node) && PrepareProcInit(node)) {
        reply |= (node->addrs_[ssn] << 4);
        nodes_[ssn] = node;
        LOG_INFO() << "new node ssn (" << ssn << ") init";
    } else {
        ShmSocket::Remove(shm, ssn);
    }
    return reply;
}
|
void NodeCenter::RecordMsg(const MsgI &msg)
|
{
|
msg.reset_managed(true);
|
msgs_.RecordMsg(msg);
|
}
|
|
// Records msg and sends the command value reply to dest.
// The callback handed to Send() frees the recorded message by id; judging by
// its name in the original code it is invoked when the send expires
// (NOTE(review): confirm against ShmSocket::Send).
bool NodeCenter::SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{
    RecordMsg(msg);

    // msg is captured by value so the callback does not depend on the caller's object.
    auto free_on_expire = [this, msg](const SendQ::Data &) {
        msgs_.FreeMsg(msg.id());
    };
    return socket.Send(dest, reply, free_on_expire);
}
|
// Records msg in the center and sends it to dest through socket.
// Returns the result of the send.
bool NodeCenter::SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
{
    RecordMsg(msg);
    const bool sent = socket.Send(dest, msg);
    return sent;
}
|
|
// Finds the node owning queue mq_id. The key used for nodes_ is mq_id with its
// last decimal digit cleared. Returns an empty Node when nothing is found.
NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
{
    const auto ssn = mq_id - (mq_id % 10);
    const auto found = nodes_.find(ssn);
    if (found == nodes_.end()) {
        return Node();
    }
    return found->second;
}
|
|
// Forwards a request that came from a remote host to a local queue.
//  - dest.id_ == 0: the destination is the first entry registered in
//    service_map_ for head.topic() whose node is alive and Valid().
//  - otherwise: the node owning dest.id_ must exist and be Valid().
// The default sender of the target shared memory is appended to the route of
// head, then a message built from head and body_content is recorded and sent;
// cb is passed on to Send() together with head.msg_id().
// Returns false if no destination is found or the message cannot be made,
// otherwise the result of Send().
bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
{
    Node node;

    if (dest.id_ != 0) {
        node = GetNode(dest.id_);
        if (!node || !Valid(*node)) {
            LOG_ERROR() << id() << " pass remote request, dest not found.";
            return false;
        }
    } else {
        bool found = false;
        const auto providers = service_map_.find(head.topic());
        if (providers != service_map_.end()) {
            for (auto &cli : providers->second) {
                node = cli.weak_node_.lock();
                if (node && Valid(*node)) {
                    dest.id_ = cli.mq_id_;
                    dest.offset_ = cli.mq_abs_addr_;
                    found = true;
                    break;
                }
            }
        }
        if (!found) {
            LOG_ERROR() << id() << " pass remote request, topic dest not found.";
            return false;
        }
    }

    // Add the local sender to the route of the message head.
    ShmSocket &sender(DefaultSender(node->shm_));
    auto hop = head.add_route();
    hop->set_mq_id(sender.id());
    hop->set_abs_addr(sender.AbsAddr());

    ShmMsg msg(node->shm_);
    if (!msg.Make(head, body_content)) {
        return false;
    }
    DEFER1(msg.Release(););
    RecordMsg(msg);
    return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
|
|
// Delivers a publication received from a remote host to the local clients
// found by DoFindClients(topic, true).
// One message is created per shared memory (compared by name) and reused for
// every client living in that shared memory; all created messages are
// released when the function returns. Always returns true.
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
    // LOG_FUNCTION;
    auto &topic = head.topic();
    auto clients = DoFindClients(topic, true);
    if (clients.empty()) {
        return true;
    }

    std::vector<MsgI> msgs;
    auto ReleaseAll = [&]() {
        for (auto &made : msgs) {
            made.Release();
        }
    };
    DEFER1(ReleaseAll(););

    for (auto &cli : clients) {
        auto node = cli.weak_node_.lock();
        if (!node) {
            continue;
        }
        // should also make sure that mq is not killed before msg expires.
        // it would be ok if (kill_time - offline_time) is longer than expire time.
        auto &shm = node->shm_;

        // Reuse a message already made for this shared memory, if any.
        bool sent = false;
        for (auto &made : msgs) {
            if (made.shm().name() == shm.name()) {
                DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, made);
                sent = true;
                break;
            }
        }
        if (sent) {
            continue;
        }

        MsgI fresh(shm);
        if (fresh.Make(head, body_content)) {
            RecordMsg(fresh);
            msgs.push_back(fresh);
            // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
            DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, fresh);
        }
    }

    return true;
}
|
|
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
|
{
|
Node node(GetNode(dest.id_));
|
if (!node) {
|
LOG_ERROR() << id() << " pass remote reply , ssn not found.";
|
return false;
|
}
|
auto offset = node->addrs_[dest.id_];
|
if (offset != dest.offset_) {
|
LOG_ERROR() << id() << " pass remote reply, dest address not match";
|
return false;
|
}
|
|
ShmMsg msg(node->shm_);
|
if (!msg.Make(head, body_content)) { return false; }
|
DEFER1(msg.Release(););
|
RecordMsg(msg);
|
return DefaultSender(node->shm_).Send(dest, msg);
|
}
|
|
// Handles an allocation request command.
// Request layout, high to low bits:
//   8bit size index, 4bit socket index, 16bit proc index, 28bit id, 4bit cmd+flag
// The reply goes to queue (ssn of the proc + socket index). Requests from an
// unknown proc, or for a queue that is not registered for the node, are
// dropped silently.
// Reply layout: message offset shifted left by 32, 28bit id, 4bit cmd+flag.
// When the allocation fails the offset part is left empty.
void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
{
    // LOG_FUNCTION;
    const int64_t msg_id = (val >> 4) & MaskBits(28);
    const int proc_index = (val >> 32) & MaskBits(16);
    const int socket_index = ((val) >> 48) & MaskBits(4);

    // Bound by reference: the record is only read here, no copy is needed.
    const auto &proc_rec = procs_.Get(proc_index);
    if (proc_rec.proc_.empty()) {
        return;
    }

    MQInfo dest = {proc_rec.ssn_ + socket_index, 0};

    // Resolve the address of the destination queue with direct lookups.
    const auto node_pos = nodes_.find(proc_rec.ssn_);
    if (node_pos == nodes_.end()) {
        return;
    }
    const auto &addrs = node_pos->second->addrs_;
    const auto mq_pos = addrs.find(dest.id_);
    if (mq_pos == addrs.end()) {
        return;
    }
    dest.offset_ = mq_pos->second;

    auto size = GetAllocSize((val >> 52) & MaskBits(8));
    MsgI new_msg(socket.shm());
    if (new_msg.Make(size)) {
        int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
        SendAllocReply(socket, dest, reply, new_msg);
    } else {
        int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure.
        socket.Send(dest, reply);
    }
}
|
|
// Handles a free command: the message id is taken from the 31 bits above the
// low 4 bits of val and the matching recorded message is freed.
// socket is not used.
void NodeCenter::OnFree(ShmSocket &socket, const int64_t val)
{
    const int64_t msg_id = (val >> 4) & MaskBits(31);
    msgs_.FreeMsg(msg_id);
}
|
|
// Dispatches a command value to its handler.
// Returns true for the commands handled here (alloc request, free),
// false for any other command.
bool NodeCenter::OnCommand(ShmSocket &socket, const int64_t val)
{
    assert(IsCmd(val));
    const int cmd = DecodeCmd(val);

    if (cmd == eCmdAllocRequest0) {
        OnAlloc(socket, val);
        return true;
    }
    if (cmd == eCmdFree) {
        OnFree(socket, val);
        return true;
    }
    return false;
}
|
|
// Handles the proc-init message of a node whose session was created before.
// The proc is recorded, its index is put into the reply, and
// msg.extra_mq_num() additional queues (ids ssn + 1, ssn + 2, ...) are created
// and reported in the reply.
// Replies:
//   eNotFound - the session of head is not in nodes_
//   eError    - the proc table is full, or creating a queue threw
//   eSuccess  - otherwise
MsgProcInitReply NodeCenter::ProcInit(const BHMsgHead &head, MsgProcInit &msg)
{
    LOG_DEBUG() << "center got proc init.";
    auto pos = nodes_.find(head.ssn_id());
    if (pos == nodes_.end()) {
        return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
    }

    auto index = procs_.Put(head.proc_id(), head.ssn_id());
    // Put() returns -1 when no record could be stored; do not hand that out as
    // a valid index. The cast keeps the comparison valid for a signed or an
    // unsigned ProcIndex.
    if (index == static_cast<ProcIndex>(-1)) {
        LOG_ERROR() << "proc init failed, too many procs";
        return MakeReply<MsgProcInitReply>(eError, "Too many procs.");
    }
    auto reply(MakeReply<MsgProcInitReply>(eSuccess));
    reply.set_proc_index(index);

    auto &node = pos->second;
    try {
        for (int i = 0; i < msg.extra_mq_num(); ++i) {
            ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
            node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
            auto addr = reply.add_extra_mqs();
            addr->set_mq_id(tmp.id());
            addr->set_abs_addr(tmp.AbsAddr());
        }
        return reply;
    } catch (...) {
        LOG_ERROR() << "proc init create mq error";
        return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
    }
}
|
|
// Registers the proc described by msg for the session of head.
// The proc id in msg must equal the one in head and the session must already
// be in nodes_. If the same proc id is currently mapped to another session,
// that older node is put offline and the mapping is moved to the new session.
// The proc info of the node is then replaced by the one in msg and its state
// is refreshed from head.timestamp().
// Replies eInvalidInput for a proc id mismatch or unknown session, eError if
// anything throws, eSuccess otherwise.
MsgCommonReply NodeCenter::Register(const BHMsgHead &head, MsgRegister &msg)
{
    if (msg.proc().proc_id() != head.proc_id()) {
        return MakeReply(eInvalidInput, "invalid proc id.");
    }

    try {
        MQId ssn = head.ssn_id();
        // when node restart, ssn will change,
        // and old node will be removed after timeout.
        auto pos = nodes_.find(ssn);
        if (pos == nodes_.end()) {
            return MakeReply(eInvalidInput, "invalid session.");
        }

        // try to remove old session
        auto old = online_node_addr_map_.find(head.proc_id());
        if (old == online_node_addr_map_.end()) {
            online_node_addr_map_.emplace(head.proc_id(), ssn);
        } else if (old->second != ssn) { // old session
            // find() instead of operator[]: if the old session is not in
            // nodes_, nothing is inserted and no null node is dereferenced.
            auto old_node = nodes_.find(old->second);
            if (old_node != nodes_.end() && old_node->second) {
                old_node->second->PutOffline(offline_time_);

                LOG_DEBUG() << "put node (" << old_node->second->proc_.proc_id() << ") ssn (" << old->second << ") offline";
            }
            old->second = ssn;
        }

        // update proc info
        Node &node = pos->second;
        node->proc_.Swap(msg.mutable_proc());
        node->state_.timestamp_ = head.timestamp();
        node->UpdateState(NowSec(), offline_time_, kill_time_);

        LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";

        return MakeReply(eSuccess);
    } catch (...) {
        return MakeReply(eError, "register node error.");
    }
}
|
|
// Puts the node given to the handler offline and replies eSuccess.
// Which node that is, and what is replied when there is none, is decided by
// HandleMsg (not visible here). msg is not used.
MsgCommonReply NodeCenter::Unregister(const BHMsgHead &head, MsgUnregister &msg)
{
    auto put_offline = [&](Node node) -> MsgCommonReply {
        node->PutOffline(offline_time_);
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, put_offline);
}
|
|
// Registers the topics listed in msg as services offered by the source queue
// of head: they are added to the services of the node and to service_map_,
// then the service topic is published. Replies eSuccess.
MsgCommonReply NodeCenter::RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
{
    auto add_services = [&](Node node) -> MsgCommonReply {
        const auto src = SrcAddr(head);
        auto &served = msg.topics().topic_list();

        // Per-node record first, then the center-wide topic -> destination map.
        node->services_[src].insert(served.begin(), served.end());
        TopicDest dest = {src, SrcAbsAddr(head), node};
        for (auto &topic : served) {
            service_map_[topic].insert(dest);
        }

        LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << served.size() << " topics:\n";
        for (auto &topic : served) {
            LOG_DEBUG() << "\t" << topic;
        }

        Notify(kTopicNodeService, *node);
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, add_services);
}
|
|
// Refreshes the state of the node from head.timestamp() and takes over the
// public / private info of msg when they are not empty. Replies eSuccess.
MsgCommonReply NodeCenter::Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
{
    auto refresh = [&](Node node) {
        NodeInfo &target = *node;
        target.state_.timestamp_ = head.timestamp();
        target.UpdateState(NowSec(), offline_time_, kill_time_);

        // Empty fields in the heartbeat leave the stored values untouched.
        auto &reported = msg.proc();
        if (!reported.public_info().empty()) {
            target.proc_.set_public_info(reported.public_info());
        }
        if (!reported.private_info().empty()) {
            target.proc_.set_private_info(reported.private_info());
        }
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, refresh);
}
|
|
// Describes one proc, or all procs when proc_id is empty.
// Each entry holds the proc info without its private info, whether the node
// is in the normal state, and its service / local-sub / net-sub topics.
// Replies eNotFound when proc_id has no online record or the recorded session
// is not in nodes_.
MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
{
    using Reply = MsgQueryProcReply;

    auto AppendNode = [](Reply &reply, Node node) {
        auto entry = reply.add_proc_list();
        auto proc = entry->mutable_proc();
        *proc = node->proc_;
        proc->clear_private_info();
        entry->set_online(node->state_.flag_ == kStateNormal);

        // Flattens an address -> topics map into a topic list.
        auto CopyTopics = [](auto &dst, auto &src) {
            for (auto &addr_topics : src) {
                for (auto &topic : addr_topics.second) {
                    dst.add_topic_list(topic);
                }
            }
        };
        CopyTopics(*entry->mutable_service(), node->services_);
        CopyTopics(*entry->mutable_local_sub(), node->local_sub_);
        CopyTopics(*entry->mutable_net_sub(), node->net_sub_);
    };

    if (proc_id.empty()) {
        Reply everyone(MakeReply<Reply>(eSuccess));
        for (auto &kv : nodes_) {
            AppendNode(everyone, kv.second);
        }
        return everyone;
    }

    const auto online = online_node_addr_map_.find(proc_id);
    if (online == online_node_addr_map_.end()) {
        return MakeReply<Reply>(eNotFound, "proc not found.");
    }

    const auto node_pos = nodes_.find(online->second);
    if (node_pos == nodes_.end()) {
        return MakeReply<Reply>(eNotFound, "proc node not found.");
    }

    auto single = MakeReply<Reply>(eSuccess);
    AppendNode(single, node_pos->second);
    return single;
}
|
// Message-handler form of QueryProc: answers with QueryProc(req.proc_id()).
// The node passed in by HandleMsg is not used.
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
{
    using Reply = MsgQueryProcReply;
    auto by_id = [&](Node self) -> Reply {
        return this->QueryProc(req.proc_id());
    };
    return HandleMsg<Reply>(head, by_id);
}
|
|
// Lists the servers of req.topic(): first the valid local providers found in
// service_map_ (proc id, queue id, address), then the remote hosts returned by
// FindRemoteRPCServers (ip only). Replies eNotFound when the list is empty.
MsgQueryTopicReply NodeCenter::QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
{
    using Reply = MsgQueryTopicReply;

    auto collect = [&](Node self) -> Reply {
        Reply reply = MakeReply<Reply>(eSuccess);

        // Local providers.
        const auto providers = service_map_.find(req.topic());
        if (providers != service_map_.end()) {
            for (auto &dest : providers->second) {
                Node provider(dest.weak_node_.lock());
                if (!provider || !Valid(*provider)) {
                    continue;
                }
                auto entry = reply.add_node_address();
                entry->set_proc_id(provider->proc_.proc_id());
                entry->mutable_addr()->set_mq_id(dest.mq_id_);
                entry->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
            }
        }

        // Remote providers.
        auto hosts(FindRemoteRPCServers(req.topic()));
        for (auto &ip : hosts) {
            auto entry = reply.add_node_address();
            entry->mutable_addr()->set_ip(ip);
        }

        if (reply.node_address_size() == 0) {
            return MakeReply<Reply>(eNotFound, "topic server not found.");
        }
        return reply;
    };

    return HandleMsg<Reply>(head, collect);
}
|
|
void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
|
{
|
auto src = SrcAddr(head);
|
auto Sub = [&](auto &sub, auto &sub_map) {
|
auto &topics = msg.topics().topic_list();
|
sub[src].insert(topics.begin(), topics.end());
|
const TopicDest &dest = {src, SrcAbsAddr(head), node};
|
for (auto &topic : topics) {
|
sub_map[topic].insert(dest);
|
}
|
};
|
if (msg.network()) {
|
Sub(net_sub_, center_.net_sub_map_);
|
center_.Notify(kTopicNodeSub, *this);
|
} else {
|
Sub(local_sub_, center_.local_sub_map_);
|
}
|
}
|
|
// Message handler for subscribe requests: delegates to the node and replies
// eSuccess.
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
    auto subscribe = [&](Node node) {
        node->Subscribe(head, msg, node);
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, subscribe);
}
|
|
void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
|
{
|
auto src = SrcAddr(head);
|
|
auto Unsub = [&](auto &sub, auto &sub_map) {
|
auto pos = sub.find(src);
|
|
auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
|
auto pos = sub_map.find(topic);
|
if (pos != sub_map.end() &&
|
pos->second.erase(dest) != 0 &&
|
pos->second.empty()) {
|
sub_map.erase(pos);
|
}
|
};
|
|
if (pos != sub.end()) {
|
const TopicDest &dest = {src, SrcAbsAddr(head), node};
|
auto &topics = msg.topics().topic_list();
|
// clear node sub records;
|
for (auto &topic : topics) {
|
pos->second.erase(topic);
|
RemoveSubTopicDestRecord(topic, dest);
|
}
|
if (pos->second.empty()) {
|
sub.erase(pos);
|
}
|
}
|
};
|
if (msg.network()) {
|
Unsub(net_sub_, center_.net_sub_map_);
|
center_.Notify(kTopicNodeUnsub, *this);
|
} else {
|
Unsub(local_sub_, center_.local_sub_map_);
|
}
|
}
|
|
// Message handler for unsubscribe requests: delegates to the node and replies
// eSuccess.
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
    auto unsubscribe = [&](Node node) {
        node->Unsubscribe(head, msg, node);
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, unsubscribe);
}
|
|
// Collects the subscribers of topic whose node is alive and in the normal
// state. The topic itself and every prefix that ends just before a kTopicSep
// are looked up; a separator at the very end of topic ends the search.
// Local subscriptions are only considered when from_remote is false, network
// subscriptions always.
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
    // LOG_FUNCTION;
    Clients dests;

    auto CollectFrom = [&](auto &sub_map, const std::string &exact) {
        auto entry = sub_map.find(exact);
        if (entry == sub_map.end()) {
            return;
        }
        for (auto &cli : entry->second) {
            auto node = cli.weak_node_.lock();
            if (node && node->state_.flag_ == kStateNormal) {
                dests.insert(cli);
            }
            // if (Valid(cli.weak_node_)) {
            //   dests.insert(cli);
            // }
        }
    };

    auto CollectExact = [&](const std::string &exact) {
        if (!from_remote) {
            CollectFrom(local_sub_map_, exact);
            // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
        }
        // net subscriptions also work in local mode.
        CollectFrom(net_sub_map_, exact);
        // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
    };

    CollectExact(topic);

    // Walk the separators from left to right; `next` is the position right
    // after the separator found last.
    size_t next = 0;
    for (;;) {
        next = topic.find(kTopicSep, next);
        if (next == topic.npos) {
            break;
        }
        ++next;
        if (next == topic.size()) {
            // CollectExact(std::string()); // sub all.
            break;
        }
        CollectExact(topic.substr(0, next - 1));
    }
    return dests;
}
|
|
// Message handler for publish requests: sends msg to the subscribers of topic
// with the default sender of the node's shared memory and replies eSuccess.
MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
    auto publish = [&](Node node) {
        ShmSocket &sender = DefaultSender(node->shm_);
        DoPublish(sender, topic, msg);
        return MakeReply(eSuccess);
    };
    return HandleMsg(head, publish);
}
|
|
// Sends msg through sock to every subscriber of topic (local and network
// subscriptions) whose node still exists. Exceptions are logged and swallowed.
void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
{
    try {
        auto clients = DoFindClients(topic, false);
        for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();
            if (!node) {
                continue;
            }
            // should also make sure that mq is not killed before msg expires.
            // it would be ok if (kill_time - offline_time) is longer than expire time.
            sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
        }
    } catch (...) {
        LOG_ERROR() << "DoPublish error.";
    }
}
|
|
void NodeCenter::OnTimer()
|
{
|
CheckNodes();
|
msgs_.AutoRemove();
|
}
|
|
void NodeCenter::CheckNodes()
|
{
|
auto now = NowSec();
|
if (now <= last_check_time_) { return; }
|
last_check_time_ = now;
|
|
auto it = nodes_.begin();
|
while (it != nodes_.end()) {
|
auto &cli = *it->second;
|
cli.UpdateState(now, offline_time_, kill_time_);
|
if (cli.state_.flag_ == kStateKillme) {
|
RemoveNode(it->second);
|
it = nodes_.erase(it);
|
} else {
|
++it;
|
}
|
}
|
msgs_.DebugPrint();
|
}
|
|
// Removes every trace of node except its entry in nodes_:
// its destinations in the service / local-sub / net-sub maps, its online
// record (only if that record points at one of the node's own queues) and all
// of its queues. addrs_ of the node is empty afterwards.
void NodeCenter::RemoveNode(Node &node)
{
    auto DropDests = [&node](auto &center_map, auto &node_records) {
        for (auto &addr_topics : node_records) {
            TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
            for (auto &topic : addr_topics.second) {
                auto entry = center_map.find(topic);
                if (entry == center_map.end()) {
                    continue;
                }
                entry->second.erase(dest);
                if (entry->second.empty()) {
                    center_map.erase(entry);
                }
            }
        }
    };
    DropDests(service_map_, node->services_);
    DropDests(local_sub_map_, node->local_sub_);
    DropDests(net_sub_map_, node->net_sub_);

    // remove online record.
    auto online = online_node_addr_map_.find(node->proc_.proc_id());
    if (online != online_node_addr_map_.end() &&
        node->addrs_.find(online->second) != node->addrs_.end()) {
        online_node_addr_map_.erase(online);
    }

    for (auto &addr : node->addrs_) {
        auto &mq_id = addr.first;
        auto removed = ShmSocket::Remove(node->shm_, mq_id);
        LOG_DEBUG() << "remove mq " << mq_id << (removed ? " ok" : " failed");
    }

    node->addrs_.clear();
}
|
|
// Publishes content on topic on behalf of the center itself.
// A MsgPublish is wrapped into a message allocated in shm, recorded, sent to
// the subscribers with the default sender of shm and released afterwards.
// Exceptions are logged and swallowed.
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
    try {
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(content);
        BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));

        MsgI msg(shm);
        if (!msg.Make(head, pub)) {
            return;
        }
        DEFER1(msg.Release());
        RecordMsg(msg);
        DoPublish(DefaultSender(shm), topic, msg);
    } catch (...) {
        LOG_ERROR() << "center publish error.";
    }
}
|
|
// Rebuilds the network records from a JSON array of hosts.
// The entry flagged "isLocal" provides host_id_ ("serverId") and ip_ ("ip").
// For every other entry its "ip" is added to rpc_hosts_ under each topic of
// "pubTopics" and to sub_hosts_ under each topic of "netSubTopics".
void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
    // LOG_FUNCTION;
    sub_hosts_.clear();
    rpc_hosts_.clear();

    for (auto &host : info.array()) {
        if (host.get("isLocal", false)) {
            host_id_ = host.get("serverId", "");
            ip_ = host.get("ip", "");
            continue;
        }

        auto remote_ip = host.get("ip", "");
        auto AddTopics = [&](const ssjson::Json::array_type &topics, auto &hosts_by_topic) {
            for (auto &item : topics) {
                auto name = item.get_value<std::string>();
                hosts_by_topic[name].insert(remote_ip);
                // LOG_DEBUG() << "net topic: " << name << ", " << remote_ip;
            }
        };
        // LOG_DEBUG() << "services:";
        AddTopics(host.child("pubTopics").array(), rpc_hosts_);
        // LOG_DEBUG() << "net sub:";
        AddTopics(host.child("netSubTopics").array(), sub_hosts_);
    }
}
|