lichao
2021-06-02 993c556000a414011626770540678948f16eaa9e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
 * =====================================================================================
 *
 *       Filename:  center_topic_node.cpp
 *
 *    Description:  
 *
 *        Version:  1.0
 *        Created:  2021年05月20日 12时44分31秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:  
 *
 * =====================================================================================
 */
#include "center_topic_node.h"
#include "node_center.h"
#include "topic_node.h"
 
#include "json.h"
#include <chrono>
 
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace ssjson;
 
namespace
{
const std::string &kTopicQueryProc = "#center_query_procs";
 
std::string ToJson(const MsgQueryProcReply &qpr)
{
    Json json;
    json.put("procCount", qpr.proc_list_size());
    auto &list = json.put("procList", Json::Array());
    // Json list = Json::Array();
    for (auto &info : qpr.proc_list()) {
        Json proc;
        proc.put("id", info.proc().proc_id());
        proc.put("name", info.proc().name());
        proc.put("publicInfo", info.proc().public_info());
        proc.put("online", info.online());
        Json topics = Json::Array();
        for (auto &t : info.topics().topic_list()) {
            topics.push_back(t);
        }
        proc.put("topics", topics);
        list.push_back(proc);
    }
    return json.dump(0);
}
 
} // namespace
 
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
    pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {}
 
CenterTopicNode::~CenterTopicNode() { Stop(); }
 
void CenterTopicNode::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
        worker_.join();
        pnode_->Stop();
    }
}
 
bool CenterTopicNode::Start()
{
    Stop();
 
    int timeout = 3000;
    MsgCommonReply reply;
 
    ProcInfo info;
    info.set_proc_id("@center.node");
    info.set_name("center node");
    Json jinfo;
    jinfo.put("description", "some center services. Other nodes may use topics to use them.");
    info.set_public_info(jinfo.dump());
    if (!pnode_->DoRegister(true, info, reply, timeout)) {
        throw std::runtime_error("center node register failed.");
    }
 
    MsgTopicList topics;
    topics.add_topic_list(kTopicQueryProc);
    if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
        throw std::runtime_error("center node register topics failed.");
    }
 
    auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
        auto reply = MakeReply<MsgRequestTopicReply>(eSuccess);
        if (request.topic() == kTopicQueryProc) {
            std::string id;
            if (!request.data().empty()) {
                Json json;
                if (json.parse(request.data())) {
                    id = json.get("proc_id", "");
                }
            }
            auto data = (*pscenter_)->QueryProc(id);
            *reply.mutable_errmsg() = data.errmsg();
            reply.set_data(ToJson(data));
        } else {
            SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic());
        }
        pnode_->ServerSendReply(src_info, reply);
    };
 
    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        auto heartbeat = [this]() {
            while (run_) {
                pnode_->Heartbeat(1000);
                std::this_thread::sleep_for(1s);
            }
        };
        std::thread(heartbeat).swap(worker_);
        return pnode_->ServerStart(onRequest);
    } else {
        return false;
    }
}