From 0e31f38fc37216e1376d8101d1bcf7a3779279dc Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 20 五月 2021 15:29:16 +0800 Subject: [PATCH] add center topic node. --- utest/api_test.cpp | 61 ++ box/center_topic_node.cpp | 117 ++++++ box/json.h | 277 ++++++++++++++++ box/center.cpp | 7 box/node_center.h | 1 box/center.h | 4 box/node_center.cpp | 68 ++- box/center_topic_node.h | 46 ++ box/json.cpp | 380 ++++++++++++++++++++++ 9 files changed, 915 insertions(+), 46 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0f36719..e745be8 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -16,6 +16,7 @@ * ===================================================================================== */ #include "center.h" +#include "center_topic_node.h" #include "node_center.h" #include <chrono> @@ -185,7 +186,10 @@ auto &info = kv.second; sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); } + + topic_node_.reset(new CenterTopicNode(center_ptr, shm)); } +BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() { @@ -193,12 +197,13 @@ auto &info = kv.second; sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } - + topic_node_->Start(); return true; } bool BHCenter::Stop() { + topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); } diff --git a/box/center.h b/box/center.h index ad0ac4f..6610277 100644 --- a/box/center.h +++ b/box/center.h @@ -22,6 +22,7 @@ #include <functional> #include <map> #include <memory> +class CenterTopicNode; class BHCenter { @@ -34,7 +35,7 @@ static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len); BHCenter(Socket::Shm &shm); - ~BHCenter() { Stop(); } + ~BHCenter(); bool Start(); bool Stop(); @@ -51,6 +52,7 @@ static CenterRecords &Centers(); std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; + std::unique_ptr<CenterTopicNode> topic_node_; }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp new file mode 100644 index 0000000..82b38ca --- /dev/null +++ b/box/center_topic_node.cpp @@ -0,0 +1,117 @@ +/* + * ===================================================================================== + * + * Filename: center_topic_node.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�20鏃� 12鏃�44鍒�31绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "center_topic_node.h" +#include "node_center.h" +#include "topic_node.h" + +#include "json.h" +#include <chrono> + +using namespace std::chrono; +using namespace std::chrono_literals; +using namespace bhome_shm; +using namespace ssjson; + +namespace +{ +const std::string &kTopicQueryProc = "@center_query_procs"; + +std::string ToJson(const MsgQueryProcReply &qpr) +{ + Json json; + json.put("procCount", qpr.proc_list_size()); + auto &list = json.put("procList", Json::Array()); + // Json list = Json::Array(); + for (auto &info : qpr.proc_list()) { + Json proc; + proc.put("id", info.proc().proc_id()); + proc.put("name", info.proc().name()); + proc.put("publicInfo", info.proc().public_info()); + proc.put("online", info.online()); + Json topics = Json::Array(); + for (auto &t : info.topics().topic_list()) { + topics.push_back(t); + } + proc.put("topics", topics); + list.push_back(proc); + } + return json.dump(0); +} + +} // namespace + +CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) : + pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {} + +CenterTopicNode::~CenterTopicNode() { Stop(); } + +void CenterTopicNode::Stop() +{ + bool cur = true; + if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) { + worker_.join(); + pnode_->Stop(); + } +} + +bool CenterTopicNode::Start() +{ + Stop(); + + int timeout = 3000; + MsgCommonReply reply; + + ProcInfo info; + info.set_proc_id("#center.node"); + info.set_name("center node"); + if (!pnode_->Register(info, reply, timeout)) { + throw std::runtime_error("center node register failed."); + } + + MsgTopicList topics; + topics.add_topic_list(kTopicQueryProc); + if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) { + throw std::runtime_error("center node register topics failed."); + } + + auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) { + auto reply = MakeReply<MsgRequestTopicReply>(eSuccess); + if (request.topic() == kTopicQueryProc) { + auto data = (*pscenter_)->QueryProc(request.data()); + *reply.mutable_errmsg() = data.errmsg(); + reply.set_data(ToJson(data)); + } else { + SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic()); + } + pnode_->ServerSendReply(src_info, reply); + }; + + bool cur = false; + if (run_.compare_exchange_strong(cur, true)) { + auto heartbeat = [this]() { + while (run_) { + pnode_->Heartbeat(1000); + std::this_thread::sleep_for(1s); + } + }; + std::thread(heartbeat).swap(worker_); + return pnode_->ServerStart(onRequest); + } else { + return false; + } +} diff --git a/box/center_topic_node.h b/box/center_topic_node.h new file mode 100644 index 0000000..a29860e --- /dev/null +++ b/box/center_topic_node.h @@ -0,0 +1,46 @@ +/* + * ===================================================================================== + * + * Filename: center_topic_node.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�05鏈�20鏃� 12鏃�44鍒�42绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef CENTER_TOPIC_NODE_YCI0P9KC +#define CENTER_TOPIC_NODE_YCI0P9KC + +#include "bh_util.h" +#include "shm.h" +#include <atomic> +#include <memory> +#include <thread> + +class TopicNode; +class NodeCenter; +// center as a topic node. +class CenterTopicNode +{ +public: + typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; + CenterTopicNode(CenterPtr center, bhome_shm::SharedMemory &shm); + ~CenterTopicNode(); + bool Start(); + void Stop(); + +private: + CenterPtr pscenter_; + std::unique_ptr<TopicNode> pnode_; + std::thread worker_; + std::atomic<bool> run_; +}; + +#endif // end of include guard: CENTER_TOPIC_NODE_YCI0P9KC diff --git a/box/json.cpp b/box/json.cpp new file mode 100755 index 0000000..0ad2e49 --- /dev/null +++ b/box/json.cpp @@ -0,0 +1,380 @@ +#include "json.h" + +namespace ssjson { + +namespace _impl { + + typedef std::string Buffer; + + template <class Buf, class ...T> + inline void DumpNumber(Buf &res, const char *fmt, T&&...val) { + char buf[64] = {0}; + int n = snprintf(buf, sizeof(buf)-1, fmt, val...); + res.append(buf, n); + } + inline void DumpFloat(Buffer &res, const long double fval) { DumpNumber(res, "%.*Lg", std::numeric_limits<long double>::digits10, fval); } + inline void DumpInt(Buffer &res, const long long ival) { DumpNumber(res, "%lld", ival); } +// inline void DumpFloat(Buffer &res, const double fval) { DumpNumber(res, "%.*g", std::numeric_limits<double>::digits10, fval); } +// inline void DumpInt(Buffer &res, const long ival) { DumpNumber(res, "%ld", ival); } + + // assume utf8 string. + void DumpString(Buffer &res, const std::string &s, const bool escape_unicode) { + res += '\"'; + + auto EscapeSpecialChar = [&](const int c) { // ( \b, \f, \t, \n, \r, \", \\, / ). + const char esc_table[] = { + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 'b', 't', + 'n', 0, 'f','r', 0 , 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 ,'\"', 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, '/', 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 , + 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 , + 0 , 0,'\\', }; + res += '\\'; + res += esc_table[c]; + }; + auto AddUnicode = [&](unsigned u) { + auto HexChar = [](unsigned char uc) { return "0123456789abcdef"[uc & 0xF]; }; + char buf[6] = { '\\', 'u', HexChar(u>>12), HexChar(u>>8), HexChar(u>>4), HexChar(u) }; + res.append(buf, buf+6); + }; + auto IsControl = [](const unsigned char c) { return c < 0x20 || c == 0x7F; }; + + if (escape_unicode) { + for (unsigned i = 0; i < s.size(); ++i) { + switch(s[i]) { + case '\b': case '\f': case '\t': case '\n': case '\r': + case '\"': case '\\': case '/' : + EscapeSpecialChar(s[i]); + break; + default : + { + const unsigned char &uc = (unsigned char &)s[i]; + const unsigned char *sz = &uc; + if (IsControl(uc)) { // control characters + AddUnicode(uc); + } else if (uc < 0x80) { + res += s[i]; + } else if (uc < 0xE0) { // 2 bytes + AddUnicode(((sz[0]&0x1F)<<6) | (sz[1] & 0x3F)); + i += 1; + } else if (uc < 0xF0) { // 3 bytes + AddUnicode(((sz[0]&0x0F)<<12) | ((sz[1]&0x3F)<<6) | (sz[2] & 0x3F)); + i += 2; + } else { + res += s[i]; // exceed 0xFFFF. + } + } break; + } + } + } else { + for (unsigned i = 0; i < s.size(); ++i) { + switch(s[i]) { + case '\b': case '\f': case '\t': case '\n': case '\r': + case '\"': case '\\': case '/' : + EscapeSpecialChar(s[i]); + break; + default : + if (IsControl(s[i])) { // control characters + AddUnicode(s[i]); + } else { + res += s[i]; + } + break; + } + } + } + res += '\"'; + }; + + void Dump(const JValue &jv, Buffer &res, JValue::DumpOptions &opt, const bool pretty) + { + auto NewLine = [&]() { + res += '\n'; + res.append(opt.indent_level_ * opt.indent_step_, ' '); + }; + + switch (jv.type()) { + case JValue::jv_object: + { + res += '{'; + auto &obj = jv.object(); + if (!obj.empty()) { + if (pretty) { + opt.indent_level_++; + for (auto &&kv : obj) { + NewLine(); + DumpString(res, kv.first, opt.escape_unicode_); + res.append(": ", 2); + Dump(kv.second, res, opt, pretty); + res += ','; + } + res.pop_back(); // remove last ','; + opt.indent_level_--; + NewLine(); + } else { + for (auto &&kv : obj) { + DumpString(res, kv.first, opt.escape_unicode_); + res += ':'; + Dump(kv.second, res, opt, pretty); + res += ','; + } + res.pop_back(); // remove last ','; + } + } + res += '}'; + } break; + case JValue::jv_array: + { + res += '['; + auto &arr = jv.array(); + if (!arr.empty()) { + if (pretty) { + opt.indent_level_++; + for (auto &&v : arr) { + NewLine(); + Dump(v, res, opt, pretty); + res += ','; + } + res.pop_back(); // remove last ','; + opt.indent_level_--; + NewLine(); + } else { + for (auto &&v : arr) { + Dump(v, res, opt, pretty); + res += ','; + } + res.pop_back(); // remove last ','; + } + } + res += ']'; + } break; + case JValue::jv_float: + { + auto idx = res.size(); + DumpFloat(res, jv.get_value<JValue::float_type>()); + while (idx < res.size() && res[idx] != '.') { + ++idx; + } + if (idx == res.size()) { + res.append(".0", 2); + } + } break; + case JValue::jv_null: res.append("null", 4); break; + case JValue::jv_int: DumpInt(res, jv.get_value<JValue::int_type>()); break; + case JValue::jv_bool: + if (jv.get_value<bool>()) { + res.append("true", 4); + } else { + res.append("false", 5); + } + break; + case JValue::jv_string: DumpString(res, jv.get_value<std::string>(), opt.escape_unicode_); break; + default: std::invalid_argument("invalid json type:" + std::to_string(jv.type())); break; + } + } + + auto HexCharVal = [](const unsigned char c) { + static const unsigned char hex_char_table[] = { + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 10, 11, 12, 13, 14, 15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 10, 11, 12, 13, 14, 15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + }; + return hex_char_table[c]; + }; +} // namespace _impl + + int JValue::parse(const char *begin, const char *end) + { + using namespace _impl; + const char *p = begin; + + auto Error = [&](const std::string &msg = "invalid value at"){ throw std::invalid_argument(msg + " : "+ std::string(p, std::min(int(end-p), 30))); }; + auto IsSpace = [](const char c) { + return c == ' ' || c == '\t' || c == '\r' || c == '\n'; + }; + auto IgnoreSpaces = [&]() { while (p != end && IsSpace(*p)) { ++p; } }; + auto Peek = [&]() { IgnoreSpaces(); if (p == end) { Error("input ends unexpectedly!"); } return *p; }; + auto Expect = [&](const char c) { if (Peek() != c) { Error(std::string("parse error, expecting ") + c); } }; + + auto parse_string = [&]() { + std::string res; + auto to_utf8 = [&]() { + auto Hex4 = [](const char *sz) { + auto FromHex = [](const unsigned char c)->unsigned char { + const unsigned char v = HexCharVal(c); + if (v == 0xFF) { + throw std::invalid_argument(std::string("invalid unicode input ") + char(c)); + } + return v; + }; + return (FromHex(sz[0]) << 12) | (FromHex(sz[1]) << 8) | (FromHex(sz[2]) << 4) | (FromHex(sz[3])); + }; + const unsigned int u = Hex4(p+2); + p += 4; + if (u < 0x80) { + res += char(u); + } else if (u < 0x800) { + res += char((u >> 6) | 0xC0); + res += char((u & 0x3F) | 0x80); + } else if (u < 0x10000) { + res += char((u >> 12) | 0xE0); + res += char(((u >> 6)& 0x3F) | 0x80); + res += char((u & 0x3F) | 0x80); + } else if (u < 0x110000) { + res += char((u >> 18) | 0xF0); + res += char(((u >> 12)& 0x3F) | 0x80); + res += char(((u >> 6)& 0x3F) | 0x80); + res += char((u & 0x3F) | 0x80); + } else { + Error("invalid unicode codepoint"); + } + }; + auto ParseEscapedChar = [&](const int c) { + const char unesc_table[] = { + 0 , '\b', 0 , 0 , 0 , '\f', 0 , + 0 , 0 , 0 , 0 , 0 , 0 , '\n', + 0 , 0 , 0 , '\r', 0 , '\t', }; + res += unesc_table[c-'a']; + }; + unsigned left = 0; + auto AddLeft = [&] { if (left != 0) { res.append(p-left, left); left = 0; } }; + while (++p != end) { + switch(*p) { + case '\"': + AddLeft(); + ++p; + return res; + case '\\': + AddLeft(); + if (p+1 != end) { + switch (p[1]) { + case 'b' : case 'f' : case 'n' : case 'r' : case 't' : ParseEscapedChar(p[1]); break; + case 'u' : to_utf8(); break; + case '\"': case '\\': case '/' : ++left; break; + default: left += 2; break; + } + ++p; // parsed 2 char. + } // else, next round will fail. + break; + // and escaped char should not appear. + case '\t': case '\n': case '\r': case '\b': case '\f': //case '/' : + AddLeft(); + Error("invalid string content") ; + break; + default: ++left; break; + } + } + Error("string ends unexpectedly!"); + return res; + }; + + auto parse_object = [&]() { + assert(*p == '{'); + ++p; + if (Peek() == '}') { + ++p; + return object_type(); + } + object_type obj; + while (true) { + Expect('\"'); + std::string key(parse_string()); + + Expect(':'); + ++p; + + JValue v; + p += v.parse(p, end); + obj.emplace(std::move(key), std::move(v)); + + switch (Peek()) { + case ',': ++p; break; + case '}': ++p; return obj; + default: Error("parse object error"); + } + } + }; + auto parse_array = [&]() { + assert(*p == '['); + ++p; + if (Peek() == ']') { + ++p; + return array_type(); + } + array_type arr; + while (true) { + JValue v; + p += v.parse(p, end); + arr.emplace_back(std::move(v)); + + switch (Peek()) { + case ',': ++p; break; + case ']': ++p; return arr; + default: Error("parse array error"); + } + } + }; + + auto ExpectText = [&](const char *sz, const size_t len) { + if(memcmp(p+1, sz+1, len-1) == 0) { + p += len; + } else { + Error("parse literal error, expecting '" + std::string(sz, len) + "', read " + std::string(p, len).c_str()); + } + }; + + switch (Peek()) { + case '{': *this = parse_object(); break; + case '[': *this = parse_array(); break; + case '\"': *this = parse_string(); break; + case 't': ExpectText("true", 4); *this = true; break; + case 'f': ExpectText("false", 5); *this = false; break; + case 'n': ExpectText("null", 4); *this = nullptr; break; + case '0': case '1': case '2': case '3': case '4': + case '5': case '6': case '7': case '8': case '9': case '-': + { + auto IsNumber = [](const char c) { return '0' <= c && c <= '9'; }; + auto sep = p+1; + while (sep != end && IsNumber(*sep)) { ++sep; } + char *parse_end = (char*)p; + switch (*sep) { + case '.' : case 'e' : case 'E': *this = strtold((char*)p, &parse_end); break; + default: *this = strtoll((char*)p, &parse_end, 10); break; + } + p = parse_end; + } break; + default : Error(); break; + } + return p - begin; + } + + std::string JValue::dump(const DumpOptions &opt_in) const { + using namespace _impl; + Buffer res; + DumpOptions opt(opt_in); + if(opt.indent_step_ > 0) { + res.append(opt.indent_level_ * opt.indent_step_, ' '); + } + Dump(*this, res, opt, opt.indent_step_ > 0); + return res; + } +} // jsonpp + diff --git a/box/json.h b/box/json.h new file mode 100755 index 0000000..461f0e9 --- /dev/null +++ b/box/json.h @@ -0,0 +1,277 @@ +#ifndef JSON_B360EKF1 +#define JSON_B360EKF1 + +#include <algorithm> +#include <boost/variant.hpp> +#include <map> +#include <string> +#include <vector> + +namespace ssjson +{ + +class JValue +{ +public: + typedef std::string key_type; + typedef long long int_type; + typedef long double float_type; + typedef JValue value_type; + typedef std::map<key_type, value_type> object_type; + typedef object_type::value_type pair_type; + typedef std::vector<value_type> array_type; + + enum JVType { jv_null, + jv_int, + jv_float, + jv_bool, + jv_string, + jv_object, + jv_array }; + std::string name() const + { + static const char *names[] = {"jv_null", "jv_int", "jv_float", "jv_bool", "jv_string", "jv_object", "jv_array"}; + return names[type()]; + } + +private: + typedef boost::variant<std::nullptr_t, int_type, float_type, bool, std::string, object_type, array_type> data_type; + + template <class C> + const C &val() const { return ref_val<C>(); } + template <class C> + C &val() { return const_cast<C &>(ref_val<C>()); } + + template <class C> + class TInitValue + { + typedef C value_t; + typedef std::vector<TInitValue> list_t; + mutable boost::variant<value_t, list_t> data_; + + public: + template <class T> + TInitValue(const T &t) : + data_(value_t(t)) {} + template <class T> + TInitValue(T &&t) : + data_(value_t(std::move(t))) {} + TInitValue(std::initializer_list<TInitValue> l) : + data_(list_t(l)){}; + bool is_value() const { return data_.which() == 0; } + bool is_list() const { return data_.which() == 1; } + const list_t &list() const { return boost::get<list_t>(data_); } + const value_t &value() const { return boost::get<value_t>(data_); } + }; + typedef TInitValue<JValue> InitValue; + typedef std::initializer_list<InitValue> InitList; + + template <class Iter> + static object_type MakeObject(Iter begin, Iter end) + { + object_type obj; + for (auto it = begin; it != end; ++it) { + obj.emplace(std::move(it->list()[0].value().template val<key_type>()), (it->list()[1])); + } + return obj; + } + template <class Iter> + static array_type MakeArray(Iter begin, Iter end) { return array_type(begin, end); } + template <class Iter> + static JValue FromInit(Iter begin, Iter end) + { + auto like_object_item = [](const InitValue &v) { + return v.is_list() && + v.list().size() == 2 && + v.list()[0].is_value() && + v.list()[0].value().type() == jv_string; + }; + if (std::all_of(begin, end, like_object_item)) { + return MakeObject(begin, end); + } else { + return MakeArray(begin, end); + } + } + static JValue FromInit(const InitValue &ji) + { + if (ji.is_value()) { + return std::move(ji.value()); + } else { + return FromInit(ji.list().begin(), ji.list().end()); + } + } + +public: + struct DumpOptions { + int indent_step_ = 0; + int indent_level_ = 0; + bool escape_unicode_ = false; + }; + JValue() : + data_(object_type()) { static_assert(sizeof(JValue) == sizeof(data_type), "JValue should simple wrap data_type with no other fields."); } +#define SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \ + JValue(decl) : data_(expr) {} \ + JValue &operator=(decl) \ + { \ + data_ = (expr); \ + return *this; \ + } + // operator=() seems faster than JValue(exer).swap(data_). + // copy + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(const JValue &v, v.data_) // self; +#define MAP_TYPE_TO_JSON_TYPE(decl, expr) \ +public: \ + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \ +private: \ + static auto JVTypeAccept(decl)->std::remove_reference<decltype(expr)>::type +#define MAP_TYPE_TO_JSON_TYPE_BIDIR(decl, expr) \ + MAP_TYPE_TO_JSON_TYPE(decl, expr); \ + \ +private: \ + static auto JVTypeReturn(decl)->std::remove_reference<decltype(expr)>::type + MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::nullptr_t, nullptr); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const short v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned short v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const int v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned int v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const long v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const long long v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long long v, int_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const float v, float_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const double v, float_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const long double v, float_type(v)); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const bool v, v); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::string &v, v); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const object_type &v, v); + MAP_TYPE_TO_JSON_TYPE_BIDIR(const array_type &v, v); + MAP_TYPE_TO_JSON_TYPE(const char *v, std::string(v)); +#undef MAP_TYPE_TO_JSON_TYPE_BIDIR +#undef MAP_TYPE_TO_JSON_TYPE + template <class T> + static void JVTypeAccept(T); // use void to disable other types. + template <class T> + static void JVTypeReturn(T); // use void to disable other types. +public: + // move + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(JValue &&v, std::move(v.data_)) // self + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(std::string &&v, std::move(v)) // and other classes + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(object_type &&v, std::move(v)) + SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(array_type &&v, std::move(v)) +#undef SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT + + static JValue Object() + { + return (object_type()); + } + static JValue Object(InitList l) { return (MakeObject(l.begin(), l.end())); } + static JValue Array() { return (array_type()); } + static JValue Array(InitList l) { return (MakeArray(l.begin(), l.end())); } + JValue(InitList l) : + JValue(FromInit(l.begin(), l.end())) {} + JValue(const InitValue &ji) : + JValue(FromInit(ji)) {} + JVType type() const { return static_cast<JVType>(data_.which()); } + + std::string dump(const DumpOptions &opt_in) const; + std::string dump(const int indent_step = 0, const int indent_level = 0, const bool escape_unicode = false) const + { + DumpOptions opt; + opt.indent_step_ = indent_step; + opt.indent_level_ = indent_level; + opt.escape_unicode_ = escape_unicode; + return dump(opt); + } + bool parse(const std::string &s) { return parse(s.c_str(), s.c_str() + s.size()) > 0; } + int parse(const char *begin, const char *end); + void swap(JValue &a) { data_.swap(a.data_); } + + template <class T> + T get_value() const { return val<decltype(JVTypeReturn(T()))>(); } + template <class T> + void put_value(T &&v) { *this = v; } + const object_type &object() const { return val<object_type>(); } + object_type &object() { return val<object_type>(); } + const array_type &array() const { return val<array_type>(); } + array_type &array() { return val<array_type>(); } + bool is_object() const { return type() == jv_object; } + bool is_array() const { return type() == jv_array; } + + // array ops + const value_type &operator[](const size_t idx) const { return array()[idx]; } + value_type &operator[](const size_t idx) { return array()[idx]; } + const value_type &at(const size_t idx) const { return array().at(idx); } + value_type &at(const size_t idx) { return array().at(idx); } + void push_back(const value_type &v) { array().push_back(v); } + void push_back(value_type &&v) { array().push_back(std::move(v)); } + + // object ops + value_type &operator[](const key_type &key) { return object()[key]; } + const value_type &at(const key_type &key) const { return object().at(key); } + value_type &at(const key_type &key) { return object().at(key); } + const value_type &child(const key_type &path) const { return ref_child(path); } + value_type &child(const key_type &path) { return const_cast<value_type &>(ref_child(path)); } + template <class T> + T get(const key_type &path) const { return child(path).val<decltype(JVTypeReturn(T()))>(); } + template <class T> + auto get(const key_type &path, const T &def) const -> typename std::remove_reference<decltype(JVTypeAccept(def))>::type + { + try { + return get<decltype(JVTypeAccept(def))>(path); + } catch (...) { + return def; + } + } + value_type &put(pair_type &&elem) + { + const key_type &key = elem.first; + auto sep = key.find('.'); + if (sep == key_type::npos) { + auto r = object().lower_bound(elem.first); + if (r != object().end() && r->first == elem.first) { + r->second = std::move(elem.second); + return r->second; + } else { + return object().emplace_hint(r, std::move(elem))->second; + } + } else { + return (*this)[key.substr(0, sep)].put(pair_type(key.substr(sep + 1), std::move(elem.second))); + } + } + value_type &put(const pair_type &elem) + { + pair_type tmp(elem); + return put(std::move(tmp)); + } + value_type &put(const key_type &k, const value_type &v) { return put(pair_type(k, v)); } + value_type &put(const key_type &k, value_type &&v) { return put(pair_type(k, std::move(v))); } + value_type &put(key_type &&k, const value_type &v) { return put(pair_type(std::move(k), v)); } + value_type &put(key_type &&k, value_type &&v) { return put(pair_type(std::move(k), std::move(v))); } + +private: + template <class C> + const C &ref_val() const + { + try { + return boost::get<C>(data_); + } catch (std::exception &) { + throw std::runtime_error("json get error, " + name() + " is not a " + JValue(C()).name()); + } + } + const value_type &ref_child(const key_type &path) const + { + auto sep = path.find('.'); + if (sep == key_type::npos) { + return at(path); + } else { + return at(path.substr(0, sep)).ref_child(path.substr(sep + 1)); + } + } + data_type data_; +}; + +typedef JValue Json; + +} // namespace ssjson + +#endif // end of include guard: JSON_B360EKF1 diff --git a/box/node_center.cpp b/box/node_center.cpp index b970d44..cefb138 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -350,44 +350,48 @@ return MakeReply(eSuccess); }); } -MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) + +MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id) { typedef MsgQueryProcReply Reply; - auto query = [&](Node self) -> Reply { - auto Add1 = [](Reply &reply, Node node) { - auto info = reply.add_proc_list(); - *info->mutable_proc() = node->proc_; - info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { - for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); - } + auto Add1 = [](Reply &reply, Node node) { + auto info = reply.add_proc_list(); + *info->mutable_proc() = node->proc_; + info->mutable_proc()->clear_private_info(); + info->set_online(node->state_.flag_ == kStateNormal); + for (auto &addr_topics : node->services_) { + for (auto &topic : addr_topics.second) { + info->mutable_topics()->add_topic_list(topic); } - }; - - if (!req.proc_id().empty()) { - auto pos = online_node_addr_map_.find(req.proc_id()); - if (pos == online_node_addr_map_.end()) { - return MakeReply<Reply>(eNotFound, "proc not found."); - } else { - auto node_pos = nodes_.find(pos->second); - if (node_pos == nodes_.end()) { - return MakeReply<Reply>(eNotFound, "proc node not found."); - } else { - auto reply = MakeReply<Reply>(eSuccess); - Add1(reply, node_pos->second); - return reply; - } - } - } else { - Reply reply(MakeReply<Reply>(eSuccess)); - for (auto &kv : nodes_) { - Add1(reply, kv.second); - } - return reply; } }; + if (!proc_id.empty()) { + auto pos = online_node_addr_map_.find(proc_id); + if (pos == online_node_addr_map_.end()) { + return MakeReply<Reply>(eNotFound, "proc not found."); + } else { + auto node_pos = nodes_.find(pos->second); + if (node_pos == nodes_.end()) { + return MakeReply<Reply>(eNotFound, "proc node not found."); + } else { + auto reply = MakeReply<Reply>(eSuccess); + Add1(reply, node_pos->second); + return reply; + } + } + } else { + Reply reply(MakeReply<Reply>(eSuccess)); + for (auto &kv : nodes_) { + Add1(reply, kv.second); + } + return reply; + } +} +MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) +{ + typedef MsgQueryProcReply Reply; + auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); }; return HandleMsg<Reply>(head, query); } diff --git a/box/node_center.h b/box/node_center.h index b9a01b3..4663bee 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -159,6 +159,7 @@ MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg); MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req); + MsgQueryProcReply QueryProc(const std::string &proc_id); MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 33adf91..e597533 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -169,6 +169,20 @@ // printf("register topic : %s\n", r ? "ok" : "failed"); // Sleep(1s); } + auto PrintProcs = [](MsgQueryProcReply const &result) { + printf("query proc result: %d\n", result.proc_list().size()); + for (int i = 0; i < result.proc_list().size(); ++i) { + auto &info = result.proc_list(i); + printf("proc [%d] %s, %s, %s\n\ttopics\n", i, + (info.online() ? "online" : "offline"), + info.proc().proc_id().c_str(), info.proc().name().c_str()); + for (auto &t : info.topics().topic_list()) { + printf("\t\t %s\n", t.c_str()); + } + printf("\n"); + } + printf("\n"); + }; { // query procs std::string dest(BHAddress().SerializeAsString()); @@ -180,22 +194,45 @@ DEFER1(BHFree(reply, reply_len)); MsgQueryProcReply result; if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) { - printf("query proc result: %d\n", result.proc_list().size()); - for (int i = 0; i < result.proc_list().size(); ++i) { - auto &info = result.proc_list(i); - printf("proc [%d] %s, %s, %s\n\ttopics\n", i, - (info.online() ? "online" : "offline"), - info.proc().proc_id().c_str(), info.proc().name().c_str()); - for (auto &t : info.topics().topic_list()) { - printf("\t\t %s", t.c_str()); - } - } + PrintProcs(result); } else { printf("query proc error\n"); } // printf("register topic : %s\n", r ? "ok" : "failed"); // Sleep(1s); } + { + // query procs with normal topic request + MsgRequestTopic req; + req.set_topic("@center_query_procs"); + std::string s(req.SerializeAsString()); + // Sleep(10ms, false); + std::string dest(BHAddress().SerializeAsString()); + void *proc_id = 0; + int proc_id_len = 0; + DEFER1(BHFree(proc_id, proc_id_len);); + void *reply = 0; + int reply_len = 0; + DEFER1(BHFree(reply, reply_len)); + bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100); + if (!r) { + int ec = 0; + std::string msg; + GetLastError(ec, msg); + printf("topic query proc error: %s\n", msg.c_str()); + } else { + MsgRequestTopicReply ret; + ret.ParseFromArray(reply, reply_len); + printf("topic query proc : %s\n", ret.data().c_str()); + // MsgQueryProcReply result; + // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) { + // PrintProcs(result); + // } else { + // printf("topic query proc error\n"); + // } + } + } + // return; { // Subscribe MsgTopicList topics; @@ -335,9 +372,9 @@ threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; - const int64_t nreq = 1000 * 100; + const int64_t nreq = 1; //000 * 100; - for (int i = 0; i < 100; ++i) { + for (int i = 0; i < 10; ++i) { SyncRequest(i); } -- Gitblit v1.8.0