From 6c07fe29a5185835f28059f627a1d30e462da28b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 29 六月 2021 14:01:19 +0800
Subject: [PATCH] add notify node change.
---
box/node_center.h | 61 ++++++++++++++++++++++--------
1 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/box/node_center.h b/box/node_center.h
index b9a01b3..ae5b075 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -48,17 +48,16 @@
class MsgRecords
{
typedef int64_t MsgId;
- typedef int64_t Offset;
public:
- void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+ void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
void FreeMsg(MsgId id);
void AutoRemove();
size_t size() const { return msgs_.size(); }
void DebugPrint() const;
private:
- std::unordered_map<MsgId, Offset> msgs_;
+ std::unordered_map<MsgId, MsgI> msgs_;
int64_t time_to_clean_ = 0;
};
@@ -80,20 +79,29 @@
struct ProcState {
int64_t timestamp_ = 0;
uint32_t flag_ = 0; // reserved
- void PutOffline(const int64_t offline_time);
- void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
};
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
+ struct NodeInfo;
+ typedef std::shared_ptr<NodeInfo> Node;
+ typedef std::weak_ptr<NodeInfo> WeakNode;
+
struct NodeInfo {
+ NodeCenter ¢er_;
+ SharedMemory &shm_;
ProcState state_; // state
std::map<MQId, int64_t> addrs_; // registered mqs
ProcInfo proc_; //
AddressTopics services_; // address: topics
- AddressTopics subscriptions_; // address: topics
+ AddressTopics local_sub_; // address: topics
+ AddressTopics net_sub_; // address: topics
+ NodeInfo(NodeCenter ¢er, SharedMemory &shm) :
+ center_(center), shm_(shm) {}
+ void PutOffline(const int64_t offline_time);
+ void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
+ void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
+ void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
};
- typedef std::shared_ptr<NodeInfo> Node;
- typedef std::weak_ptr<NodeInfo> WeakNode;
struct TopicDest {
MQId mq_id_;
@@ -109,8 +117,8 @@
public:
typedef std::set<TopicDest> Clients;
- NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
- id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
+ NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
+ id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
// center name, no relative to shm.
const std::string &id() const { return id_; }
@@ -118,6 +126,10 @@
void RecordMsg(const MsgI &msg);
bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+
+ bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+ bool RemotePublish(BHMsgHead &head, const std::string &body_content);
+ bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
void OnAlloc(ShmSocket &socket, const int64_t val);
void OnFree(ShmSocket &socket, const int64_t val);
bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -144,8 +156,8 @@
return op(node);
}
}
- } catch (...) {
- //TODO error log
+ } catch (std::exception &e) {
+ LOG_ERROR() << "handle msg exception: " << e.what();
return MakeReply<Reply>(eError, "internal error.");
}
}
@@ -154,22 +166,37 @@
{
return HandleMsg<MsgCommonReply, Func>(head, op);
}
+ template <class Reply>
+ bool CheckMsg(const BHMsgHead &head, Reply &reply)
+ {
+ bool r = false;
+ auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
+ reply = HandleMsg<Reply>(head, onOk);
+ return r;
+ }
MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
+ MsgQueryProcReply QueryProc(const std::string &proc_id);
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
- Clients DoFindClients(const std::string &topic);
- bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
+ MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
void OnTimer();
+
+ // remote hosts records
+ std::vector<std::string> FindRemoteSubClients(const Topic &topic);
private:
void CheckNodes();
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
+ void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+ void Notify(const Topic &topic, NodeInfo &node);
+ void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
+ Clients DoFindClients(const std::string &topic, bool from_remote);
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
bool Valid(const WeakNode &weak)
{
@@ -177,16 +204,18 @@
return node && Valid(*node);
}
void RemoveNode(Node &node);
+ Node GetNode(const MQId mq);
+
std::string id_; // center proc id;
std::unordered_map<Topic, Clients> service_map_;
- std::unordered_map<Topic, Clients> subscribe_map_;
+ std::unordered_map<Topic, Clients> local_sub_map_;
+ std::unordered_map<Topic, Clients> net_sub_map_;
std::unordered_map<Address, Node> nodes_;
std::unordered_map<ProcId, Address> online_node_addr_map_;
ProcRecords procs_; // To get a short index for msg alloc.
MsgRecords msgs_; // record all msgs alloced.
- Cleaner cleaner_; // remove mqs.
int64_t offline_time_;
int64_t kill_time_;
int64_t last_check_time_;
--
Gitblit v1.8.0