/*
|
* =====================================================================================
|
*
|
* Filename: center.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年03月30日 16时19分37秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "center.h"
|
#include "center_topic_node.h"
|
#include "io_service.h"
|
#include "node_center.h"
|
#include "tcp_proxy.h"
|
#include "tcp_server.h"
|
#include <chrono>
|
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
typedef BHCenter::MsgHandler Handler;
|
|
namespace
|
{
|
|
template <class Body, class OnMsg, class Replyer>
|
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
|
{
|
if (head.route_size() != 1) { return; }
|
Body body;
|
if (msg.ParseBody(body)) {
|
replyer(onmsg(body));
|
}
|
}
|
|
#define CASE_ON_MSG_TYPE(MsgTag) \
|
case kMsgType##MsgTag: \
|
Dispatch<Msg##MsgTag>( \
|
msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
|
return true;
|
|
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er)
|
{
|
return [&](auto &&rep_body) {
|
auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
|
MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
|
MsgI msg(socket.shm());
|
if (msg.Make(reply_head, rep_body)) {
|
DEFER1(msg.Release(););
|
center->SendAllocMsg(socket, remote, msg);
|
}
|
};
|
}
|
|
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
|
{
|
// command
|
auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
|
auto ¢er = *center_ptr;
|
return IsCmd(cmd) && center->OnCommand(socket, cmd);
|
};
|
|
// now we can talk.
|
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
|
auto ¢er = *center_ptr;
|
auto onInit = [&](const int64_t request) {
|
return center->OnNodeInit(socket, request);
|
};
|
BHCenterHandleInit(socket.shm(), onInit);
|
center->OnTimer();
|
};
|
|
auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
|
auto ¢er = *center_ptr;
|
auto replyer = MakeReplyer(socket, head, center);
|
|
if (!head.dest().ip().empty()) { // other host, proxy
|
auto valid = [&]() { return head.route_size() == 1; };
|
if (!valid()) { return false; }
|
|
if (head.type() == kMsgTypeRequestTopic) {
|
typedef MsgRequestTopicReply Reply;
|
Reply reply;
|
if (!center->CheckMsg(head, reply)) {
|
replyer(reply);
|
} else {
|
auto onResult = [¢er](BHMsgHead &head, std::string body_content) {
|
if (head.route_size() > 0) {
|
auto &back = head.route(head.route_size() - 1);
|
MQInfo dest = {back.mq_id(), back.abs_addr()};
|
head.mutable_route()->RemoveLast();
|
center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
|
}
|
};
|
uint16_t port = head.dest().port();
|
if (port == 0) {
|
port = kBHCenterPort;
|
}
|
if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) {
|
replyer(MakeReply<Reply>(eError, "send request failed."));
|
} else {
|
// success
|
}
|
}
|
return true;
|
} else {
|
// ignore other msgs for now.
|
}
|
return false;
|
}
|
|
switch (head.type()) {
|
CASE_ON_MSG_TYPE(ProcInit);
|
CASE_ON_MSG_TYPE(Register);
|
CASE_ON_MSG_TYPE(Heartbeat);
|
CASE_ON_MSG_TYPE(Unregister);
|
|
CASE_ON_MSG_TYPE(RegisterRPC);
|
CASE_ON_MSG_TYPE(QueryTopic);
|
CASE_ON_MSG_TYPE(QueryProc);
|
default: return false;
|
}
|
};
|
BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
|
|
auto OnBusIdle = [=](ShmSocket &socket) {};
|
auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
|
auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
|
auto ¢er = *center_ptr;
|
auto replyer = MakeReplyer(socket, head, center);
|
auto OnPublish = [&]() {
|
MsgPublish pub;
|
if (head.route_size() == 1 && msg.ParseBody(pub)) {
|
// replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
|
auto reply(center->Publish(head, pub.topic(), msg));
|
replyer(reply);
|
auto hosts = center->FindRemoteSubClients(pub.topic());
|
for (auto &host : hosts) {
|
tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
|
}
|
}
|
};
|
switch (head.type()) {
|
CASE_ON_MSG_TYPE(Subscribe);
|
CASE_ON_MSG_TYPE(Unsubscribe);
|
case kMsgTypePublish: OnPublish(); return true;
|
default: return false;
|
}
|
};
|
|
BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
|
|
return true;
|
}
|
|
#undef CASE_ON_MSG_TYPE
|
|
} // namespace
|
|
BHCenter::CenterRecords &BHCenter::Centers()
|
{
|
static CenterRecords rec;
|
return rec;
|
}
|
|
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
|
{
|
Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
|
return true;
|
}
|
|
BHCenter::BHCenter(Socket::Shm &shm)
|
{
|
auto nsec = NodeTimeoutSec();
|
auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
|
io_service_.reset(new IoService);
|
tcp_proxy_.reset(new TcpProxy(io_service_->io()));
|
|
AddCenter(center_ptr, shm, *tcp_proxy_);
|
|
for (auto &kv : Centers()) {
|
auto &info = kv.second;
|
sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
|
}
|
|
topic_node_.reset(new CenterTopicNode(center_ptr, shm));
|
tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
|
}
|
|
BHCenter::~BHCenter() { Stop(); }
|
|
bool BHCenter::Start()
|
{
|
for (auto &kv : Centers()) {
|
auto &info = kv.second;
|
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
|
}
|
topic_node_->Start();
|
return true;
|
}
|
|
bool BHCenter::Stop()
|
{
|
tcp_proxy_.reset();
|
tcp_server_.reset();
|
io_service_.reset();
|
topic_node_->Stop();
|
for (auto &kv : sockets_) {
|
kv.second->Stop();
|
}
|
return true;
|
}
|