/*
|
* =====================================================================================
|
*
|
* Filename: center.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-03-30 16:19:37
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "center.h"
|
#include "node_center.h"
|
#include <chrono>
|
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
typedef BHCenter::MsgHandler Handler;
|
|
namespace
|
{
|
|
//TODO check proc_id
|
|
template <class Body, class OnMsg, class Replyer>
|
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
|
{
|
if (head.route_size() != 1) { return; }
|
Body body;
|
if (msg.ParseBody(body)) {
|
replyer(onmsg(body));
|
}
|
}
|
|
// Chain two handlers into one: the result tries `h1` first and only falls
// back to `h2` when `h1` reports the message as not handled.
// Both handlers are copied into the returned handler.
Handler Combine(const Handler &h1, const Handler &h2)
{
	auto chained = [first = h1, second = h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) -> bool {
		if (first(socket, msg, head)) {
			return true;
		}
		return second(socket, msg, head);
	};
	return chained;
}
|
template <class... H>
|
Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
|
{
|
return Combine(Combine(h0, h1), h2, rest...);
|
}
|
|
// Expands to one `case kMsgType<MsgTag>` of a `switch (head.type())`:
// the body is parsed as Msg<MsgTag> via Dispatch, passed to
// center->MsgTag(head, body), and the result goes to `replyer`.
// The case always returns true, whether or not Dispatch actually replied.
// Requires `msg`, `head`, `center` and `replyer` to be in scope at the use site.
#define CASE_ON_MSG_TYPE(MsgTag)                                                   \
	case kMsgType##MsgTag: {                                                       \
		auto on_body = [&](auto &body) { return center->MsgTag(head, body); };     \
		Dispatch<Msg##MsgTag>(msg, head, on_body, replyer);                        \
		return true;                                                               \
	}
|
|
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er)
|
{
|
return [&](auto &&rep_body) {
|
auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
|
MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
|
MsgI msg;
|
if (msg.Make(reply_head, rep_body)) {
|
DEFER1(msg.Release(););
|
center->SendAllocMsg(socket, remote, msg);
|
}
|
};
|
}
|
|
// Register the two built-in centers through BHCenter::Install:
//   "#center.main" - node management requests (ProcInit, Register, Heartbeat,
//                    Unregister, RegisterRPC, QueryTopic, QueryProc), raw
//                    commands, and idle-time init handling plus the timer.
//   "#center.bus"  - Subscribe / Unsubscribe / Publish.
// Each installed handler captures its own copy of `center_ptr`.
// Always returns true.
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
{
	// command
	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
		auto &center = *center_ptr;
		return IsCmd(cmd) && center->OnCommand(socket, cmd);
	};

	// now we can talk.
	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
		auto &center = *center_ptr;
		auto onInit = [&](const int64_t request) {
			return center->OnNodeInit(socket, request);
		};
		BHCenterHandleInit(onInit);
		center->OnTimer();
	};

	// Request/reply messages addressed to the main center.
	// Returns false for message types this center does not handle.
	auto OnCenter = [center_ptr](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
		auto &center = *center_ptr;
		auto replyer = MakeReplyer(socket, head, center);
		switch (head.type()) {
			CASE_ON_MSG_TYPE(ProcInit);
			CASE_ON_MSG_TYPE(Register);
			CASE_ON_MSG_TYPE(Heartbeat);
			CASE_ON_MSG_TYPE(Unregister);

			CASE_ON_MSG_TYPE(RegisterRPC);
			CASE_ON_MSG_TYPE(QueryTopic);
			CASE_ON_MSG_TYPE(QueryProc);
		default: return false;
		}
	};
	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);

	// The bus has no idle work and accepts no raw commands.
	auto OnBusIdle = [](ShmSocket &) {};
	auto OnBusCmd = [](ShmSocket &, ShmMsgQueue::RawData &) { return false; };

	// Pub/sub messages addressed to the bus.
	// Returns false for message types the bus does not handle.
	auto OnPubSub = [center_ptr](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
		auto &center = *center_ptr;
		auto replyer = MakeReplyer(socket, head, center);

		// Acknowledge the publisher, then forward the original message to
		// every subscriber whose node is still alive.
		auto OnPublish = [&]() {
			MsgPublish pub;
			NodeCenter::Clients clients;
			MsgCommonReply reply;
			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
				// Malformed request: drop it without replying.
				return;
			}
			if (!center->FindClients(head, pub, clients, reply)) {
				// Lookup failed; `reply` was filled in by FindClients.
				replyer(reply);
				return;
			}

			replyer(MakeReply(eSuccess));

			auto it = clients.begin();
			while (it != clients.end()) {
				auto &cli = *it;
				auto node = cli.weak_node_.lock();
				if (node) {
					// should also make sure that mq is not killed before msg expires.
					// it would be ok if (kill_time - offline_time) is longer than expire time.
					socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
					++it;
				} else {
					// The node behind this subscription is gone; drop it from
					// the local client list.
					it = clients.erase(it);
				}
			}
		};

		switch (head.type()) {
			CASE_ON_MSG_TYPE(Subscribe);
			CASE_ON_MSG_TYPE(Unsubscribe);
		case kMsgTypePublish: OnPublish(); return true;
		default: return false;
		}
	};

	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);

	return true;
}
|
|
#undef CASE_ON_MSG_TYPE
|
|
} // namespace
|
|
// Access the registry of installed centers.
// The registry is a function-local static, so it is created on the first call
// and the same instance is returned to every caller afterwards.
BHCenter::CenterRecords &BHCenter::Centers()
{
	static CenterRecords records;
	return records;
}
|
|
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
|
{
|
Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
|
return true;
|
}
|
|
// Create the node center, install the built-in centers, and open one
// ShmSocket per installed center on `shm`.
// The queue-removal callback handed to the node center refers to `shm` by
// reference, so `shm` has to stay valid for as long as that callback may run.
BHCenter::BHCenter(Socket::Shm &shm)
{
	// Callback used by the node center to remove a message queue by id.
	auto gc = [&shm](const MQId id) {
		if (ShmSocket::Remove(shm, id)) {
			LOG_DEBUG() << "remove mq " << id << " ok\n";
		}
	};

	auto timeout_sec = NodeTimeoutSec();
	// *3 to allow other clients to finish sending msgs.
	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, timeout_sec, timeout_sec * 3);
	AddCenter(center_ptr);

	// One socket per registered center, keyed by the center's name.
	for (auto &entry : Centers()) {
		auto &center_info = entry.second;
		sockets_[center_info.name_] = std::make_shared<ShmSocket>(center_info.mq_.offset_, shm, center_info.mq_.id_);
	}
}
|
|
bool BHCenter::Start()
|
{
|
for (auto &kv : Centers()) {
|
auto &info = kv.second;
|
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
|
}
|
|
return true;
|
}
|
|
bool BHCenter::Stop()
|
{
|
for (auto &kv : sockets_) {
|
kv.second->Stop();
|
}
|
return true;
|
}
|