lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.cpp
 *
 *    Description:  topic request/reply center
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 14时08分50秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
#include "reqrep_center.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
 
using namespace bhome_shm;
 
namespace
{
auto Now = []() { time_t t; return time(&t); };
 
class NodeCenter
{
public:
    typedef std::string ProcAddr;
    typedef bhome::msg::ProcInfo ProcInfo;
 
    template <class Iter>
    bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
    {
        try {
            Node node(new NodeInfo);
            node->addr_ = src_mq;
            node->proc_.Swap(&info);
            node->state_.timestamp_ = Now();
            nodes_[node->proc_.id()] = node;
            for (auto it = topics_begin; it != topics_end; ++it) {
                topic_map_[*it] = node;
            }
            return true;
        } catch (...) {
            return false;
        }
    }
    void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
    {
        auto pos = nodes_.find(info.name());
        if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
            NodeInfo &ni = *pos->second;
            ni.state_.timestamp_ = Now();
            if (!info.public_info().empty()) {
                ni.proc_.set_public_info(info.public_info());
            }
            if (!info.private_info().empty()) {
                ni.proc_.set_private_info(info.private_info());
            }
        }
    }
    bool QueryTopic(const Topic &topic, ProcAddr &addr)
    {
        auto pos = topic_map_.find(topic);
        if (pos != topic_map_.end()) {
            Node node(pos->second.lock());
            if (node) {
                addr = node->addr_;
                return true;
            } else { // dead, remove record.
                topic_map_.erase(pos);
                return false;
            }
        } else {
            return false;
        }
    }
 
private:
    struct ProcState {
        time_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
    };
    typedef std::string ProcId;
    struct NodeInfo {
        ProcState state_; // state
        ProcAddr addr_;   // registered_mqid.
        ProcInfo proc_;   //
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    std::unordered_map<Topic, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
 
} // namespace
 
BHCenter::MsgHandler MakeReqRepCenter()
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
        auto &center = *center_ptr;
        auto &shm = socket.shm();
 
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("center queue size: %ld\n", socket.Pending());
        }
#endif
        auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
 
        auto OnRegister = [&]() {
            if (msg.route_size() != 1) { return; }
 
            MsgRegister reg;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
            }
        };
 
        auto OnHeartbeat = [&]() {
            if (msg.route_size() != 1) { return; }
            auto &src_mq = msg.route(0).mq_id();
 
            MsgHeartbeat hb;
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), SrcMQ());
            }
        };
 
        auto OnQueryTopic = [&]() {
            if (msg.route_size() != 1) { return; }
 
            MsgQueryTopic query;
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                MQId remote;
                memcpy(&remote, SrcMQ().data(), sizeof(MQId));
                MsgI imsg;
                if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
                    imsg.Release(shm);
                }
            }
        };
 
        switch (msg.type()) {
        case kMsgTypeRegister: OnRegister(); return true;
        case kMsgTypeHeartbeat: OnHeartbeat(); return true;
        case kMsgTypeQueryTopic: OnQueryTopic(); return true;
        default: return false;
        }
    };
}
 
bool ReqRepCenter::Start(const int nworker)
{
    auto handler = MakeReqRepCenter();
    printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
 
    const int kMaxWorker = 16;
    return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}