lichao
2021-04-02 dc12826dd61ce18fac3a9561c5843d30a0cf9660
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.cpp
 *
 *    Description:  topic request/reply center
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 14时08分50秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
#include "reqrep_center.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
 
using namespace bhome_shm;
 
namespace
{
auto Now = []() { time_t t; return time(&t); };
 
class NodeCenter
{
public:
    typedef std::string ProcAddr;
    typedef bhome::msg::ProcInfo ProcInfo;
 
    template <class Iter>
    bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
    {
        try {
            Node node(new NodeInfo);
            node->addr_ = src_mq;
            node->proc_.Swap(&info);
            node->state_.timestamp_ = Now();
            nodes_[node->proc_.id()] = node;
            for (auto it = topics_begin; it != topics_end; ++it) {
                topic_map_[*it] = node;
            }
            return true;
        } catch (...) {
            return false;
        }
    }
    void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
    {
        auto pos = nodes_.find(info.name());
        if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
            NodeInfo &ni = *pos->second;
            ni.state_.timestamp_ = Now();
            if (!info.public_info().empty()) {
                ni.proc_.set_public_info(info.public_info());
            }
            if (!info.private_info().empty()) {
                ni.proc_.set_private_info(info.private_info());
            }
        }
    }
    bool QueryTopic(const std::string &topic, ProcAddr &addr)
    {
        auto pos = topic_map_.find(topic);
        if (pos != topic_map_.end()) {
            Node node(pos->second.lock());
            if (node) {
                addr = node->addr_;
                return true;
            } else { // dead, remove record.
                topic_map_.erase(pos);
                return false;
            }
        } else {
            return false;
        }
    }
 
private:
    struct ProcState {
        time_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
    };
    typedef std::string ProcId;
    struct NodeInfo {
        ProcState state_; // state
        ProcAddr addr_;   // registered_mqid.
        ProcInfo proc_;   //
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    std::unordered_map<std::string, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
 
bool ReqRepCenter::Start(const int nworker)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    auto onRecv = [center_ptr, this](BHMsg &msg) {
        auto &center = *center_ptr;
 
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
        if (msg.route_size() == 0) {
            return;
        }
        auto &src_mq = msg.route(0).mq_id();
 
        auto OnRegister = [&]() {
            DataProcRegister reg;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
            }
        };
 
        auto OnHeartbeat = [&]() {
            DataProcHeartbeat hb;
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), src_mq);
            }
        };
 
        auto OnQueryTopic = [&]() {
            DataProcQueryTopic query;
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                MQId remote;
                memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
                MsgI imsg;
                if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
                    imsg.Release(shm());
                }
            }
        };
 
        switch (msg.type()) {
        case kMsgTypeProcRegisterTopics: OnRegister(); break;
        case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
        case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
        default: break;
        }
    };
 
    const int kMaxWorker = 16;
    return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}