From 0e31f38fc37216e1376d8101d1bcf7a3779279dc Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 20 五月 2021 15:29:16 +0800
Subject: [PATCH] add center topic node.

---
 utest/api_test.cpp        |   61 ++
 box/center_topic_node.cpp |  117 ++++++
 box/json.h                |  277 ++++++++++++++++
 box/center.cpp            |    7 
 box/node_center.h         |    1 
 box/center.h              |    4 
 box/node_center.cpp       |   68 ++-
 box/center_topic_node.h   |   46 ++
 box/json.cpp              |  380 ++++++++++++++++++++++
 9 files changed, 915 insertions(+), 46 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 0f36719..e745be8 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "center.h"
+#include "center_topic_node.h"
 #include "node_center.h"
 #include <chrono>
 
@@ -185,7 +186,10 @@
 		auto &info = kv.second;
 		sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
 	}
+
+	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
 }
+BHCenter::~BHCenter() { Stop(); }
 
 bool BHCenter::Start()
 {
@@ -193,12 +197,13 @@
 		auto &info = kv.second;
 		sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
 	}
-
+	topic_node_->Start();
 	return true;
 }
 
 bool BHCenter::Stop()
 {
+	topic_node_->Stop();
 	for (auto &kv : sockets_) {
 		kv.second->Stop();
 	}
diff --git a/box/center.h b/box/center.h
index ad0ac4f..6610277 100644
--- a/box/center.h
+++ b/box/center.h
@@ -22,6 +22,7 @@
 #include <functional>
 #include <map>
 #include <memory>
+class CenterTopicNode;
 
 class BHCenter
 {
@@ -34,7 +35,7 @@
 	static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
-	~BHCenter() { Stop(); }
+	~BHCenter();
 	bool Start();
 	bool Stop();
 
@@ -51,6 +52,7 @@
 	static CenterRecords &Centers();
 
 	std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
+	std::unique_ptr<CenterTopicNode> topic_node_;
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
new file mode 100644
index 0000000..82b38ca
--- /dev/null
+++ b/box/center_topic_node.cpp
@@ -0,0 +1,117 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  center_topic_node.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�20鏃� 12鏃�44鍒�31绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "center_topic_node.h"
+#include "node_center.h"
+#include "topic_node.h"
+
+#include "json.h"
+#include <chrono>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+using namespace bhome_shm;
+using namespace ssjson;
+
+namespace
+{
+const std::string &kTopicQueryProc = "@center_query_procs";
+
+std::string ToJson(const MsgQueryProcReply &qpr)
+{
+	Json json;
+	json.put("procCount", qpr.proc_list_size());
+	auto &list = json.put("procList", Json::Array());
+	// Json list = Json::Array();
+	for (auto &info : qpr.proc_list()) {
+		Json proc;
+		proc.put("id", info.proc().proc_id());
+		proc.put("name", info.proc().name());
+		proc.put("publicInfo", info.proc().public_info());
+		proc.put("online", info.online());
+		Json topics = Json::Array();
+		for (auto &t : info.topics().topic_list()) {
+			topics.push_back(t);
+		}
+		proc.put("topics", topics);
+		list.push_back(proc);
+	}
+	return json.dump(0);
+}
+
+} // namespace
+
+CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
+    pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
+
+CenterTopicNode::~CenterTopicNode() { Stop(); }
+
+void CenterTopicNode::Stop()
+{
+	bool cur = true;
+	if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
+		worker_.join();
+		pnode_->Stop();
+	}
+}
+
+bool CenterTopicNode::Start()
+{
+	Stop();
+
+	int timeout = 3000;
+	MsgCommonReply reply;
+
+	ProcInfo info;
+	info.set_proc_id("#center.node");
+	info.set_name("center node");
+	if (!pnode_->Register(info, reply, timeout)) {
+		throw std::runtime_error("center node register failed.");
+	}
+
+	MsgTopicList topics;
+	topics.add_topic_list(kTopicQueryProc);
+	if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
+		throw std::runtime_error("center node register topics failed.");
+	}
+
+	auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
+		auto reply = MakeReply<MsgRequestTopicReply>(eSuccess);
+		if (request.topic() == kTopicQueryProc) {
+			auto data = (*pscenter_)->QueryProc(request.data());
+			*reply.mutable_errmsg() = data.errmsg();
+			reply.set_data(ToJson(data));
+		} else {
+			SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
+		}
+		pnode_->ServerSendReply(src_info, reply);
+	};
+
+	bool cur = false;
+	if (run_.compare_exchange_strong(cur, true)) {
+		auto heartbeat = [this]() {
+			while (run_) {
+				pnode_->Heartbeat(1000);
+				std::this_thread::sleep_for(1s);
+			}
+		};
+		std::thread(heartbeat).swap(worker_);
+		return pnode_->ServerStart(onRequest);
+	} else {
+		return false;
+	}
+}
diff --git a/box/center_topic_node.h b/box/center_topic_node.h
new file mode 100644
index 0000000..a29860e
--- /dev/null
+++ b/box/center_topic_node.h
@@ -0,0 +1,46 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  center_topic_node.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�20鏃� 12鏃�44鍒�42绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef CENTER_TOPIC_NODE_YCI0P9KC
+#define CENTER_TOPIC_NODE_YCI0P9KC
+
+#include "bh_util.h"
+#include "shm.h"
+#include <atomic>
+#include <memory>
+#include <thread>
+
+class TopicNode;
+class NodeCenter;
+// center as a topic node.
+class CenterTopicNode
+{
+public:
+	typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+	CenterTopicNode(CenterPtr center, bhome_shm::SharedMemory &shm);
+	~CenterTopicNode();
+	bool Start();
+	void Stop();
+
+private:
+	CenterPtr pscenter_;
+	std::unique_ptr<TopicNode> pnode_;
+	std::thread worker_;
+	std::atomic<bool> run_;
+};
+
+#endif // end of include guard: CENTER_TOPIC_NODE_YCI0P9KC
diff --git a/box/json.cpp b/box/json.cpp
new file mode 100755
index 0000000..0ad2e49
--- /dev/null
+++ b/box/json.cpp
@@ -0,0 +1,380 @@
+#include "json.h"
+
+namespace ssjson {
+
+namespace _impl {
+
+	typedef std::string Buffer;
+
+	template <class Buf, class ...T>
+	inline void DumpNumber(Buf &res, const char *fmt, T&&...val) {
+		char buf[64] = {0};
+		int n = snprintf(buf, sizeof(buf)-1, fmt, val...);
+		res.append(buf, n);
+	}
+	inline void DumpFloat(Buffer &res, const long double fval) { DumpNumber(res, "%.*Lg", std::numeric_limits<long double>::digits10, fval); }
+	inline void DumpInt(Buffer &res, const long long ival)  { DumpNumber(res, "%lld", ival); }
+//	inline void DumpFloat(Buffer &res, const double fval) { DumpNumber(res, "%.*g", std::numeric_limits<double>::digits10, fval); }
+//	inline void DumpInt(Buffer &res, const long ival)  { DumpNumber(res, "%ld", ival); }
+
+	// assume utf8 string.
+	void DumpString(Buffer &res, const std::string &s, const bool escape_unicode) {
+		res += '\"';
+
+		auto EscapeSpecialChar = [&](const int c) { // ( \b, \f, \t, \n, \r, \", \\, / ).
+			const char esc_table[] = {
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 , 'b', 't', 
+				'n', 0, 'f','r',  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,'\"', 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0, '/',  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 , 
+				 0 , 0,'\\', };
+			res += '\\';
+			res += esc_table[c];
+		};
+		auto AddUnicode = [&](unsigned u) {
+			auto HexChar = [](unsigned char uc) { return "0123456789abcdef"[uc & 0xF]; };
+			char buf[6] = { '\\', 'u', HexChar(u>>12), HexChar(u>>8), HexChar(u>>4), HexChar(u) };
+			res.append(buf, buf+6);
+		};
+		auto IsControl = [](const unsigned char c) { return c < 0x20 || c == 0x7F; };
+
+		if (escape_unicode) {
+			for (unsigned i = 0; i < s.size(); ++i) {
+				switch(s[i]) {
+					case '\b': case '\f': case '\t': case '\n': case '\r':
+					case '\"': case '\\': case '/' :
+						EscapeSpecialChar(s[i]);
+						break;
+					default :
+						{
+							const unsigned char &uc = (unsigned char &)s[i];
+							const unsigned char *sz = &uc;
+							if (IsControl(uc)) { // control characters
+								AddUnicode(uc);
+							} else if (uc < 0x80) {
+								res += s[i];
+							} else if (uc < 0xE0) { // 2 bytes
+								AddUnicode(((sz[0]&0x1F)<<6) | (sz[1] & 0x3F));
+								i += 1;
+							} else if (uc < 0xF0) { // 3 bytes
+								AddUnicode(((sz[0]&0x0F)<<12) | ((sz[1]&0x3F)<<6) | (sz[2] & 0x3F));
+								i += 2;
+							} else {
+								res += s[i]; // exceed 0xFFFF.
+							}
+						} break;
+				}
+			}
+		} else {
+			for (unsigned i = 0; i < s.size(); ++i) {
+				switch(s[i]) {
+					case '\b': case '\f': case '\t': case '\n': case '\r':
+					case '\"': case '\\': case '/' :
+						EscapeSpecialChar(s[i]);
+						break;
+					default :
+						if (IsControl(s[i])) { // control characters
+							AddUnicode(s[i]);
+						} else {
+							res += s[i];
+						}
+						break;
+				}
+			}
+		}
+		res += '\"';
+	};
+
+	void Dump(const JValue &jv, Buffer &res, JValue::DumpOptions &opt, const bool pretty)
+	{
+		auto NewLine = [&]() {
+			res += '\n';
+		   	res.append(opt.indent_level_ * opt.indent_step_, ' ');
+		};
+
+		switch (jv.type()) {
+			case JValue::jv_object: 
+				{
+					res += '{';
+					auto &obj = jv.object();
+					if (!obj.empty()) {
+						if (pretty) {
+							opt.indent_level_++;
+							for (auto &&kv : obj) {
+								NewLine();
+								DumpString(res, kv.first, opt.escape_unicode_);
+								res.append(": ", 2);
+								Dump(kv.second, res, opt, pretty);
+								res += ',';
+							}
+							res.pop_back(); // remove last ',';
+							opt.indent_level_--;
+							NewLine();
+						} else {
+							for (auto &&kv : obj) {
+								DumpString(res, kv.first, opt.escape_unicode_);
+								res += ':';
+								Dump(kv.second, res, opt, pretty);
+								res += ',';
+							}
+							res.pop_back(); // remove last ',';
+						}
+					}
+					res += '}';
+				} break;
+			case JValue::jv_array:
+				{
+					res += '[';
+					auto &arr = jv.array();
+					if (!arr.empty()) {
+						if (pretty) {
+							opt.indent_level_++;
+							for (auto &&v : arr) {
+								NewLine();
+								Dump(v, res, opt, pretty);
+								res += ',';
+							}
+							res.pop_back(); // remove last ',';
+							opt.indent_level_--;
+							NewLine();
+						} else {
+							for (auto &&v : arr) {
+								Dump(v, res, opt, pretty);
+								res += ',';
+							}
+							res.pop_back(); // remove last ',';
+						}
+					}
+					res += ']';
+				} break;
+			case JValue::jv_float:
+				{
+					auto idx = res.size();
+					DumpFloat(res, jv.get_value<JValue::float_type>());
+					while (idx < res.size() && res[idx] != '.') {
+						++idx;
+					}
+					if (idx == res.size()) {
+						res.append(".0", 2);
+					}
+				} break;
+			case JValue::jv_null: res.append("null", 4); break;
+			case JValue::jv_int: DumpInt(res, jv.get_value<JValue::int_type>()); break;
+			case JValue::jv_bool: 
+						 if (jv.get_value<bool>()) {
+							 res.append("true", 4);
+						 } else {
+							 res.append("false", 5);
+						 }
+						 break;
+			case JValue::jv_string: DumpString(res, jv.get_value<std::string>(), opt.escape_unicode_); break;
+			default: std::invalid_argument("invalid json type:" + std::to_string(jv.type())); break;
+		}
+	}
+
+	auto HexCharVal = [](const unsigned char c) {
+		static const unsigned char hex_char_table[] = {
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0,    1,    2,    3,    4,    5,    6,    7,    8,    9, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF,   10,   11,   12,   13,   14,   15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF,   10,   11,   12,   13,   14,   15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+		};
+		return hex_char_table[c];
+	};
+} // namespace _impl
+
+	int JValue::parse(const char *begin, const char *end)
+	{
+		using namespace _impl;
+		const char *p = begin;
+
+		auto Error = [&](const std::string &msg = "invalid value at"){ throw std::invalid_argument(msg + " : "+ std::string(p, std::min(int(end-p), 30))); };
+		auto IsSpace = [](const char c) {
+			return c == ' ' || c == '\t' || c == '\r' || c == '\n';
+		};
+		auto IgnoreSpaces = [&]() { while (p != end && IsSpace(*p)) { ++p; } };
+		auto Peek = [&]() { IgnoreSpaces(); if (p == end) { Error("input ends unexpectedly!"); } return *p; };
+		auto Expect = [&](const char c) { if (Peek() != c) { Error(std::string("parse error, expecting ") + c); } };
+
+		auto parse_string = [&]() {
+			std::string res;
+			auto to_utf8 = [&]() {
+				auto Hex4 = [](const char *sz) {
+					auto FromHex = [](const unsigned char c)->unsigned char {
+						const unsigned char v = HexCharVal(c);
+						if (v == 0xFF) {
+							throw std::invalid_argument(std::string("invalid unicode input ") + char(c));
+						}
+						return v;
+					};
+					return (FromHex(sz[0]) << 12) | (FromHex(sz[1]) << 8) | (FromHex(sz[2]) << 4) | (FromHex(sz[3]));
+				};
+				const unsigned int u = Hex4(p+2);
+				p += 4;
+				if (u < 0x80) {
+					res += char(u);
+				} else if (u < 0x800) {
+					res += char((u >> 6) | 0xC0);
+					res += char((u & 0x3F) | 0x80);
+				} else if (u < 0x10000) {
+					res += char((u >> 12) | 0xE0);
+					res += char(((u >> 6)& 0x3F) | 0x80);
+					res += char((u & 0x3F) | 0x80);
+				} else if (u < 0x110000) {
+					res += char((u >> 18) | 0xF0);
+					res += char(((u >> 12)& 0x3F) | 0x80);
+					res += char(((u >> 6)& 0x3F) | 0x80);
+					res += char((u & 0x3F) | 0x80);
+				} else {
+					Error("invalid unicode codepoint");
+				}
+			};
+			auto ParseEscapedChar = [&](const int c) {
+				const char unesc_table[] = {
+					0 , '\b',   0 ,   0 ,   0 , '\f',   0 , 
+					0 ,   0 ,   0 ,   0 ,   0 ,   0 , '\n', 
+					0 ,   0 ,   0 , '\r',   0 , '\t', };
+				res += unesc_table[c-'a'];
+			};
+			unsigned left = 0;
+			auto AddLeft = [&] { if (left != 0) { res.append(p-left, left); left = 0; } };
+			while (++p != end) {
+				switch(*p) {
+				case '\"':
+					AddLeft();
+					++p;
+					return res;
+				case '\\':
+					AddLeft();
+					if (p+1 != end) {
+						switch (p[1]) {
+						case 'b' : case 'f' : case 'n' : case 'r' : case 't' : ParseEscapedChar(p[1]); break;
+						case 'u' : to_utf8(); break;
+						case '\"': case '\\': case '/' : ++left; break;
+						default: left += 2; break;
+						}
+						++p; // parsed 2 char.
+					} // else, next round will fail.
+					break;
+				// and escaped char should not appear.
+				case '\t': case '\n': case '\r': case '\b': case '\f': //case '/' :
+					AddLeft();
+					Error("invalid string content") ;
+					break;
+				default: ++left; break;
+				}
+			}
+			Error("string ends unexpectedly!");
+			return res;
+		};
+
+		auto parse_object = [&]() {
+			assert(*p == '{');
+			++p;
+			if (Peek() == '}') {
+				++p;
+				return object_type();
+			}
+			object_type obj;
+			while (true) {
+				Expect('\"');
+				std::string key(parse_string());
+
+				Expect(':');
+				++p;
+
+				JValue v;
+				p += v.parse(p, end);
+				obj.emplace(std::move(key), std::move(v));
+
+				switch (Peek()) {
+					case ',': ++p; break;
+					case '}': ++p; return obj;
+					default: Error("parse object error");
+				}
+			} 
+		};
+		auto parse_array = [&]() {
+			assert(*p == '[');
+			++p;
+			if (Peek() == ']') {
+				++p;
+				return array_type();
+			}
+			array_type arr;
+			while (true) {
+				JValue v;
+				p += v.parse(p, end);
+				arr.emplace_back(std::move(v));
+
+				switch (Peek()) {
+					case ',': ++p; break;
+					case ']': ++p; return arr;
+					default: Error("parse array error");
+				}
+			}
+		};
+
+		auto ExpectText = [&](const char *sz, const size_t len) {
+			if(memcmp(p+1, sz+1, len-1) == 0) {
+				p += len;
+			} else {
+				Error("parse literal error, expecting '" + std::string(sz, len) + "', read " + std::string(p, len).c_str());
+			}
+		};
+
+		switch (Peek()) {
+		case '{': *this = parse_object(); break;
+		case '[': *this = parse_array(); break;
+		case '\"': *this = parse_string(); break;
+		case 't': ExpectText("true", 4); *this = true; break;
+		case 'f': ExpectText("false", 5); *this = false; break;
+		case 'n': ExpectText("null", 4); *this = nullptr; break;
+		case '0': case '1': case '2': case '3': case '4':
+		case '5': case '6': case '7': case '8': case '9': case '-':
+				  {
+					  auto IsNumber = [](const char c) { return '0' <= c && c <= '9'; };
+					  auto sep = p+1;
+					  while (sep != end && IsNumber(*sep)) { ++sep; }
+					  char *parse_end = (char*)p;
+					  switch (*sep) {
+						  case '.' : case 'e' : case 'E': *this = strtold((char*)p, &parse_end); break;
+						  default: *this = strtoll((char*)p, &parse_end, 10); break;
+					  }
+					  p = parse_end;
+				  } break;
+		default : Error(); break;
+		}
+		return p - begin;
+	}
+
+	std::string JValue::dump(const DumpOptions &opt_in) const {
+		using namespace _impl;
+		Buffer res;
+		DumpOptions opt(opt_in);
+		if(opt.indent_step_ > 0) {
+		   	res.append(opt.indent_level_ * opt.indent_step_, ' ');
+		}
+		Dump(*this, res, opt, opt.indent_step_ > 0);
+		return res;
+	}
+} // jsonpp
+
diff --git a/box/json.h b/box/json.h
new file mode 100755
index 0000000..461f0e9
--- /dev/null
+++ b/box/json.h
@@ -0,0 +1,277 @@
+#ifndef JSON_B360EKF1
+#define JSON_B360EKF1
+
+#include <algorithm>
+#include <boost/variant.hpp>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace ssjson
+{
+
+class JValue
+{
+public:
+	typedef std::string key_type;
+	typedef long long int_type;
+	typedef long double float_type;
+	typedef JValue value_type;
+	typedef std::map<key_type, value_type> object_type;
+	typedef object_type::value_type pair_type;
+	typedef std::vector<value_type> array_type;
+
+	enum JVType { jv_null,
+		          jv_int,
+		          jv_float,
+		          jv_bool,
+		          jv_string,
+		          jv_object,
+		          jv_array };
+	std::string name() const
+	{
+		static const char *names[] = {"jv_null", "jv_int", "jv_float", "jv_bool", "jv_string", "jv_object", "jv_array"};
+		return names[type()];
+	}
+
+private:
+	typedef boost::variant<std::nullptr_t, int_type, float_type, bool, std::string, object_type, array_type> data_type;
+
+	template <class C>
+	const C &val() const { return ref_val<C>(); }
+	template <class C>
+	C &val() { return const_cast<C &>(ref_val<C>()); }
+
+	template <class C>
+	class TInitValue
+	{
+		typedef C value_t;
+		typedef std::vector<TInitValue> list_t;
+		mutable boost::variant<value_t, list_t> data_;
+
+	public:
+		template <class T>
+		TInitValue(const T &t) :
+		    data_(value_t(t)) {}
+		template <class T>
+		TInitValue(T &&t) :
+		    data_(value_t(std::move(t))) {}
+		TInitValue(std::initializer_list<TInitValue> l) :
+		    data_(list_t(l)){};
+		bool is_value() const { return data_.which() == 0; }
+		bool is_list() const { return data_.which() == 1; }
+		const list_t &list() const { return boost::get<list_t>(data_); }
+		const value_t &value() const { return boost::get<value_t>(data_); }
+	};
+	typedef TInitValue<JValue> InitValue;
+	typedef std::initializer_list<InitValue> InitList;
+
+	template <class Iter>
+	static object_type MakeObject(Iter begin, Iter end)
+	{
+		object_type obj;
+		for (auto it = begin; it != end; ++it) {
+			obj.emplace(std::move(it->list()[0].value().template val<key_type>()), (it->list()[1]));
+		}
+		return obj;
+	}
+	template <class Iter>
+	static array_type MakeArray(Iter begin, Iter end) { return array_type(begin, end); }
+	template <class Iter>
+	static JValue FromInit(Iter begin, Iter end)
+	{
+		auto like_object_item = [](const InitValue &v) {
+			return v.is_list() &&
+			       v.list().size() == 2 &&
+			       v.list()[0].is_value() &&
+			       v.list()[0].value().type() == jv_string;
+		};
+		if (std::all_of(begin, end, like_object_item)) {
+			return MakeObject(begin, end);
+		} else {
+			return MakeArray(begin, end);
+		}
+	}
+	static JValue FromInit(const InitValue &ji)
+	{
+		if (ji.is_value()) {
+			return std::move(ji.value());
+		} else {
+			return FromInit(ji.list().begin(), ji.list().end());
+		}
+	}
+
+public:
+	struct DumpOptions {
+		int indent_step_ = 0;
+		int indent_level_ = 0;
+		bool escape_unicode_ = false;
+	};
+	JValue() :
+	    data_(object_type()) { static_assert(sizeof(JValue) == sizeof(data_type), "JValue should simple wrap data_type with no other fields."); }
+#define SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
+	JValue(decl) : data_(expr) {}                      \
+	JValue &operator=(decl)                            \
+	{                                                  \
+		data_ = (expr);                                \
+		return *this;                                  \
+	}
+	// operator=() seems faster than JValue(exer).swap(data_).
+	// copy
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(const JValue &v, v.data_) // self;
+#define MAP_TYPE_TO_JSON_TYPE(decl, expr)          \
+public:                                            \
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
+private:                                           \
+	static auto JVTypeAccept(decl)->std::remove_reference<decltype(expr)>::type
+#define MAP_TYPE_TO_JSON_TYPE_BIDIR(decl, expr) \
+	MAP_TYPE_TO_JSON_TYPE(decl, expr);          \
+                                                \
+private:                                        \
+	static auto JVTypeReturn(decl)->std::remove_reference<decltype(expr)>::type
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::nullptr_t, nullptr);
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const short v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned short v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const int v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned int v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const long v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const long long v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long long v, int_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const float v, float_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const double v, float_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const long double v, float_type(v));
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const bool v, v);
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::string &v, v);
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const object_type &v, v);
+	MAP_TYPE_TO_JSON_TYPE_BIDIR(const array_type &v, v);
+	MAP_TYPE_TO_JSON_TYPE(const char *v, std::string(v));
+#undef MAP_TYPE_TO_JSON_TYPE_BIDIR
+#undef MAP_TYPE_TO_JSON_TYPE
+	template <class T>
+	static void JVTypeAccept(T); // use void to disable other types.
+	template <class T>
+	static void JVTypeReturn(T); // use void to disable other types.
+public:
+	// move
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(JValue &&v, std::move(v.data_)) // self
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(std::string &&v, std::move(v))  // and other classes
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(object_type &&v, std::move(v))
+	SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(array_type &&v, std::move(v))
+#undef SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT
+
+	static JValue Object()
+	{
+		return (object_type());
+	}
+	static JValue Object(InitList l) { return (MakeObject(l.begin(), l.end())); }
+	static JValue Array() { return (array_type()); }
+	static JValue Array(InitList l) { return (MakeArray(l.begin(), l.end())); }
+	JValue(InitList l) :
+	    JValue(FromInit(l.begin(), l.end())) {}
+	JValue(const InitValue &ji) :
+	    JValue(FromInit(ji)) {}
+	JVType type() const { return static_cast<JVType>(data_.which()); }
+
+	std::string dump(const DumpOptions &opt_in) const;
+	std::string dump(const int indent_step = 0, const int indent_level = 0, const bool escape_unicode = false) const
+	{
+		DumpOptions opt;
+		opt.indent_step_ = indent_step;
+		opt.indent_level_ = indent_level;
+		opt.escape_unicode_ = escape_unicode;
+		return dump(opt);
+	}
+	bool parse(const std::string &s) { return parse(s.c_str(), s.c_str() + s.size()) > 0; }
+	int parse(const char *begin, const char *end);
+	void swap(JValue &a) { data_.swap(a.data_); }
+
+	template <class T>
+	T get_value() const { return val<decltype(JVTypeReturn(T()))>(); }
+	template <class T>
+	void put_value(T &&v) { *this = v; }
+	const object_type &object() const { return val<object_type>(); }
+	object_type &object() { return val<object_type>(); }
+	const array_type &array() const { return val<array_type>(); }
+	array_type &array() { return val<array_type>(); }
+	bool is_object() const { return type() == jv_object; }
+	bool is_array() const { return type() == jv_array; }
+
+	// array ops
+	const value_type &operator[](const size_t idx) const { return array()[idx]; }
+	value_type &operator[](const size_t idx) { return array()[idx]; }
+	const value_type &at(const size_t idx) const { return array().at(idx); }
+	value_type &at(const size_t idx) { return array().at(idx); }
+	void push_back(const value_type &v) { array().push_back(v); }
+	void push_back(value_type &&v) { array().push_back(std::move(v)); }
+
+	// object ops
+	value_type &operator[](const key_type &key) { return object()[key]; }
+	const value_type &at(const key_type &key) const { return object().at(key); }
+	value_type &at(const key_type &key) { return object().at(key); }
+	const value_type &child(const key_type &path) const { return ref_child(path); }
+	value_type &child(const key_type &path) { return const_cast<value_type &>(ref_child(path)); }
+	template <class T>
+	T get(const key_type &path) const { return child(path).val<decltype(JVTypeReturn(T()))>(); }
+	template <class T>
+	auto get(const key_type &path, const T &def) const -> typename std::remove_reference<decltype(JVTypeAccept(def))>::type
+	{
+		try {
+			return get<decltype(JVTypeAccept(def))>(path);
+		} catch (...) {
+			return def;
+		}
+	}
+	value_type &put(pair_type &&elem)
+	{
+		const key_type &key = elem.first;
+		auto sep = key.find('.');
+		if (sep == key_type::npos) {
+			auto r = object().lower_bound(elem.first);
+			if (r != object().end() && r->first == elem.first) {
+				r->second = std::move(elem.second);
+				return r->second;
+			} else {
+				return object().emplace_hint(r, std::move(elem))->second;
+			}
+		} else {
+			return (*this)[key.substr(0, sep)].put(pair_type(key.substr(sep + 1), std::move(elem.second)));
+		}
+	}
+	value_type &put(const pair_type &elem)
+	{
+		pair_type tmp(elem);
+		return put(std::move(tmp));
+	}
+	value_type &put(const key_type &k, const value_type &v) { return put(pair_type(k, v)); }
+	value_type &put(const key_type &k, value_type &&v) { return put(pair_type(k, std::move(v))); }
+	value_type &put(key_type &&k, const value_type &v) { return put(pair_type(std::move(k), v)); }
+	value_type &put(key_type &&k, value_type &&v) { return put(pair_type(std::move(k), std::move(v))); }
+
+private:
+	template <class C>
+	const C &ref_val() const
+	{
+		try {
+			return boost::get<C>(data_);
+		} catch (std::exception &) {
+			throw std::runtime_error("json get error, " + name() + " is not a " + JValue(C()).name());
+		}
+	}
+	const value_type &ref_child(const key_type &path) const
+	{
+		auto sep = path.find('.');
+		if (sep == key_type::npos) {
+			return at(path);
+		} else {
+			return at(path.substr(0, sep)).ref_child(path.substr(sep + 1));
+		}
+	}
+	data_type data_;
+};
+
+typedef JValue Json;
+
+} // namespace ssjson
+
+#endif // end of include guard: JSON_B360EKF1
diff --git a/box/node_center.cpp b/box/node_center.cpp
index b970d44..cefb138 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -350,44 +350,48 @@
 		return MakeReply(eSuccess);
 	});
 }
-MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+
+MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
 {
 	typedef MsgQueryProcReply Reply;
-	auto query = [&](Node self) -> Reply {
-		auto Add1 = [](Reply &reply, Node node) {
-			auto info = reply.add_proc_list();
-			*info->mutable_proc() = node->proc_;
-			info->set_online(node->state_.flag_ == kStateNormal);
-			for (auto &addr_topics : node->services_) {
-				for (auto &topic : addr_topics.second) {
-					info->mutable_topics()->add_topic_list(topic);
-				}
+	auto Add1 = [](Reply &reply, Node node) {
+		auto info = reply.add_proc_list();
+		*info->mutable_proc() = node->proc_;
+		info->mutable_proc()->clear_private_info();
+		info->set_online(node->state_.flag_ == kStateNormal);
+		for (auto &addr_topics : node->services_) {
+			for (auto &topic : addr_topics.second) {
+				info->mutable_topics()->add_topic_list(topic);
 			}
-		};
-
-		if (!req.proc_id().empty()) {
-			auto pos = online_node_addr_map_.find(req.proc_id());
-			if (pos == online_node_addr_map_.end()) {
-				return MakeReply<Reply>(eNotFound, "proc not found.");
-			} else {
-				auto node_pos = nodes_.find(pos->second);
-				if (node_pos == nodes_.end()) {
-					return MakeReply<Reply>(eNotFound, "proc node not found.");
-				} else {
-					auto reply = MakeReply<Reply>(eSuccess);
-					Add1(reply, node_pos->second);
-					return reply;
-				}
-			}
-		} else {
-			Reply reply(MakeReply<Reply>(eSuccess));
-			for (auto &kv : nodes_) {
-				Add1(reply, kv.second);
-			}
-			return reply;
 		}
 	};
 
+	if (!proc_id.empty()) {
+		auto pos = online_node_addr_map_.find(proc_id);
+		if (pos == online_node_addr_map_.end()) {
+			return MakeReply<Reply>(eNotFound, "proc not found.");
+		} else {
+			auto node_pos = nodes_.find(pos->second);
+			if (node_pos == nodes_.end()) {
+				return MakeReply<Reply>(eNotFound, "proc node not found.");
+			} else {
+				auto reply = MakeReply<Reply>(eSuccess);
+				Add1(reply, node_pos->second);
+				return reply;
+			}
+		}
+	} else {
+		Reply reply(MakeReply<Reply>(eSuccess));
+		for (auto &kv : nodes_) {
+			Add1(reply, kv.second);
+		}
+		return reply;
+	}
+}
+MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
+{
+	typedef MsgQueryProcReply Reply;
+	auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
 	return HandleMsg<Reply>(head, query);
 }
 
diff --git a/box/node_center.h b/box/node_center.h
index b9a01b3..4663bee 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -159,6 +159,7 @@
 	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
 	MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
 	MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
+	MsgQueryProcReply QueryProc(const std::string &proc_id);
 	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
 	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
 	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 33adf91..e597533 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -169,6 +169,20 @@
 		// printf("register topic : %s\n", r ? "ok" : "failed");
 		// Sleep(1s);
 	}
+	auto PrintProcs = [](MsgQueryProcReply const &result) {
+		printf("query proc result: %d\n", result.proc_list().size());
+		for (int i = 0; i < result.proc_list().size(); ++i) {
+			auto &info = result.proc_list(i);
+			printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
+			       (info.online() ? "online" : "offline"),
+			       info.proc().proc_id().c_str(), info.proc().name().c_str());
+			for (auto &t : info.topics().topic_list()) {
+				printf("\t\t %s\n", t.c_str());
+			}
+			printf("\n");
+		}
+		printf("\n");
+	};
 	{
 		// query procs
 		std::string dest(BHAddress().SerializeAsString());
@@ -180,22 +194,45 @@
 		DEFER1(BHFree(reply, reply_len));
 		MsgQueryProcReply result;
 		if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
-			printf("query proc result: %d\n", result.proc_list().size());
-			for (int i = 0; i < result.proc_list().size(); ++i) {
-				auto &info = result.proc_list(i);
-				printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
-				       (info.online() ? "online" : "offline"),
-				       info.proc().proc_id().c_str(), info.proc().name().c_str());
-				for (auto &t : info.topics().topic_list()) {
-					printf("\t\t %s", t.c_str());
-				}
-			}
+			PrintProcs(result);
 		} else {
 			printf("query proc error\n");
 		}
 		// printf("register topic : %s\n", r ? "ok" : "failed");
 		// Sleep(1s);
 	}
