/*
 * =====================================================================================
 *
 *       Filename:  center_topic_node.cpp
 *
 *    Description:  Topic node owned by the center. It registers itself, serves the
 *                  process-query topic and answers with a JSON process list.
 *
 *        Version:  1.0
 *        Created:  2021-05-20 12:44:31
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "center_topic_node.h"
#include "node_center.h"
#include "topic_node.h"
#include "json.h"
#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>

using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace ssjson;

namespace
{
// Topic on which this node answers process queries.
// Held as an object rather than a reference bound to a temporary.
const std::string kTopicQueryProc = "@center_query_procs";

/// Serializes a process-query reply into a JSON string.
///
/// The document contains "procCount" (the size of the reply's process list)
/// and "procList", an array with one object per process holding "id", "name",
/// "publicInfo", "online" and a "topics" array.
///
/// @param qpr  reply whose process list is serialized.
/// @return     the result of Json::dump(0) on the assembled document.
std::string ToJson(const MsgQueryProcReply &qpr)
{
    Json json;
    json.put("procCount", qpr.proc_list_size());
    // put() hands back a reference to the stored array, so entries are
    // appended directly into the document.
    auto &list = json.put("procList", Json::Array());
    for (auto &info : qpr.proc_list()) {
        Json proc;
        proc.put("id", info.proc().proc_id());
        proc.put("name", info.proc().name());
        proc.put("publicInfo", info.proc().public_info());
        proc.put("online", info.online());

        Json topics = Json::Array();
        for (auto &t : info.topics().topic_list()) {
            topics.push_back(t);
        }
        proc.put("topics", topics);

        list.push_back(proc);
    }
    return json.dump(0);
}

} // namespace

/// Stores the center handle and creates the underlying TopicNode on `shm`.
/// The node is created in the stopped state; call Start() to run it.
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
    pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}

/// Stops the node (see Stop()) before the members are destroyed.
CenterTopicNode::~CenterTopicNode() { Stop(); }

/// Clears the running flag and, if the heartbeat thread is joinable, joins it
/// and then stops the underlying node. Does nothing when the node is not
/// marked as running.
void CenterTopicNode::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
        // The heartbeat loop polls run_, so it exits after its current iteration.
        worker_.join();
        pnode_->Stop();
    }
}

/// (Re)starts the node.
///
/// Stops any previous run, registers the node as "#center.node", registers
/// the process-query topic for RPC, launches the heartbeat thread and starts
/// the request server.
///
/// @return the result of TopicNode::ServerStart(), or false if the running
///         flag was already set when the node tried to start.
/// @throws std::runtime_error if registering the node or its topics fails.
bool CenterTopicNode::Start()
{
    Stop();

    const int timeout = 3000; // presumably milliseconds -- TODO confirm against TopicNode
    MsgCommonReply reply;

    ProcInfo info;
    info.set_proc_id("#center.node");
    info.set_name("center node");
    if (!pnode_->Register(info, reply, timeout)) {
        throw std::runtime_error("center node register failed.");
    }

    MsgTopicList topics;
    topics.add_topic_list(kTopicQueryProc);
    if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
        throw std::runtime_error("center node register topics failed.");
    }

    // Request handler: answers process queries, rejects every other topic.
    auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
        // NOTE(review): the reply must expose mutable_errmsg()/set_data();
        // confirm MakeReply's return type (an explicit template argument may
        // be missing here).
        auto reply = MakeReply(eSuccess);
        if (request.topic() == kTopicQueryProc) {
            auto data = (*pscenter_)->QueryProc(request.data());
            *reply.mutable_errmsg() = data.errmsg();
            reply.set_data(ToJson(data));
        } else {
            // Separator added so the topic name is readable in the message.
            SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic: " + request.topic());
        }
        pnode_->ServerSendReply(src_info, reply);
    };

    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        // Heartbeat loop: runs until Stop() clears run_.
        auto heartbeat = [this]() {
            while (run_) {
                pnode_->Heartbeat(1000);
                std::this_thread::sleep_for(1s);
            }
        };
        std::thread(heartbeat).swap(worker_);
        return pnode_->ServerStart(onRequest);
    } else {
        return false;
    }
}