/*
 * =====================================================================================
 *
 *       Filename:  node_center.h
 *
 *    Description:  Declarations of ProcRecords, MsgRecords and NodeCenter.
 *
 *        Version:  1.0
 *        Created:  2021-05-20 11:33:06
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#ifndef NODE_CENTER_KY67RJ1Q
|
#define NODE_CENTER_KY67RJ1Q
|
|
#include "shm_socket.h"
|
#include <unordered_map>
|
|
// Identifier of a proc; a plain string.
using ProcId = std::string;
// Index of a proc inside ProcRecords (see ProcRecords::Put / ProcRecords::Get).
using ProcIndex = size_t;
// Max local procs; ProcRecords reserves room for this many records up front.
constexpr int kMaxProcs = 65536;
|
|
// record all procs ever registered, always grow, never remove.
|
// mainly for node to request msg allocation.
|
// use index instead of MQId to save some bits.
|
class ProcRecords
|
{
|
public:
|
struct ProcRec {
|
ProcId proc_;
|
MQId ssn_ = 0;
|
};
|
|
ProcRecords() { procs_.reserve(kMaxProcs); }
|
ProcIndex Put(const ProcId &proc_id, const MQId ssn);
|
const ProcRec &Get(const ProcIndex index) const;
|
|
private:
|
std::unordered_map<ProcId, size_t> proc_index_;
|
std::vector<ProcRec> procs_;
|
};
|
|
class MsgRecords
|
{
|
typedef int64_t MsgId;
|
|
public:
|
void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
|
void FreeMsg(MsgId id);
|
void AutoRemove();
|
size_t size() const { return msgs_.size(); }
|
void DebugPrint() const;
|
|
private:
|
std::unordered_map<MsgId, MsgI> msgs_;
|
int64_t time_to_clean_ = 0;
|
};
|
|
class NodeCenter
|
{
|
public:
|
typedef MQId Address;
|
typedef bhome_msg::ProcInfo ProcInfo;
|
typedef std::function<void(Address const)> Cleaner;
|
|
private:
|
enum {
|
kStateInvalid,
|
kStateNormal,
|
kStateOffline,
|
kStateKillme,
|
};
|
|
struct ProcState {
|
int64_t timestamp_ = 0;
|
uint32_t flag_ = 0; // reserved
|
};
|
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
|
|
struct NodeInfo;
|
typedef std::shared_ptr<NodeInfo> Node;
|
typedef std::weak_ptr<NodeInfo> WeakNode;
|
|
struct NodeInfo {
|
NodeCenter ¢er_;
|
SharedMemory &shm_;
|
ProcState state_; // state
|
std::map<MQId, int64_t> addrs_; // registered mqs
|
ProcInfo proc_; //
|
AddressTopics services_; // address: topics
|
AddressTopics local_sub_; // address: topics
|
AddressTopics net_sub_; // address: topics
|
NodeInfo(NodeCenter ¢er, SharedMemory &shm) :
|
center_(center), shm_(shm) {}
|
void PutOffline(const int64_t offline_time);
|
void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
|
void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
|
void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
|
};
|
|
struct TopicDest {
|
MQId mq_id_;
|
int64_t mq_abs_addr_;
|
WeakNode weak_node_;
|
bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
|
};
|
|
static inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
|
static inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
|
static inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
|
|
public:
|
typedef std::set<TopicDest> Clients;
|
|
NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
|
id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
|
|
// center name, no relative to shm.
|
const std::string &id() const { return id_; }
|
int64_t OnNodeInit(ShmSocket &socket, const int64_t val);
|
void RecordMsg(const MsgI &msg);
|
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
|
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
|
|
bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
|
bool RemotePublish(BHMsgHead &head, const std::string &body_content);
|
bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
|
void OnAlloc(ShmSocket &socket, const int64_t val);
|
void OnFree(ShmSocket &socket, const int64_t val);
|
bool OnCommand(ShmSocket &socket, const int64_t val);
|
|
MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg);
|
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg);
|
|
template <class Reply, class Func>
|
Reply HandleMsg(const BHMsgHead &head, Func const &op)
|
{
|
try {
|
auto pos = nodes_.find(head.ssn_id());
|
if (pos == nodes_.end()) {
|
return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
|
} else {
|
auto &node = pos->second;
|
if (!MatchAddr(node->addrs_, SrcAddr(head))) {
|
return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
|
} else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
|
return op(node);
|
} else if (!Valid(*node)) {
|
return MakeReply<Reply>(eNoRespond, "Node is not alive.");
|
} else {
|
return op(node);
|
}
|
}
|
} catch (std::exception &e) {
|
LOG_ERROR() << "handle msg exception: " << e.what();
|
return MakeReply<Reply>(eError, "internal error.");
|
}
|
}
|
template <class Func>
|
inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
|
{
|
return HandleMsg<MsgCommonReply, Func>(head, op);
|
}
|
template <class Reply>
|
bool CheckMsg(const BHMsgHead &head, Reply &reply)
|
{
|
bool r = false;
|
auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
|
reply = HandleMsg<Reply>(head, onOk);
|
return r;
|
}
|
|
MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
|
MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
|
MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
|
MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
|
MsgQueryProcReply QueryProc(const std::string &proc_id);
|
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
|
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
|
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
|
MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
|
|
void OnTimer();
|
|
// remote hosts records
|
std::vector<std::string> FindRemoteSubClients(const Topic &topic);
|
|
private:
|
void CheckNodes();
|
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
|
void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
|
void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
|
Clients DoFindClients(const std::string &topic, bool from_remote);
|
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
|
bool Valid(const WeakNode &weak)
|
{
|
auto node = weak.lock();
|
return node && Valid(*node);
|
}
|
void RemoveNode(Node &node);
|
Node GetNode(const MQId mq);
|
|
std::string id_; // center proc id;
|
|
std::unordered_map<Topic, Clients> service_map_;
|
std::unordered_map<Topic, Clients> local_sub_map_;
|
std::unordered_map<Topic, Clients> net_sub_map_;
|
std::unordered_map<Address, Node> nodes_;
|
std::unordered_map<ProcId, Address> online_node_addr_map_;
|
ProcRecords procs_; // To get a short index for msg alloc.
|
MsgRecords msgs_; // record all msgs alloced.
|
|
int64_t offline_time_;
|
int64_t kill_time_;
|
int64_t last_check_time_;
|
};
|
|
#endif // end of include guard: NODE_CENTER_KY67RJ1Q
|