From 0e31f38fc37216e1376d8101d1bcf7a3779279dc Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 20 五月 2021 15:29:16 +0800
Subject: [PATCH] add center topic node.
---
utest/api_test.cpp | 61 ++
box/center_topic_node.cpp | 117 ++++++
box/json.h | 277 ++++++++++++++++
box/center.cpp | 7
box/node_center.h | 1
box/center.h | 4
box/node_center.cpp | 68 ++-
box/center_topic_node.h | 46 ++
box/json.cpp | 380 ++++++++++++++++++++++
9 files changed, 915 insertions(+), 46 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 0f36719..e745be8 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -16,6 +16,7 @@
* =====================================================================================
*/
#include "center.h"
+#include "center_topic_node.h"
#include "node_center.h"
#include <chrono>
@@ -185,7 +186,10 @@
auto &info = kv.second;
sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
}
+
+ topic_node_.reset(new CenterTopicNode(center_ptr, shm));
}
+BHCenter::~BHCenter() { Stop(); }
bool BHCenter::Start()
{
@@ -193,12 +197,13 @@
auto &info = kv.second;
sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
}
-
+ topic_node_->Start();
return true;
}
bool BHCenter::Stop()
{
+ topic_node_->Stop();
for (auto &kv : sockets_) {
kv.second->Stop();
}
diff --git a/box/center.h b/box/center.h
index ad0ac4f..6610277 100644
--- a/box/center.h
+++ b/box/center.h
@@ -22,6 +22,7 @@
#include <functional>
#include <map>
#include <memory>
+class CenterTopicNode;
class BHCenter
{
@@ -34,7 +35,7 @@
static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
BHCenter(Socket::Shm &shm);
- ~BHCenter() { Stop(); }
+ ~BHCenter();
bool Start();
bool Stop();
@@ -51,6 +52,7 @@
static CenterRecords &Centers();
std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
+ std::unique_ptr<CenterTopicNode> topic_node_;
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
new file mode 100644
index 0000000..82b38ca
--- /dev/null
+++ b/box/center_topic_node.cpp
@@ -0,0 +1,117 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: center_topic_node.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�20鏃� 12鏃�44鍒�31绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "center_topic_node.h"
+#include "node_center.h"
+#include "topic_node.h"
+
+#include "json.h"
+#include <chrono>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+using namespace bhome_shm;
+using namespace ssjson;
+
+namespace
+{
+const std::string &kTopicQueryProc = "@center_query_procs";
+
+std::string ToJson(const MsgQueryProcReply &qpr)
+{
+ Json json;
+ json.put("procCount", qpr.proc_list_size());
+ auto &list = json.put("procList", Json::Array());
+ // Json list = Json::Array();
+ for (auto &info : qpr.proc_list()) {
+ Json proc;
+ proc.put("id", info.proc().proc_id());
+ proc.put("name", info.proc().name());
+ proc.put("publicInfo", info.proc().public_info());
+ proc.put("online", info.online());
+ Json topics = Json::Array();
+ for (auto &t : info.topics().topic_list()) {
+ topics.push_back(t);
+ }
+ proc.put("topics", topics);
+ list.push_back(proc);
+ }
+ return json.dump(0);
+}
+
+} // namespace
+
+CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
+ pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
+
+CenterTopicNode::~CenterTopicNode() { Stop(); }
+
+void CenterTopicNode::Stop()
+{
+ bool cur = true;
+ if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
+ worker_.join();
+ pnode_->Stop();
+ }
+}
+
+bool CenterTopicNode::Start()
+{
+ Stop();
+
+ int timeout = 3000;
+ MsgCommonReply reply;
+
+ ProcInfo info;
+ info.set_proc_id("#center.node");
+ info.set_name("center node");
+ if (!pnode_->Register(info, reply, timeout)) {
+ throw std::runtime_error("center node register failed.");
+ }
+
+ MsgTopicList topics;
+ topics.add_topic_list(kTopicQueryProc);
+ if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
+ throw std::runtime_error("center node register topics failed.");
+ }
+
+ auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
+ auto reply = MakeReply<MsgRequestTopicReply>(eSuccess);
+ if (request.topic() == kTopicQueryProc) {
+ auto data = (*pscenter_)->QueryProc(request.data());
+ *reply.mutable_errmsg() = data.errmsg();
+ reply.set_data(ToJson(data));
+ } else {
+ SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
+ }
+ pnode_->ServerSendReply(src_info, reply);
+ };
+
+ bool cur = false;
+ if (run_.compare_exchange_strong(cur, true)) {
+ auto heartbeat = [this]() {
+ while (run_) {
+ pnode_->Heartbeat(1000);
+ std::this_thread::sleep_for(1s);
+ }
+ };
+ std::thread(heartbeat).swap(worker_);
+ return pnode_->ServerStart(onRequest);
+ } else {
+ return false;
+ }
+}
diff --git a/box/center_topic_node.h b/box/center_topic_node.h
new file mode 100644
index 0000000..a29860e
--- /dev/null
+++ b/box/center_topic_node.h
@@ -0,0 +1,46 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: center_topic_node.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�05鏈�20鏃� 12鏃�44鍒�42绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef CENTER_TOPIC_NODE_YCI0P9KC
+#define CENTER_TOPIC_NODE_YCI0P9KC
+
+#include "bh_util.h"
+#include "shm.h"
+#include <atomic>
+#include <memory>
+#include <thread>
+
+class TopicNode;
+class NodeCenter;
+// center as a topic node.
+class CenterTopicNode
+{
+public:
+ typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+ CenterTopicNode(CenterPtr center, bhome_shm::SharedMemory &shm);
+ ~CenterTopicNode();
+ bool Start();
+ void Stop();
+
+private:
+ CenterPtr pscenter_;
+ std::unique_ptr<TopicNode> pnode_;
+ std::thread worker_;
+ std::atomic<bool> run_;
+};
+
+#endif // end of include guard: CENTER_TOPIC_NODE_YCI0P9KC
diff --git a/box/json.cpp b/box/json.cpp
new file mode 100755
index 0000000..0ad2e49
--- /dev/null
+++ b/box/json.cpp
@@ -0,0 +1,380 @@
+#include "json.h"
+
+namespace ssjson {
+
+namespace _impl {
+
+ typedef std::string Buffer;
+
+ template <class Buf, class ...T>
+ inline void DumpNumber(Buf &res, const char *fmt, T&&...val) {
+ char buf[64] = {0};
+ int n = snprintf(buf, sizeof(buf)-1, fmt, val...);
+ res.append(buf, n);
+ }
+ inline void DumpFloat(Buffer &res, const long double fval) { DumpNumber(res, "%.*Lg", std::numeric_limits<long double>::digits10, fval); }
+ inline void DumpInt(Buffer &res, const long long ival) { DumpNumber(res, "%lld", ival); }
+// inline void DumpFloat(Buffer &res, const double fval) { DumpNumber(res, "%.*g", std::numeric_limits<double>::digits10, fval); }
+// inline void DumpInt(Buffer &res, const long ival) { DumpNumber(res, "%ld", ival); }
+
+ // assume utf8 string.
+ void DumpString(Buffer &res, const std::string &s, const bool escape_unicode) {
+ res += '\"';
+
+ auto EscapeSpecialChar = [&](const int c) { // ( \b, \f, \t, \n, \r, \", \\, / ).
+ const char esc_table[] = {
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 'b', 't',
+ 'n', 0, 'f','r', 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 ,'\"', 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, '/', 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0, 0 , 0 , 0 , 0, 0, 0 , 0 , 0 ,
+ 0 , 0,'\\', };
+ res += '\\';
+ res += esc_table[c];
+ };
+ auto AddUnicode = [&](unsigned u) {
+ auto HexChar = [](unsigned char uc) { return "0123456789abcdef"[uc & 0xF]; };
+ char buf[6] = { '\\', 'u', HexChar(u>>12), HexChar(u>>8), HexChar(u>>4), HexChar(u) };
+ res.append(buf, buf+6);
+ };
+ auto IsControl = [](const unsigned char c) { return c < 0x20 || c == 0x7F; };
+
+ if (escape_unicode) {
+ for (unsigned i = 0; i < s.size(); ++i) {
+ switch(s[i]) {
+ case '\b': case '\f': case '\t': case '\n': case '\r':
+ case '\"': case '\\': case '/' :
+ EscapeSpecialChar(s[i]);
+ break;
+ default :
+ {
+ const unsigned char &uc = (unsigned char &)s[i];
+ const unsigned char *sz = &uc;
+ if (IsControl(uc)) { // control characters
+ AddUnicode(uc);
+ } else if (uc < 0x80) {
+ res += s[i];
+ } else if (uc < 0xE0) { // 2 bytes
+ AddUnicode(((sz[0]&0x1F)<<6) | (sz[1] & 0x3F));
+ i += 1;
+ } else if (uc < 0xF0) { // 3 bytes
+ AddUnicode(((sz[0]&0x0F)<<12) | ((sz[1]&0x3F)<<6) | (sz[2] & 0x3F));
+ i += 2;
+ } else {
+ res += s[i]; // exceed 0xFFFF.
+ }
+ } break;
+ }
+ }
+ } else {
+ for (unsigned i = 0; i < s.size(); ++i) {
+ switch(s[i]) {
+ case '\b': case '\f': case '\t': case '\n': case '\r':
+ case '\"': case '\\': case '/' :
+ EscapeSpecialChar(s[i]);
+ break;
+ default :
+ if (IsControl(s[i])) { // control characters
+ AddUnicode(s[i]);
+ } else {
+ res += s[i];
+ }
+ break;
+ }
+ }
+ }
+ res += '\"';
+ };
+
+ void Dump(const JValue &jv, Buffer &res, JValue::DumpOptions &opt, const bool pretty)
+ {
+ auto NewLine = [&]() {
+ res += '\n';
+ res.append(opt.indent_level_ * opt.indent_step_, ' ');
+ };
+
+ switch (jv.type()) {
+ case JValue::jv_object:
+ {
+ res += '{';
+ auto &obj = jv.object();
+ if (!obj.empty()) {
+ if (pretty) {
+ opt.indent_level_++;
+ for (auto &&kv : obj) {
+ NewLine();
+ DumpString(res, kv.first, opt.escape_unicode_);
+ res.append(": ", 2);
+ Dump(kv.second, res, opt, pretty);
+ res += ',';
+ }
+ res.pop_back(); // remove last ',';
+ opt.indent_level_--;
+ NewLine();
+ } else {
+ for (auto &&kv : obj) {
+ DumpString(res, kv.first, opt.escape_unicode_);
+ res += ':';
+ Dump(kv.second, res, opt, pretty);
+ res += ',';
+ }
+ res.pop_back(); // remove last ',';
+ }
+ }
+ res += '}';
+ } break;
+ case JValue::jv_array:
+ {
+ res += '[';
+ auto &arr = jv.array();
+ if (!arr.empty()) {
+ if (pretty) {
+ opt.indent_level_++;
+ for (auto &&v : arr) {
+ NewLine();
+ Dump(v, res, opt, pretty);
+ res += ',';
+ }
+ res.pop_back(); // remove last ',';
+ opt.indent_level_--;
+ NewLine();
+ } else {
+ for (auto &&v : arr) {
+ Dump(v, res, opt, pretty);
+ res += ',';
+ }
+ res.pop_back(); // remove last ',';
+ }
+ }
+ res += ']';
+ } break;
+ case JValue::jv_float:
+ {
+ auto idx = res.size();
+ DumpFloat(res, jv.get_value<JValue::float_type>());
+ while (idx < res.size() && res[idx] != '.') {
+ ++idx;
+ }
+ if (idx == res.size()) {
+ res.append(".0", 2);
+ }
+ } break;
+ case JValue::jv_null: res.append("null", 4); break;
+ case JValue::jv_int: DumpInt(res, jv.get_value<JValue::int_type>()); break;
+ case JValue::jv_bool:
+ if (jv.get_value<bool>()) {
+ res.append("true", 4);
+ } else {
+ res.append("false", 5);
+ }
+ break;
+ case JValue::jv_string: DumpString(res, jv.get_value<std::string>(), opt.escape_unicode_); break;
+ default: std::invalid_argument("invalid json type:" + std::to_string(jv.type())); break;
+ }
+ }
+
+ auto HexCharVal = [](const unsigned char c) {
+ static const unsigned char hex_char_table[] = {
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 10, 11, 12, 13, 14, 15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 10, 11, 12, 13, 14, 15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+ };
+ return hex_char_table[c];
+ };
+} // namespace _impl
+
+ int JValue::parse(const char *begin, const char *end)
+ {
+ using namespace _impl;
+ const char *p = begin;
+
+ auto Error = [&](const std::string &msg = "invalid value at"){ throw std::invalid_argument(msg + " : "+ std::string(p, std::min(int(end-p), 30))); };
+ auto IsSpace = [](const char c) {
+ return c == ' ' || c == '\t' || c == '\r' || c == '\n';
+ };
+ auto IgnoreSpaces = [&]() { while (p != end && IsSpace(*p)) { ++p; } };
+ auto Peek = [&]() { IgnoreSpaces(); if (p == end) { Error("input ends unexpectedly!"); } return *p; };
+ auto Expect = [&](const char c) { if (Peek() != c) { Error(std::string("parse error, expecting ") + c); } };
+
+ auto parse_string = [&]() {
+ std::string res;
+ auto to_utf8 = [&]() {
+ auto Hex4 = [](const char *sz) {
+ auto FromHex = [](const unsigned char c)->unsigned char {
+ const unsigned char v = HexCharVal(c);
+ if (v == 0xFF) {
+ throw std::invalid_argument(std::string("invalid unicode input ") + char(c));
+ }
+ return v;
+ };
+ return (FromHex(sz[0]) << 12) | (FromHex(sz[1]) << 8) | (FromHex(sz[2]) << 4) | (FromHex(sz[3]));
+ };
+ const unsigned int u = Hex4(p+2);
+ p += 4;
+ if (u < 0x80) {
+ res += char(u);
+ } else if (u < 0x800) {
+ res += char((u >> 6) | 0xC0);
+ res += char((u & 0x3F) | 0x80);
+ } else if (u < 0x10000) {
+ res += char((u >> 12) | 0xE0);
+ res += char(((u >> 6)& 0x3F) | 0x80);
+ res += char((u & 0x3F) | 0x80);
+ } else if (u < 0x110000) {
+ res += char((u >> 18) | 0xF0);
+ res += char(((u >> 12)& 0x3F) | 0x80);
+ res += char(((u >> 6)& 0x3F) | 0x80);
+ res += char((u & 0x3F) | 0x80);
+ } else {
+ Error("invalid unicode codepoint");
+ }
+ };
+ auto ParseEscapedChar = [&](const int c) {
+ const char unesc_table[] = {
+ 0 , '\b', 0 , 0 , 0 , '\f', 0 ,
+ 0 , 0 , 0 , 0 , 0 , 0 , '\n',
+ 0 , 0 , 0 , '\r', 0 , '\t', };
+ res += unesc_table[c-'a'];
+ };
+ unsigned left = 0;
+ auto AddLeft = [&] { if (left != 0) { res.append(p-left, left); left = 0; } };
+ while (++p != end) {
+ switch(*p) {
+ case '\"':
+ AddLeft();
+ ++p;
+ return res;
+ case '\\':
+ AddLeft();
+ if (p+1 != end) {
+ switch (p[1]) {
+ case 'b' : case 'f' : case 'n' : case 'r' : case 't' : ParseEscapedChar(p[1]); break;
+ case 'u' : to_utf8(); break;
+ case '\"': case '\\': case '/' : ++left; break;
+ default: left += 2; break;
+ }
+ ++p; // parsed 2 char.
+ } // else, next round will fail.
+ break;
+ // and escaped char should not appear.
+ case '\t': case '\n': case '\r': case '\b': case '\f': //case '/' :
+ AddLeft();
+ Error("invalid string content") ;
+ break;
+ default: ++left; break;
+ }
+ }
+ Error("string ends unexpectedly!");
+ return res;
+ };
+
+ auto parse_object = [&]() {
+ assert(*p == '{');
+ ++p;
+ if (Peek() == '}') {
+ ++p;
+ return object_type();
+ }
+ object_type obj;
+ while (true) {
+ Expect('\"');
+ std::string key(parse_string());
+
+ Expect(':');
+ ++p;
+
+ JValue v;
+ p += v.parse(p, end);
+ obj.emplace(std::move(key), std::move(v));
+
+ switch (Peek()) {
+ case ',': ++p; break;
+ case '}': ++p; return obj;
+ default: Error("parse object error");
+ }
+ }
+ };
+ auto parse_array = [&]() {
+ assert(*p == '[');
+ ++p;
+ if (Peek() == ']') {
+ ++p;
+ return array_type();
+ }
+ array_type arr;
+ while (true) {
+ JValue v;
+ p += v.parse(p, end);
+ arr.emplace_back(std::move(v));
+
+ switch (Peek()) {
+ case ',': ++p; break;
+ case ']': ++p; return arr;
+ default: Error("parse array error");
+ }
+ }
+ };
+
+ auto ExpectText = [&](const char *sz, const size_t len) {
+ if(memcmp(p+1, sz+1, len-1) == 0) {
+ p += len;
+ } else {
+ Error("parse literal error, expecting '" + std::string(sz, len) + "', read " + std::string(p, len).c_str());
+ }
+ };
+
+ switch (Peek()) {
+ case '{': *this = parse_object(); break;
+ case '[': *this = parse_array(); break;
+ case '\"': *this = parse_string(); break;
+ case 't': ExpectText("true", 4); *this = true; break;
+ case 'f': ExpectText("false", 5); *this = false; break;
+ case 'n': ExpectText("null", 4); *this = nullptr; break;
+ case '0': case '1': case '2': case '3': case '4':
+ case '5': case '6': case '7': case '8': case '9': case '-':
+ {
+ auto IsNumber = [](const char c) { return '0' <= c && c <= '9'; };
+ auto sep = p+1;
+ while (sep != end && IsNumber(*sep)) { ++sep; }
+ char *parse_end = (char*)p;
+ switch (*sep) {
+ case '.' : case 'e' : case 'E': *this = strtold((char*)p, &parse_end); break;
+ default: *this = strtoll((char*)p, &parse_end, 10); break;
+ }
+ p = parse_end;
+ } break;
+ default : Error(); break;
+ }
+ return p - begin;
+ }
+
+ std::string JValue::dump(const DumpOptions &opt_in) const {
+ using namespace _impl;
+ Buffer res;
+ DumpOptions opt(opt_in);
+ if(opt.indent_step_ > 0) {
+ res.append(opt.indent_level_ * opt.indent_step_, ' ');
+ }
+ Dump(*this, res, opt, opt.indent_step_ > 0);
+ return res;
+ }
+} // jsonpp
+
diff --git a/box/json.h b/box/json.h
new file mode 100755
index 0000000..461f0e9
--- /dev/null
+++ b/box/json.h
@@ -0,0 +1,277 @@
+#ifndef JSON_B360EKF1
+#define JSON_B360EKF1
+
+#include <algorithm>
+#include <boost/variant.hpp>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace ssjson
+{
+
+class JValue
+{
+public:
+ typedef std::string key_type;
+ typedef long long int_type;
+ typedef long double float_type;
+ typedef JValue value_type;
+ typedef std::map<key_type, value_type> object_type;
+ typedef object_type::value_type pair_type;
+ typedef std::vector<value_type> array_type;
+
+ enum JVType { jv_null,
+ jv_int,
+ jv_float,
+ jv_bool,
+ jv_string,
+ jv_object,
+ jv_array };
+ std::string name() const
+ {
+ static const char *names[] = {"jv_null", "jv_int", "jv_float", "jv_bool", "jv_string", "jv_object", "jv_array"};
+ return names[type()];
+ }
+
+private:
+ typedef boost::variant<std::nullptr_t, int_type, float_type, bool, std::string, object_type, array_type> data_type;
+
+ template <class C>
+ const C &val() const { return ref_val<C>(); }
+ template <class C>
+ C &val() { return const_cast<C &>(ref_val<C>()); }
+
+ template <class C>
+ class TInitValue
+ {
+ typedef C value_t;
+ typedef std::vector<TInitValue> list_t;
+ mutable boost::variant<value_t, list_t> data_;
+
+ public:
+ template <class T>
+ TInitValue(const T &t) :
+ data_(value_t(t)) {}
+ template <class T>
+ TInitValue(T &&t) :
+ data_(value_t(std::move(t))) {}
+ TInitValue(std::initializer_list<TInitValue> l) :
+ data_(list_t(l)){};
+ bool is_value() const { return data_.which() == 0; }
+ bool is_list() const { return data_.which() == 1; }
+ const list_t &list() const { return boost::get<list_t>(data_); }
+ const value_t &value() const { return boost::get<value_t>(data_); }
+ };
+ typedef TInitValue<JValue> InitValue;
+ typedef std::initializer_list<InitValue> InitList;
+
+ template <class Iter>
+ static object_type MakeObject(Iter begin, Iter end)
+ {
+ object_type obj;
+ for (auto it = begin; it != end; ++it) {
+ obj.emplace(std::move(it->list()[0].value().template val<key_type>()), (it->list()[1]));
+ }
+ return obj;
+ }
+ template <class Iter>
+ static array_type MakeArray(Iter begin, Iter end) { return array_type(begin, end); }
+ template <class Iter>
+ static JValue FromInit(Iter begin, Iter end)
+ {
+ auto like_object_item = [](const InitValue &v) {
+ return v.is_list() &&
+ v.list().size() == 2 &&
+ v.list()[0].is_value() &&
+ v.list()[0].value().type() == jv_string;
+ };
+ if (std::all_of(begin, end, like_object_item)) {
+ return MakeObject(begin, end);
+ } else {
+ return MakeArray(begin, end);
+ }
+ }
+ static JValue FromInit(const InitValue &ji)
+ {
+ if (ji.is_value()) {
+ return std::move(ji.value());
+ } else {
+ return FromInit(ji.list().begin(), ji.list().end());
+ }
+ }
+
+public:
+ struct DumpOptions {
+ int indent_step_ = 0;
+ int indent_level_ = 0;
+ bool escape_unicode_ = false;
+ };
+ JValue() :
+ data_(object_type()) { static_assert(sizeof(JValue) == sizeof(data_type), "JValue should simple wrap data_type with no other fields."); }
+#define SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
+ JValue(decl) : data_(expr) {} \
+ JValue &operator=(decl) \
+ { \
+ data_ = (expr); \
+ return *this; \
+ }
+ // operator=() seems faster than JValue(exer).swap(data_).
+ // copy
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(const JValue &v, v.data_) // self;
+#define MAP_TYPE_TO_JSON_TYPE(decl, expr) \
+public: \
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
+private: \
+ static auto JVTypeAccept(decl)->std::remove_reference<decltype(expr)>::type
+#define MAP_TYPE_TO_JSON_TYPE_BIDIR(decl, expr) \
+ MAP_TYPE_TO_JSON_TYPE(decl, expr); \
+ \
+private: \
+ static auto JVTypeReturn(decl)->std::remove_reference<decltype(expr)>::type
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::nullptr_t, nullptr);
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const short v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned short v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const int v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned int v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const long v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const long long v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long long v, int_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const float v, float_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const double v, float_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const long double v, float_type(v));
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const bool v, v);
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::string &v, v);
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const object_type &v, v);
+ MAP_TYPE_TO_JSON_TYPE_BIDIR(const array_type &v, v);
+ MAP_TYPE_TO_JSON_TYPE(const char *v, std::string(v));
+#undef MAP_TYPE_TO_JSON_TYPE_BIDIR
+#undef MAP_TYPE_TO_JSON_TYPE
+ template <class T>
+ static void JVTypeAccept(T); // use void to disable other types.
+ template <class T>
+ static void JVTypeReturn(T); // use void to disable other types.
+public:
+ // move
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(JValue &&v, std::move(v.data_)) // self
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(std::string &&v, std::move(v)) // and other classes
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(object_type &&v, std::move(v))
+ SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(array_type &&v, std::move(v))
+#undef SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT
+
+ static JValue Object()
+ {
+ return (object_type());
+ }
+ static JValue Object(InitList l) { return (MakeObject(l.begin(), l.end())); }
+ static JValue Array() { return (array_type()); }
+ static JValue Array(InitList l) { return (MakeArray(l.begin(), l.end())); }
+ JValue(InitList l) :
+ JValue(FromInit(l.begin(), l.end())) {}
+ JValue(const InitValue &ji) :
+ JValue(FromInit(ji)) {}
+ JVType type() const { return static_cast<JVType>(data_.which()); }
+
+ std::string dump(const DumpOptions &opt_in) const;
+ std::string dump(const int indent_step = 0, const int indent_level = 0, const bool escape_unicode = false) const
+ {
+ DumpOptions opt;
+ opt.indent_step_ = indent_step;
+ opt.indent_level_ = indent_level;
+ opt.escape_unicode_ = escape_unicode;
+ return dump(opt);
+ }
+ bool parse(const std::string &s) { return parse(s.c_str(), s.c_str() + s.size()) > 0; }
+ int parse(const char *begin, const char *end);
+ void swap(JValue &a) { data_.swap(a.data_); }
+
+ template <class T>
+ T get_value() const { return val<decltype(JVTypeReturn(T()))>(); }
+ template <class T>
+ void put_value(T &&v) { *this = v; }
+ const object_type &object() const { return val<object_type>(); }
+ object_type &object() { return val<object_type>(); }
+ const array_type &array() const { return val<array_type>(); }
+ array_type &array() { return val<array_type>(); }
+ bool is_object() const { return type() == jv_object; }
+ bool is_array() const { return type() == jv_array; }
+
+ // array ops
+ const value_type &operator[](const size_t idx) const { return array()[idx]; }
+ value_type &operator[](const size_t idx) { return array()[idx]; }
+ const value_type &at(const size_t idx) const { return array().at(idx); }
+ value_type &at(const size_t idx) { return array().at(idx); }
+ void push_back(const value_type &v) { array().push_back(v); }
+ void push_back(value_type &&v) { array().push_back(std::move(v)); }
+
+ // object ops
+ value_type &operator[](const key_type &key) { return object()[key]; }
+ const value_type &at(const key_type &key) const { return object().at(key); }
+ value_type &at(const key_type &key) { return object().at(key); }
+ const value_type &child(const key_type &path) const { return ref_child(path); }
+ value_type &child(const key_type &path) { return const_cast<value_type &>(ref_child(path)); }
+ template <class T>
+ T get(const key_type &path) const { return child(path).val<decltype(JVTypeReturn(T()))>(); }
+ template <class T>
+ auto get(const key_type &path, const T &def) const -> typename std::remove_reference<decltype(JVTypeAccept(def))>::type
+ {
+ try {
+ return get<decltype(JVTypeAccept(def))>(path);
+ } catch (...) {
+ return def;
+ }
+ }
+ value_type &put(pair_type &&elem)
+ {
+ const key_type &key = elem.first;
+ auto sep = key.find('.');
+ if (sep == key_type::npos) {
+ auto r = object().lower_bound(elem.first);
+ if (r != object().end() && r->first == elem.first) {
+ r->second = std::move(elem.second);
+ return r->second;
+ } else {
+ return object().emplace_hint(r, std::move(elem))->second;
+ }
+ } else {
+ return (*this)[key.substr(0, sep)].put(pair_type(key.substr(sep + 1), std::move(elem.second)));
+ }
+ }
+ value_type &put(const pair_type &elem)
+ {
+ pair_type tmp(elem);
+ return put(std::move(tmp));
+ }
+ value_type &put(const key_type &k, const value_type &v) { return put(pair_type(k, v)); }
+ value_type &put(const key_type &k, value_type &&v) { return put(pair_type(k, std::move(v))); }
+ value_type &put(key_type &&k, const value_type &v) { return put(pair_type(std::move(k), v)); }
+ value_type &put(key_type &&k, value_type &&v) { return put(pair_type(std::move(k), std::move(v))); }
+
+private:
+ template <class C>
+ const C &ref_val() const
+ {
+ try {
+ return boost::get<C>(data_);
+ } catch (std::exception &) {
+ throw std::runtime_error("json get error, " + name() + " is not a " + JValue(C()).name());
+ }
+ }
+ const value_type &ref_child(const key_type &path) const
+ {
+ auto sep = path.find('.');
+ if (sep == key_type::npos) {
+ return at(path);
+ } else {
+ return at(path.substr(0, sep)).ref_child(path.substr(sep + 1));
+ }
+ }
+ data_type data_;
+};
+
+typedef JValue Json;
+
+} // namespace ssjson
+
+#endif // end of include guard: JSON_B360EKF1
diff --git a/box/node_center.cpp b/box/node_center.cpp
index b970d44..cefb138 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -350,44 +350,48 @@
return MakeReply(eSuccess);
});
}
-MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+
+MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
{
typedef MsgQueryProcReply Reply;
- auto query = [&](Node self) -> Reply {
- auto Add1 = [](Reply &reply, Node node) {
- auto info = reply.add_proc_list();
- *info->mutable_proc() = node->proc_;
- info->set_online(node->state_.flag_ == kStateNormal);
- for (auto &addr_topics : node->services_) {
- for (auto &topic : addr_topics.second) {
- info->mutable_topics()->add_topic_list(topic);
- }
+ auto Add1 = [](Reply &reply, Node node) {
+ auto info = reply.add_proc_list();
+ *info->mutable_proc() = node->proc_;
+ info->mutable_proc()->clear_private_info();
+ info->set_online(node->state_.flag_ == kStateNormal);
+ for (auto &addr_topics : node->services_) {
+ for (auto &topic : addr_topics.second) {
+ info->mutable_topics()->add_topic_list(topic);
}
- };
-
- if (!req.proc_id().empty()) {
- auto pos = online_node_addr_map_.find(req.proc_id());
- if (pos == online_node_addr_map_.end()) {
- return MakeReply<Reply>(eNotFound, "proc not found.");
- } else {
- auto node_pos = nodes_.find(pos->second);
- if (node_pos == nodes_.end()) {
- return MakeReply<Reply>(eNotFound, "proc node not found.");
- } else {
- auto reply = MakeReply<Reply>(eSuccess);
- Add1(reply, node_pos->second);
- return reply;
- }
- }
- } else {
- Reply reply(MakeReply<Reply>(eSuccess));
- for (auto &kv : nodes_) {
- Add1(reply, kv.second);
- }
- return reply;
}
};
+ if (!proc_id.empty()) {
+ auto pos = online_node_addr_map_.find(proc_id);
+ if (pos == online_node_addr_map_.end()) {
+ return MakeReply<Reply>(eNotFound, "proc not found.");
+ } else {
+ auto node_pos = nodes_.find(pos->second);
+ if (node_pos == nodes_.end()) {
+ return MakeReply<Reply>(eNotFound, "proc node not found.");
+ } else {
+ auto reply = MakeReply<Reply>(eSuccess);
+ Add1(reply, node_pos->second);
+ return reply;
+ }
+ }
+ } else {
+ Reply reply(MakeReply<Reply>(eSuccess));
+ for (auto &kv : nodes_) {
+ Add1(reply, kv.second);
+ }
+ return reply;
+ }
+}
+MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+{
+ typedef MsgQueryProcReply Reply;
+ auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
return HandleMsg<Reply>(head, query);
}
diff --git a/box/node_center.h b/box/node_center.h
index b9a01b3..4663bee 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -159,6 +159,7 @@
MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
+ MsgQueryProcReply QueryProc(const std::string &proc_id);
MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 33adf91..e597533 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -169,6 +169,20 @@
// printf("register topic : %s\n", r ? "ok" : "failed");
// Sleep(1s);
}
+ auto PrintProcs = [](MsgQueryProcReply const &result) {
+ printf("query proc result: %d\n", result.proc_list().size());
+ for (int i = 0; i < result.proc_list().size(); ++i) {
+ auto &info = result.proc_list(i);
+ printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
+ (info.online() ? "online" : "offline"),
+ info.proc().proc_id().c_str(), info.proc().name().c_str());
+ for (auto &t : info.topics().topic_list()) {
+ printf("\t\t %s\n", t.c_str());
+ }
+ printf("\n");
+ }
+ printf("\n");
+ };
{
// query procs
std::string dest(BHAddress().SerializeAsString());
@@ -180,22 +194,45 @@
DEFER1(BHFree(reply, reply_len));
MsgQueryProcReply result;
if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
- printf("query proc result: %d\n", result.proc_list().size());
- for (int i = 0; i < result.proc_list().size(); ++i) {
- auto &info = result.proc_list(i);
- printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
- (info.online() ? "online" : "offline"),
- info.proc().proc_id().c_str(), info.proc().name().c_str());
- for (auto &t : info.topics().topic_list()) {
- printf("\t\t %s", t.c_str());
- }
- }
+ PrintProcs(result);
} else {
printf("query proc error\n");
}
// printf("register topic : %s\n", r ? "ok" : "failed");
// Sleep(1s);
}
+ {
+ // query procs with normal topic request
+ MsgRequestTopic req;
+ req.set_topic("@center_query_procs");
+ std::string s(req.SerializeAsString());
+ // Sleep(10ms, false);
+ std::string dest(BHAddress().SerializeAsString());
+ void *proc_id = 0;
+ int proc_id_len = 0;
+ DEFER1(BHFree(proc_id, proc_id_len););
+ void *reply = 0;
+ int reply_len = 0;
+ DEFER1(BHFree(reply, reply_len));
+ bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
+ if (!r) {
+ int ec = 0;
+ std::string msg;
+ GetLastError(ec, msg);
+ printf("topic query proc error: %s\n", msg.c_str());
+ } else {
+ MsgRequestTopicReply ret;
+ ret.ParseFromArray(reply, reply_len);
+ printf("topic query proc : %s\n", ret.data().c_str());
+ // MsgQueryProcReply result;
+ // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
+ // PrintProcs(result);
+ // } else {
+ // printf("topic query proc error\n");
+ // }
+ }
+ }
+ // return;
{ // Subscribe
MsgTopicList topics;
@@ -335,9 +372,9 @@
threads.Launch(hb, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const int64_t nreq = 1000 * 100;
+ const int64_t nreq = 1; //000 * 100;
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < 10; ++i) {
SyncRequest(i);
}
--
Gitblit v1.8.0