+	{
+		// query procs with normal topic request
+		MsgRequestTopic req;
+		req.set_topic("@center_query_procs");
+		std::string s(req.SerializeAsString());
+		// Sleep(10ms, false);
+		std::string dest(BHAddress().SerializeAsString());
+		void *proc_id = 0;
+		int proc_id_len = 0;
+		DEFER1(BHFree(proc_id, proc_id_len););
+		void *reply = 0;
+		int reply_len = 0;
+		DEFER1(BHFree(reply, reply_len));
+		bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
+		if (!r) {
+			int ec = 0;
+			std::string msg;
+			GetLastError(ec, msg);
+			printf("topic query proc error: %s\n", msg.c_str());
+		} else {
+			MsgRequestTopicReply ret;
+			ret.ParseFromArray(reply, reply_len);
+			printf("topic query proc : %s\n", ret.data().c_str());
+			// MsgQueryProcReply result;
+			// if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
+			// 	PrintProcs(result);
+			// } else {
+			// 	printf("topic query proc error\n");
+			// }
+		}
+	}
+	// return;
 
 	{ // Subscribe
 		MsgTopicList topics;
@@ -335,9 +372,9 @@
 	threads.Launch(hb, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const int64_t nreq = 1000 * 100;
+	const int64_t nreq = 1; //000 * 100;
 
-	for (int i = 0; i < 100; ++i) {
+	for (int i = 0; i < 10; ++i) {
 		SyncRequest(i);
 	}
 

--
Gitblit v1.8.0