From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
box/node_center.h | 19 ++++++++++++-------
1 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/box/node_center.h b/box/node_center.h
index b9a01b3..caaf054 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -51,14 +51,14 @@
typedef int64_t Offset;
public:
- void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+ void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
void FreeMsg(MsgId id);
void AutoRemove();
size_t size() const { return msgs_.size(); }
void DebugPrint() const;
private:
- std::unordered_map<MsgId, Offset> msgs_;
+ std::unordered_map<MsgId, MsgI> msgs_;
int64_t time_to_clean_ = 0;
};
@@ -80,17 +80,21 @@
struct ProcState {
int64_t timestamp_ = 0;
uint32_t flag_ = 0; // reserved
- void PutOffline(const int64_t offline_time);
- void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
};
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
struct NodeInfo {
+ NodeCenter ¢er_;
+ SharedMemory &shm_;
ProcState state_; // state
std::map<MQId, int64_t> addrs_; // registered mqs
ProcInfo proc_; //
AddressTopics services_; // address: topics
AddressTopics subscriptions_; // address: topics
+ NodeInfo(NodeCenter ¢er, SharedMemory &shm) :
+ center_(center), shm_(shm) {}
+ void PutOffline(const int64_t offline_time);
+ void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
};
typedef std::shared_ptr<NodeInfo> Node;
typedef std::weak_ptr<NodeInfo> WeakNode;
@@ -109,8 +113,8 @@
public:
typedef std::set<TopicDest> Clients;
- NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
- id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
+ NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
+ id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
// center name, no relative to shm.
const std::string &id() const { return id_; }
@@ -159,6 +163,7 @@
MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
+ MsgQueryProcReply QueryProc(const std::string &proc_id);
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
@@ -170,6 +175,7 @@
private:
void CheckNodes();
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
+ void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
bool Valid(const WeakNode &weak)
{
@@ -186,7 +192,6 @@
ProcRecords procs_; // To get a short index for msg alloc.
MsgRecords msgs_; // record all msgs alloced.
- Cleaner cleaner_; // remove mqs.
int64_t offline_time_;
int64_t kill_time_;
int64_t last_check_time_;
--
Gitblit v1.8.0