/* * ===================================================================================== * * Filename: center_topic_node.cpp * * Description: * * Version: 1.0 * Created: 2021年05月20日 12时44分31秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #include "center_topic_node.h" #include "node_center.h" #include "topic_node.h" #include "json.h" #include using namespace std::chrono; using namespace std::chrono_literals; using namespace bhome_shm; using namespace ssjson; namespace { const std::string &kTopicQueryProc = "#center_query_procs"; std::string ToJson(const MsgQueryProcReply &qpr) { Json json; json.put("procCount", qpr.proc_list_size()); auto &list = json.put("procList", Json::Array()); // Json list = Json::Array(); for (auto &info : qpr.proc_list()) { Json proc; proc.put("id", info.proc().proc_id()); proc.put("name", info.proc().name()); proc.put("publicInfo", info.proc().public_info()); proc.put("online", info.online()); Json topics = Json::Array(); for (auto &t : info.topics().topic_list()) { topics.push_back(t); } proc.put("topics", topics); list.push_back(proc); } return json.dump(0); } } // namespace CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) : pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {} CenterTopicNode::~CenterTopicNode() { Stop(); } void CenterTopicNode::Stop() { bool cur = true; if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) { worker_.join(); pnode_->Stop(); } } bool CenterTopicNode::Start() { Stop(); int timeout = 3000; MsgCommonReply reply; ProcInfo info; info.set_proc_id("@center.node"); info.set_name("center node"); Json jinfo; jinfo.put("description", "some center services. Other nodes may use topics to use them."); info.set_public_info(jinfo.dump()); if (!pnode_->DoRegister(true, info, reply, timeout)) { throw std::runtime_error("center node register failed."); } MsgTopicList topics; topics.add_topic_list(kTopicQueryProc); if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) { throw std::runtime_error("center node register topics failed."); } auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) { auto reply = MakeReply(eSuccess); if (request.topic() == kTopicQueryProc) { std::string id; if (!request.data().empty()) { Json json; if (json.parse(request.data())) { id = json.get("proc_id", ""); } } auto data = (*pscenter_)->QueryProc(id); *reply.mutable_errmsg() = data.errmsg(); reply.set_data(ToJson(data)); } else { SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic()); } pnode_->ServerSendReply(src_info, reply); }; bool cur = false; if (run_.compare_exchange_strong(cur, true)) { auto heartbeat = [this]() { while (run_) { pnode_->Heartbeat(1000); std::this_thread::sleep_for(1s); } }; std::thread(heartbeat).swap(worker_); return pnode_->ServerStart(onRequest); } else { return false; } }