lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
 * =====================================================================================
 *
 *       Filename:  node_center.h
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年05月20日 11时33分06秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q
 
#include "shm_socket.h"
#include <unordered_map>
 
typedef std::string ProcId;
typedef size_t ProcIndex; // max local procs.
const int kMaxProcs = 65536;
 
// record all procs ever registered, always grow, never remove.
// mainly for node to request msg allocation.
// use index instead of MQId to save some bits.
class ProcRecords
{
public:
    struct ProcRec {
        ProcId proc_;
        MQId ssn_ = 0;
    };
 
    ProcRecords() { procs_.reserve(kMaxProcs); }
    ProcIndex Put(const ProcId &proc_id, const MQId ssn);
    const ProcRec &Get(const ProcIndex index) const;
 
private:
    std::unordered_map<ProcId, size_t> proc_index_;
    std::vector<ProcRec> procs_;
};
 
class MsgRecords
{
    typedef int64_t MsgId;
 
public:
    void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
    void FreeMsg(MsgId id);
    void AutoRemove();
    size_t size() const { return msgs_.size(); }
    void DebugPrint() const;
 
private:
    std::unordered_map<MsgId, MsgI> msgs_;
    int64_t time_to_clean_ = 0;
};
 
class NodeCenter
{
public:
    typedef MQId Address;
    typedef bhome_msg::ProcInfo ProcInfo;
    typedef std::function<void(Address const)> Cleaner;
 
private:
    enum {
        kStateInvalid,
        kStateNormal,
        kStateOffline,
        kStateKillme,
    };
 
    struct ProcState {
        int64_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
    };
    typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
    struct NodeInfo;
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
 
    struct NodeInfo {
        NodeCenter &center_;
        SharedMemory &shm_;
        ProcState state_;               // state
        std::map<MQId, int64_t> addrs_; // registered mqs
        ProcInfo proc_;                 //
        AddressTopics services_;        // address: topics
        AddressTopics local_sub_;       // address: topics
        AddressTopics net_sub_;         // address: topics
        NodeInfo(NodeCenter &center, SharedMemory &shm) :
            center_(center), shm_(shm) {}
        void PutOffline(const int64_t offline_time);
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
        void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
        void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
    };
 
    struct TopicDest {
        MQId mq_id_;
        int64_t mq_abs_addr_;
        WeakNode weak_node_;
        bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
    };
 
    static inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    static inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
    static inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
 
public:
    typedef std::set<TopicDest> Clients;
 
    NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
        id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
 
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
    int64_t OnNodeInit(ShmSocket &socket, const int64_t val);
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
 
    bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool RemotePublish(BHMsgHead &head, const std::string &body_content);
    bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
    bool OnCommand(ShmSocket &socket, const int64_t val);
 
    MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg);
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg);
 
    template <class Reply, class Func>
    Reply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        try {
            auto pos = nodes_.find(head.ssn_id());
            if (pos == nodes_.end()) {
                return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
            } else {
                auto &node = pos->second;
                if (!MatchAddr(node->addrs_, SrcAddr(head))) {
                    return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
                } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
                    return op(node);
                } else if (!Valid(*node)) {
                    return MakeReply<Reply>(eNoRespond, "Node is not alive.");
                } else {
                    return op(node);
                }
            }
        } catch (std::exception &e) {
            LOG_ERROR() << "handle msg exception: " << e.what();
            return MakeReply<Reply>(eError, "internal error.");
        }
    }
    template <class Func>
    inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        return HandleMsg<MsgCommonReply, Func>(head, op);
    }
    template <class Reply>
    bool CheckMsg(const BHMsgHead &head, Reply &reply)
    {
        bool r = false;
        auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
        reply = HandleMsg<Reply>(head, onOk);
        return r;
    }
 
    MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
    MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
    MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
    MsgQueryProcReply QueryProc(const std::string &proc_id);
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
    MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
 
    void OnTimer();
 
    // remote hosts records
    std::vector<std::string> FindRemoteSubClients(const Topic &topic);
 
private:
    void CheckNodes();
    bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
    void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
    void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
    Clients DoFindClients(const std::string &topic, bool from_remote);
    bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
    bool Valid(const WeakNode &weak)
    {
        auto node = weak.lock();
        return node && Valid(*node);
    }
    void RemoveNode(Node &node);
    Node GetNode(const MQId mq);
 
    std::string id_; // center proc id;
 
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> local_sub_map_;
    std::unordered_map<Topic, Clients> net_sub_map_;
    std::unordered_map<Address, Node> nodes_;
    std::unordered_map<ProcId, Address> online_node_addr_map_;
    ProcRecords procs_; // To get a short index for msg alloc.
    MsgRecords msgs_;   // record all msgs alloced.
 
    int64_t offline_time_;
    int64_t kill_time_;
    int64_t last_check_time_;
};
 
#endif // end of include guard: NODE_CENTER_KY67RJ1Q