/*
 * =====================================================================================
 *
 *       Filename:  center_topic_node.cpp
 *
 *    Description:  Center-side topic node: registers as "@center.node" and
 *                  answers process queries on the "#center_query_procs" topic.
 *
 *        Version:  1.0
 *        Created:  2021-05-20 12:44:31
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#include "center_topic_node.h"
|
#include "node_center.h"
|
#include "topic_node.h"
|
|
#include "json.h"
|
#include <chrono>
|
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
using namespace bhome_shm;
|
using namespace ssjson;
|
|
namespace
|
{
|
// Topic served by the center node for process queries.
// Stored as a real std::string object rather than a reference bound to a
// temporary, so it does not depend on lifetime extension.
const std::string kTopicQueryProc = "#center_query_procs";
|
|
std::string ToJson(const MsgQueryProcReply &qpr)
|
{
|
Json json;
|
json.put("procCount", qpr.proc_list_size());
|
auto &list = json.put("procList", Json::Array());
|
// Json list = Json::Array();
|
for (auto &info : qpr.proc_list()) {
|
Json proc;
|
proc.put("id", info.proc().proc_id());
|
proc.put("name", info.proc().name());
|
proc.put("publicInfo", info.proc().public_info());
|
proc.put("online", info.online());
|
auto AddTopics = [&](auto &name, auto &topic_list) {
|
Json topics = Json::Array();
|
for (auto &t : topic_list) {
|
topics.push_back(t);
|
}
|
proc.put(name, topics);
|
};
|
AddTopics("service", info.service().topic_list());
|
AddTopics("local_sub", info.local_sub().topic_list());
|
AddTopics("net_sub", info.net_sub().topic_list());
|
|
list.push_back(proc);
|
}
|
return json.dump(0);
|
}
|
|
} // namespace
|
|
// Stores the center handle and creates the underlying TopicNode on `shm`.
// The node starts in the "not running" state; nothing is registered or
// started until Start() is called.
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
    pscenter_(center),
    pnode_(new TopicNode(shm, 200)), // NOTE(review): the meaning of 200 is defined by TopicNode's constructor — confirm
    run_(false)
{
}
|
|
// Shuts the node down on destruction by delegating to Stop().
CenterTopicNode::~CenterTopicNode()
{
	Stop();
}
|
|
void CenterTopicNode::Stop()
|
{
|
bool cur = true;
|
if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
|
worker_.join();
|
pnode_->Stop();
|
}
|
}
|
|
bool CenterTopicNode::Start()
|
{
|
Stop();
|
|
int timeout = 3000;
|
MsgCommonReply reply;
|
|
ProcInfo info;
|
info.set_proc_id("@center.node");
|
info.set_name("center node");
|
Json jinfo;
|
jinfo.put("description", "some center services. Other nodes may use topics to use them.");
|
info.set_public_info(jinfo.dump());
|
if (!pnode_->DoRegister(true, info, reply, timeout)) {
|
throw std::runtime_error("center node register failed.");
|
}
|
|
MsgTopicList topics;
|
topics.add_topic_list(kTopicQueryProc);
|
if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
|
throw std::runtime_error("center node register topics failed.");
|
}
|
|
auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
|
auto reply = MakeReply<MsgRequestTopicReply>(eSuccess);
|
if (request.topic() == kTopicQueryProc) {
|
std::string id;
|
if (!request.data().empty()) {
|
Json json;
|
if (json.parse(request.data())) {
|
id = json.get("proc_id", "");
|
}
|
}
|
auto data = (*pscenter_)->QueryProc(id);
|
*reply.mutable_errmsg() = data.errmsg();
|
reply.set_data(ToJson(data));
|
} else {
|
SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic());
|
}
|
pnode_->ServerSendReply(src_info, reply);
|
};
|
|
bool cur = false;
|
if (run_.compare_exchange_strong(cur, true)) {
|
auto heartbeat = [this]() {
|
while (run_) {
|
pnode_->Heartbeat(1000);
|
std::this_thread::sleep_for(1s);
|
}
|
};
|
std::thread(heartbeat).swap(worker_);
|
return pnode_->ServerStart(onRequest);
|
} else {
|
return false;
|
}
|
}
|