| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "center.h" |
| | | #include "center_topic_node.h" |
| | | #include "node_center.h" |
| | | #include <chrono> |
| | | |
| | |
| | | auto &info = kv.second; |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); |
| | | } |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | } |
| | | BHCenter::~BHCenter() { Stop(); } |
| | | |
| | | bool BHCenter::Start() |
| | | { |
| | |
| | | auto &info = kv.second; |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | |
| | | topic_node_->Start(); |
| | | return true; |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | topic_node_->Stop(); |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |
| | | } |
| | |
| | | #include <functional> |
| | | #include <map> |
| | | #include <memory> |
| | | class CenterTopicNode; |
| | | |
| | | class BHCenter |
| | | { |
| | |
| | | static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | ~BHCenter() { Stop(); } |
| | | ~BHCenter(); |
| | | bool Start(); |
| | | bool Stop(); |
| | | |
| | |
| | | static CenterRecords &Centers(); |
| | | |
| | | std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; |
| | | std::unique_ptr<CenterTopicNode> topic_node_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TM9OUQTG |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: center_topic_node.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021-05-20 12:44:31 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "center_topic_node.h" |
| | | #include "node_center.h" |
| | | #include "topic_node.h" |
| | | |
| | | #include "json.h" |
| | | #include <chrono> |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | using namespace bhome_shm; |
| | | using namespace ssjson; |
| | | |
| | | namespace |
| | | { |
| | | const std::string &kTopicQueryProc = "@center_query_procs"; |
| | | |
| | | std::string ToJson(const MsgQueryProcReply &qpr) |
| | | { |
| | | Json json; |
| | | json.put("procCount", qpr.proc_list_size()); |
| | | auto &list = json.put("procList", Json::Array()); |
| | | // Json list = Json::Array(); |
| | | for (auto &info : qpr.proc_list()) { |
| | | Json proc; |
| | | proc.put("id", info.proc().proc_id()); |
| | | proc.put("name", info.proc().name()); |
| | | proc.put("publicInfo", info.proc().public_info()); |
| | | proc.put("online", info.online()); |
| | | Json topics = Json::Array(); |
| | | for (auto &t : info.topics().topic_list()) { |
| | | topics.push_back(t); |
| | | } |
| | | proc.put("topics", topics); |
| | | list.push_back(proc); |
| | | } |
| | | return json.dump(0); |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) : |
| | | pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {} |
| | | |
| | | CenterTopicNode::~CenterTopicNode() { Stop(); } |
| | | |
| | | void CenterTopicNode::Stop() |
| | | { |
| | | bool cur = true; |
| | | if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) { |
| | | worker_.join(); |
| | | pnode_->Stop(); |
| | | } |
| | | } |
| | | |
| | | bool CenterTopicNode::Start() |
| | | { |
| | | Stop(); |
| | | |
| | | int timeout = 3000; |
| | | MsgCommonReply reply; |
| | | |
| | | ProcInfo info; |
| | | info.set_proc_id("#center.node"); |
| | | info.set_name("center node"); |
| | | if (!pnode_->Register(info, reply, timeout)) { |
| | | throw std::runtime_error("center node register failed."); |
| | | } |
| | | |
| | | MsgTopicList topics; |
| | | topics.add_topic_list(kTopicQueryProc); |
| | | if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) { |
| | | throw std::runtime_error("center node register topics failed."); |
| | | } |
| | | |
| | | auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) { |
| | | auto reply = MakeReply<MsgRequestTopicReply>(eSuccess); |
| | | if (request.topic() == kTopicQueryProc) { |
| | | auto data = (*pscenter_)->QueryProc(request.data()); |
| | | *reply.mutable_errmsg() = data.errmsg(); |
| | | reply.set_data(ToJson(data)); |
| | | } else { |
| | | SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic()); |
| | | } |
| | | pnode_->ServerSendReply(src_info, reply); |
| | | }; |
| | | |
| | | bool cur = false; |
| | | if (run_.compare_exchange_strong(cur, true)) { |
| | | auto heartbeat = [this]() { |
| | | while (run_) { |
| | | pnode_->Heartbeat(1000); |
| | | std::this_thread::sleep_for(1s); |
| | | } |
| | | }; |
| | | std::thread(heartbeat).swap(worker_); |
| | | return pnode_->ServerStart(onRequest); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: center_topic_node.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021-05-20 12:44:42 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef CENTER_TOPIC_NODE_YCI0P9KC |
| | | #define CENTER_TOPIC_NODE_YCI0P9KC |
| | | |
| | | #include "bh_util.h" |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <memory> |
| | | #include <thread> |
| | | |
| | | class TopicNode; |
| | | class NodeCenter; |
| | | // center as a topic node. |
| | | class CenterTopicNode |
| | | { |
| | | public: |
| | | typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr; |
| | | CenterTopicNode(CenterPtr center, bhome_shm::SharedMemory &shm); |
| | | ~CenterTopicNode(); |
| | | bool Start(); |
| | | void Stop(); |
| | | |
| | | private: |
| | | CenterPtr pscenter_; |
| | | std::unique_ptr<TopicNode> pnode_; |
| | | std::thread worker_; |
| | | std::atomic<bool> run_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TOPIC_NODE_YCI0P9KC |
New file |
| | |
| | | #include "json.h" |
| | | |
| | | namespace ssjson { |
| | | |
| | | namespace _impl { |
| | | |
typedef std::string Buffer;

// Formats `val...` with the printf-style format `fmt` and appends the text to `res`.
//
// snprintf() returns the length the complete output would have had, which can
// exceed the stack buffer, and a negative value on error. The return value is
// therefore validated before it is used as a length: oversized output is
// formatted again into an exactly sized heap buffer, and an error appends nothing.
template <class Buf, class... T>
inline void DumpNumber(Buf &res, const char *fmt, T &&...val)
{
	char buf[64] = {0};
	const int n = snprintf(buf, sizeof(buf), fmt, val...);
	if (n < 0) {
		return; // formatting failed, nothing meaningful to append.
	}
	if (static_cast<size_t>(n) < sizeof(buf)) {
		res.append(buf, n);
		return;
	}
	std::string big(static_cast<size_t>(n) + 1, '\0'); // +1 for the terminating NUL.
	snprintf(&big[0], big.size(), fmt, val...);
	res.append(big.data(), n);
}

// Appends `fval` using the shortest "%Lg" form with full long double precision.
inline void DumpFloat(Buffer &res, const long double fval)
{
	DumpNumber(res, "%.*Lg", std::numeric_limits<long double>::digits10, fval);
}

// Appends `ival` in decimal.
inline void DumpInt(Buffer &res, const long long ival)
{
	DumpNumber(res, "%lld", ival);
}
| | | // inline void DumpFloat(Buffer &res, const double fval) { DumpNumber(res, "%.*g", std::numeric_limits<double>::digits10, fval); } |
| | | // inline void DumpInt(Buffer &res, const long ival) { DumpNumber(res, "%ld", ival); } |
| | | |
| | | // assume utf8 string. |
// Appends `s` to `res` as a double-quoted JSON string literal. `s` is assumed to be UTF-8.
//
// Always escaped: \b \f \t \n \r \" \\ and '/' (two-character escapes), and all
// other control characters (< 0x20, 0x7F) as \u00XX.
// With `escape_unicode`, well-formed 2- and 3-byte UTF-8 sequences are written
// as \uXXXX. Well-formed 4-byte sequences (code points above 0xFFFF) are copied
// through unchanged, as are bytes that do not form a valid sequence (truncated
// input, stray continuation bytes); the loop never reads beyond `s`.
void DumpString(std::string &res, const std::string &s, const bool escape_unicode)
{
	auto AddUnicode = [&](const unsigned u) {
		static const char kHex[] = "0123456789abcdef";
		const char buf[6] = {'\\', 'u', kHex[(u >> 12) & 0xF], kHex[(u >> 8) & 0xF], kHex[(u >> 4) & 0xF], kHex[u & 0xF]};
		res.append(buf, sizeof(buf));
	};
	auto IsControl = [](const unsigned char c) { return c < 0x20 || c == 0x7F; };
	auto IsContinuation = [](const unsigned char c) { return (c & 0xC0) == 0x80; };
	// Letter of the two-character escape for `c`, or 0 when `c` has none.
	auto ShortEscape = [](const char c) -> char {
		switch (c) {
		case '\b': return 'b';
		case '\f': return 'f';
		case '\t': return 't';
		case '\n': return 'n';
		case '\r': return 'r';
		case '\"': return '\"';
		case '\\': return '\\';
		case '/': return '/';
		default: return 0;
		}
	};

	const size_t size = s.size();
	auto ByteAt = [&](const size_t idx) { return static_cast<unsigned char>(s[idx]); };

	res += '\"';
	for (size_t i = 0; i < size; ++i) {
		const unsigned char uc = ByteAt(i);
		if (const char esc = ShortEscape(s[i])) {
			res += '\\';
			res += esc;
		} else if (IsControl(uc)) {
			AddUnicode(uc);
		} else if (uc < 0x80 || !escape_unicode) {
			res += s[i];
		} else if ((uc & 0xE0) == 0xC0 && i + 1 < size && IsContinuation(ByteAt(i + 1))) { // 2 bytes
			AddUnicode(((uc & 0x1Fu) << 6) | (ByteAt(i + 1) & 0x3Fu));
			i += 1;
		} else if ((uc & 0xF0) == 0xE0 && i + 2 < size &&
		           IsContinuation(ByteAt(i + 1)) && IsContinuation(ByteAt(i + 2))) { // 3 bytes
			AddUnicode(((uc & 0x0Fu) << 12) | ((ByteAt(i + 1) & 0x3Fu) << 6) | (ByteAt(i + 2) & 0x3Fu));
			i += 2;
		} else if ((uc & 0xF8) == 0xF0 && i + 3 < size && IsContinuation(ByteAt(i + 1)) &&
		           IsContinuation(ByteAt(i + 2)) && IsContinuation(ByteAt(i + 3))) { // 4 bytes, exceeds 0xFFFF
			// Keep the whole sequence together; escaping its bytes one at a time would corrupt it.
			res.append(s, i, 4);
			i += 3;
		} else {
			res += s[i]; // not a valid sequence start: pass the byte through.
		}
	}
	res += '\"';
}
| | | |
| | | void Dump(const JValue &jv, Buffer &res, JValue::DumpOptions &opt, const bool pretty) |
| | | { |
| | | auto NewLine = [&]() { |
| | | res += '\n'; |
| | | res.append(opt.indent_level_ * opt.indent_step_, ' '); |
| | | }; |
| | | |
| | | switch (jv.type()) { |
| | | case JValue::jv_object: |
| | | { |
| | | res += '{'; |
| | | auto &obj = jv.object(); |
| | | if (!obj.empty()) { |
| | | if (pretty) { |
| | | opt.indent_level_++; |
| | | for (auto &&kv : obj) { |
| | | NewLine(); |
| | | DumpString(res, kv.first, opt.escape_unicode_); |
| | | res.append(": ", 2); |
| | | Dump(kv.second, res, opt, pretty); |
| | | res += ','; |
| | | } |
| | | res.pop_back(); // remove last ','; |
| | | opt.indent_level_--; |
| | | NewLine(); |
| | | } else { |
| | | for (auto &&kv : obj) { |
| | | DumpString(res, kv.first, opt.escape_unicode_); |
| | | res += ':'; |
| | | Dump(kv.second, res, opt, pretty); |
| | | res += ','; |
| | | } |
| | | res.pop_back(); // remove last ','; |
| | | } |
| | | } |
| | | res += '}'; |
| | | } break; |
| | | case JValue::jv_array: |
| | | { |
| | | res += '['; |
| | | auto &arr = jv.array(); |
| | | if (!arr.empty()) { |
| | | if (pretty) { |
| | | opt.indent_level_++; |
| | | for (auto &&v : arr) { |
| | | NewLine(); |
| | | Dump(v, res, opt, pretty); |
| | | res += ','; |
| | | } |
| | | res.pop_back(); // remove last ','; |
| | | opt.indent_level_--; |
| | | NewLine(); |
| | | } else { |
| | | for (auto &&v : arr) { |
| | | Dump(v, res, opt, pretty); |
| | | res += ','; |
| | | } |
| | | res.pop_back(); // remove last ','; |
| | | } |
| | | } |
| | | res += ']'; |
| | | } break; |
| | | case JValue::jv_float: |
| | | { |
| | | auto idx = res.size(); |
| | | DumpFloat(res, jv.get_value<JValue::float_type>()); |
| | | while (idx < res.size() && res[idx] != '.') { |
| | | ++idx; |
| | | } |
| | | if (idx == res.size()) { |
| | | res.append(".0", 2); |
| | | } |
| | | } break; |
| | | case JValue::jv_null: res.append("null", 4); break; |
| | | case JValue::jv_int: DumpInt(res, jv.get_value<JValue::int_type>()); break; |
| | | case JValue::jv_bool: |
| | | if (jv.get_value<bool>()) { |
| | | res.append("true", 4); |
| | | } else { |
| | | res.append("false", 5); |
| | | } |
| | | break; |
| | | case JValue::jv_string: DumpString(res, jv.get_value<std::string>(), opt.escape_unicode_); break; |
| | | default: std::invalid_argument("invalid json type:" + std::to_string(jv.type())); break; |
| | | } |
| | | } |
| | | |
// Maps a hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F') to its value
// 0..15. Every other byte yields 0xFF, which callers treat as "not a hex digit".
auto HexCharVal = [](const unsigned char c) -> unsigned char {
	if ('0' <= c && c <= '9') {
		return static_cast<unsigned char>(c - '0');
	}
	if ('a' <= c && c <= 'f') {
		return static_cast<unsigned char>(c - 'a' + 10);
	}
	if ('A' <= c && c <= 'F') {
		return static_cast<unsigned char>(c - 'A' + 10);
	}
	return 0xFF;
};
| | | } // namespace _impl |
| | | |
| | | int JValue::parse(const char *begin, const char *end) |
| | | { |
| | | using namespace _impl; |
| | | const char *p = begin; |
| | | |
| | | auto Error = [&](const std::string &msg = "invalid value at"){ throw std::invalid_argument(msg + " : "+ std::string(p, std::min(int(end-p), 30))); }; |
| | | auto IsSpace = [](const char c) { |
| | | return c == ' ' || c == '\t' || c == '\r' || c == '\n'; |
| | | }; |
| | | auto IgnoreSpaces = [&]() { while (p != end && IsSpace(*p)) { ++p; } }; |
| | | auto Peek = [&]() { IgnoreSpaces(); if (p == end) { Error("input ends unexpectedly!"); } return *p; }; |
| | | auto Expect = [&](const char c) { if (Peek() != c) { Error(std::string("parse error, expecting ") + c); } }; |
| | | |
| | | auto parse_string = [&]() { |
| | | std::string res; |
| | | auto to_utf8 = [&]() { |
| | | auto Hex4 = [](const char *sz) { |
| | | auto FromHex = [](const unsigned char c)->unsigned char { |
| | | const unsigned char v = HexCharVal(c); |
| | | if (v == 0xFF) { |
| | | throw std::invalid_argument(std::string("invalid unicode input ") + char(c)); |
| | | } |
| | | return v; |
| | | }; |
| | | return (FromHex(sz[0]) << 12) | (FromHex(sz[1]) << 8) | (FromHex(sz[2]) << 4) | (FromHex(sz[3])); |
| | | }; |
| | | const unsigned int u = Hex4(p+2); |
| | | p += 4; |
| | | if (u < 0x80) { |
| | | res += char(u); |
| | | } else if (u < 0x800) { |
| | | res += char((u >> 6) | 0xC0); |
| | | res += char((u & 0x3F) | 0x80); |
| | | } else if (u < 0x10000) { |
| | | res += char((u >> 12) | 0xE0); |
| | | res += char(((u >> 6)& 0x3F) | 0x80); |
| | | res += char((u & 0x3F) | 0x80); |
| | | } else if (u < 0x110000) { |
| | | res += char((u >> 18) | 0xF0); |
| | | res += char(((u >> 12)& 0x3F) | 0x80); |
| | | res += char(((u >> 6)& 0x3F) | 0x80); |
| | | res += char((u & 0x3F) | 0x80); |
| | | } else { |
| | | Error("invalid unicode codepoint"); |
| | | } |
| | | }; |
| | | auto ParseEscapedChar = [&](const int c) { |
| | | const char unesc_table[] = { |
| | | 0 , '\b', 0 , 0 , 0 , '\f', 0 , |
| | | 0 , 0 , 0 , 0 , 0 , 0 , '\n', |
| | | 0 , 0 , 0 , '\r', 0 , '\t', }; |
| | | res += unesc_table[c-'a']; |
| | | }; |
| | | unsigned left = 0; |
| | | auto AddLeft = [&] { if (left != 0) { res.append(p-left, left); left = 0; } }; |
| | | while (++p != end) { |
| | | switch(*p) { |
| | | case '\"': |
| | | AddLeft(); |
| | | ++p; |
| | | return res; |
| | | case '\\': |
| | | AddLeft(); |
| | | if (p+1 != end) { |
| | | switch (p[1]) { |
| | | case 'b' : case 'f' : case 'n' : case 'r' : case 't' : ParseEscapedChar(p[1]); break; |
| | | case 'u' : to_utf8(); break; |
| | | case '\"': case '\\': case '/' : ++left; break; |
| | | default: left += 2; break; |
| | | } |
| | | ++p; // parsed 2 char. |
| | | } // else, next round will fail. |
| | | break; |
| | | // and escaped char should not appear. |
| | | case '\t': case '\n': case '\r': case '\b': case '\f': //case '/' : |
| | | AddLeft(); |
| | | Error("invalid string content") ; |
| | | break; |
| | | default: ++left; break; |
| | | } |
| | | } |
| | | Error("string ends unexpectedly!"); |
| | | return res; |
| | | }; |
| | | |
| | | auto parse_object = [&]() { |
| | | assert(*p == '{'); |
| | | ++p; |
| | | if (Peek() == '}') { |
| | | ++p; |
| | | return object_type(); |
| | | } |
| | | object_type obj; |
| | | while (true) { |
| | | Expect('\"'); |
| | | std::string key(parse_string()); |
| | | |
| | | Expect(':'); |
| | | ++p; |
| | | |
| | | JValue v; |
| | | p += v.parse(p, end); |
| | | obj.emplace(std::move(key), std::move(v)); |
| | | |
| | | switch (Peek()) { |
| | | case ',': ++p; break; |
| | | case '}': ++p; return obj; |
| | | default: Error("parse object error"); |
| | | } |
| | | } |
| | | }; |
| | | auto parse_array = [&]() { |
| | | assert(*p == '['); |
| | | ++p; |
| | | if (Peek() == ']') { |
| | | ++p; |
| | | return array_type(); |
| | | } |
| | | array_type arr; |
| | | while (true) { |
| | | JValue v; |
| | | p += v.parse(p, end); |
| | | arr.emplace_back(std::move(v)); |
| | | |
| | | switch (Peek()) { |
| | | case ',': ++p; break; |
| | | case ']': ++p; return arr; |
| | | default: Error("parse array error"); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | auto ExpectText = [&](const char *sz, const size_t len) { |
| | | if(memcmp(p+1, sz+1, len-1) == 0) { |
| | | p += len; |
| | | } else { |
| | | Error("parse literal error, expecting '" + std::string(sz, len) + "', read " + std::string(p, len).c_str()); |
| | | } |
| | | }; |
| | | |
| | | switch (Peek()) { |
| | | case '{': *this = parse_object(); break; |
| | | case '[': *this = parse_array(); break; |
| | | case '\"': *this = parse_string(); break; |
| | | case 't': ExpectText("true", 4); *this = true; break; |
| | | case 'f': ExpectText("false", 5); *this = false; break; |
| | | case 'n': ExpectText("null", 4); *this = nullptr; break; |
| | | case '0': case '1': case '2': case '3': case '4': |
| | | case '5': case '6': case '7': case '8': case '9': case '-': |
| | | { |
| | | auto IsNumber = [](const char c) { return '0' <= c && c <= '9'; }; |
| | | auto sep = p+1; |
| | | while (sep != end && IsNumber(*sep)) { ++sep; } |
| | | char *parse_end = (char*)p; |
| | | switch (*sep) { |
| | | case '.' : case 'e' : case 'E': *this = strtold((char*)p, &parse_end); break; |
| | | default: *this = strtoll((char*)p, &parse_end, 10); break; |
| | | } |
| | | p = parse_end; |
| | | } break; |
| | | default : Error(); break; |
| | | } |
| | | return p - begin; |
| | | } |
| | | |
| | | std::string JValue::dump(const DumpOptions &opt_in) const { |
| | | using namespace _impl; |
| | | Buffer res; |
| | | DumpOptions opt(opt_in); |
| | | if(opt.indent_step_ > 0) { |
| | | res.append(opt.indent_level_ * opt.indent_step_, ' '); |
| | | } |
| | | Dump(*this, res, opt, opt.indent_step_ > 0); |
| | | return res; |
| | | } |
| | | } // namespace ssjson |
| | | |
New file |
| | |
| | | #ifndef JSON_B360EKF1 |
| | | #define JSON_B360EKF1 |
| | | |
| | | #include <algorithm> |
| | | #include <boost/variant.hpp> |
| | | #include <map> |
| | | #include <string> |
| | | #include <vector> |
| | | |
| | | namespace ssjson |
| | | { |
| | | |
| | | class JValue |
| | | { |
| | | public: |
| | | typedef std::string key_type; |
| | | typedef long long int_type; |
| | | typedef long double float_type; |
| | | typedef JValue value_type; |
| | | typedef std::map<key_type, value_type> object_type; |
| | | typedef object_type::value_type pair_type; |
| | | typedef std::vector<value_type> array_type; |
| | | |
| | | enum JVType { jv_null, |
| | | jv_int, |
| | | jv_float, |
| | | jv_bool, |
| | | jv_string, |
| | | jv_object, |
| | | jv_array }; |
| | | std::string name() const |
| | | { |
| | | static const char *names[] = {"jv_null", "jv_int", "jv_float", "jv_bool", "jv_string", "jv_object", "jv_array"}; |
| | | return names[type()]; |
| | | } |
| | | |
| | | private: |
| | | typedef boost::variant<std::nullptr_t, int_type, float_type, bool, std::string, object_type, array_type> data_type; |
| | | |
| | | template <class C> |
| | | const C &val() const { return ref_val<C>(); } |
| | | template <class C> |
| | | C &val() { return const_cast<C &>(ref_val<C>()); } |
| | | |
| | | template <class C> |
| | | class TInitValue |
| | | { |
| | | typedef C value_t; |
| | | typedef std::vector<TInitValue> list_t; |
| | | mutable boost::variant<value_t, list_t> data_; |
| | | |
| | | public: |
| | | template <class T> |
| | | TInitValue(const T &t) : |
| | | data_(value_t(t)) {} |
| | | template <class T> |
| | | TInitValue(T &&t) : |
| | | data_(value_t(std::move(t))) {} |
| | | TInitValue(std::initializer_list<TInitValue> l) : |
| | | data_(list_t(l)){}; |
| | | bool is_value() const { return data_.which() == 0; } |
| | | bool is_list() const { return data_.which() == 1; } |
| | | const list_t &list() const { return boost::get<list_t>(data_); } |
| | | const value_t &value() const { return boost::get<value_t>(data_); } |
| | | }; |
| | | typedef TInitValue<JValue> InitValue; |
| | | typedef std::initializer_list<InitValue> InitList; |
| | | |
| | | template <class Iter> |
| | | static object_type MakeObject(Iter begin, Iter end) |
| | | { |
| | | object_type obj; |
| | | for (auto it = begin; it != end; ++it) { |
| | | obj.emplace(std::move(it->list()[0].value().template val<key_type>()), (it->list()[1])); |
| | | } |
| | | return obj; |
| | | } |
| | | template <class Iter> |
| | | static array_type MakeArray(Iter begin, Iter end) { return array_type(begin, end); } |
| | | template <class Iter> |
| | | static JValue FromInit(Iter begin, Iter end) |
| | | { |
| | | auto like_object_item = [](const InitValue &v) { |
| | | return v.is_list() && |
| | | v.list().size() == 2 && |
| | | v.list()[0].is_value() && |
| | | v.list()[0].value().type() == jv_string; |
| | | }; |
| | | if (std::all_of(begin, end, like_object_item)) { |
| | | return MakeObject(begin, end); |
| | | } else { |
| | | return MakeArray(begin, end); |
| | | } |
| | | } |
| | | static JValue FromInit(const InitValue &ji) |
| | | { |
| | | if (ji.is_value()) { |
| | | return std::move(ji.value()); |
| | | } else { |
| | | return FromInit(ji.list().begin(), ji.list().end()); |
| | | } |
| | | } |
| | | |
| | | public: |
| | | struct DumpOptions { |
| | | int indent_step_ = 0; |
| | | int indent_level_ = 0; |
| | | bool escape_unicode_ = false; |
| | | }; |
| | | JValue() : |
| | | data_(object_type()) { static_assert(sizeof(JValue) == sizeof(data_type), "JValue should simple wrap data_type with no other fields."); } |
| | | #define SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \ |
| | | JValue(decl) : data_(expr) {} \ |
| | | JValue &operator=(decl) \ |
| | | { \ |
| | | data_ = (expr); \ |
| | | return *this; \ |
| | | } |
| | | // operator=() seems faster than JValue(exer).swap(data_). |
| | | // copy |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(const JValue &v, v.data_) // self; |
| | | #define MAP_TYPE_TO_JSON_TYPE(decl, expr) \ |
| | | public: \ |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \ |
| | | private: \ |
| | | static auto JVTypeAccept(decl)->std::remove_reference<decltype(expr)>::type |
| | | #define MAP_TYPE_TO_JSON_TYPE_BIDIR(decl, expr) \ |
| | | MAP_TYPE_TO_JSON_TYPE(decl, expr); \ |
| | | \ |
| | | private: \ |
| | | static auto JVTypeReturn(decl)->std::remove_reference<decltype(expr)>::type |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::nullptr_t, nullptr); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const short v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned short v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const int v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned int v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const long v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const long long v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long long v, int_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const float v, float_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const double v, float_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const long double v, float_type(v)); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const bool v, v); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::string &v, v); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const object_type &v, v); |
| | | MAP_TYPE_TO_JSON_TYPE_BIDIR(const array_type &v, v); |
| | | MAP_TYPE_TO_JSON_TYPE(const char *v, std::string(v)); |
| | | #undef MAP_TYPE_TO_JSON_TYPE_BIDIR |
| | | #undef MAP_TYPE_TO_JSON_TYPE |
| | | template <class T> |
| | | static void JVTypeAccept(T); // use void to disable other types. |
| | | template <class T> |
| | | static void JVTypeReturn(T); // use void to disable other types. |
| | | public: |
| | | // move |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(JValue &&v, std::move(v.data_)) // self |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(std::string &&v, std::move(v)) // and other classes |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(object_type &&v, std::move(v)) |
| | | SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(array_type &&v, std::move(v)) |
| | | #undef SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT |
| | | |
| | | static JValue Object() |
| | | { |
| | | return (object_type()); |
| | | } |
| | | static JValue Object(InitList l) { return (MakeObject(l.begin(), l.end())); } |
| | | static JValue Array() { return (array_type()); } |
| | | static JValue Array(InitList l) { return (MakeArray(l.begin(), l.end())); } |
| | | JValue(InitList l) : |
| | | JValue(FromInit(l.begin(), l.end())) {} |
| | | JValue(const InitValue &ji) : |
| | | JValue(FromInit(ji)) {} |
| | | JVType type() const { return static_cast<JVType>(data_.which()); } |
| | | |
| | | std::string dump(const DumpOptions &opt_in) const; |
| | | std::string dump(const int indent_step = 0, const int indent_level = 0, const bool escape_unicode = false) const |
| | | { |
| | | DumpOptions opt; |
| | | opt.indent_step_ = indent_step; |
| | | opt.indent_level_ = indent_level; |
| | | opt.escape_unicode_ = escape_unicode; |
| | | return dump(opt); |
| | | } |
| | | bool parse(const std::string &s) { return parse(s.c_str(), s.c_str() + s.size()) > 0; } |
| | | int parse(const char *begin, const char *end); |
| | | void swap(JValue &a) { data_.swap(a.data_); } |
| | | |
| | | template <class T> |
| | | T get_value() const { return val<decltype(JVTypeReturn(T()))>(); } |
| | | template <class T> |
| | | void put_value(T &&v) { *this = v; } |
| | | const object_type &object() const { return val<object_type>(); } |
| | | object_type &object() { return val<object_type>(); } |
| | | const array_type &array() const { return val<array_type>(); } |
| | | array_type &array() { return val<array_type>(); } |
| | | bool is_object() const { return type() == jv_object; } |
| | | bool is_array() const { return type() == jv_array; } |
| | | |
| | | // array ops |
| | | const value_type &operator[](const size_t idx) const { return array()[idx]; } |
| | | value_type &operator[](const size_t idx) { return array()[idx]; } |
| | | const value_type &at(const size_t idx) const { return array().at(idx); } |
| | | value_type &at(const size_t idx) { return array().at(idx); } |
| | | void push_back(const value_type &v) { array().push_back(v); } |
| | | void push_back(value_type &&v) { array().push_back(std::move(v)); } |
| | | |
| | | // object ops |
| | | value_type &operator[](const key_type &key) { return object()[key]; } |
| | | const value_type &at(const key_type &key) const { return object().at(key); } |
| | | value_type &at(const key_type &key) { return object().at(key); } |
| | | const value_type &child(const key_type &path) const { return ref_child(path); } |
| | | value_type &child(const key_type &path) { return const_cast<value_type &>(ref_child(path)); } |
| | | template <class T> |
| | | T get(const key_type &path) const { return child(path).val<decltype(JVTypeReturn(T()))>(); } |
| | | template <class T> |
| | | auto get(const key_type &path, const T &def) const -> typename std::remove_reference<decltype(JVTypeAccept(def))>::type |
| | | { |
| | | try { |
| | | return get<decltype(JVTypeAccept(def))>(path); |
| | | } catch (...) { |
| | | return def; |
| | | } |
| | | } |
| | | value_type &put(pair_type &&elem) |
| | | { |
| | | const key_type &key = elem.first; |
| | | auto sep = key.find('.'); |
| | | if (sep == key_type::npos) { |
| | | auto r = object().lower_bound(elem.first); |
| | | if (r != object().end() && r->first == elem.first) { |
| | | r->second = std::move(elem.second); |
| | | return r->second; |
| | | } else { |
| | | return object().emplace_hint(r, std::move(elem))->second; |
| | | } |
| | | } else { |
| | | return (*this)[key.substr(0, sep)].put(pair_type(key.substr(sep + 1), std::move(elem.second))); |
| | | } |
| | | } |
| | | value_type &put(const pair_type &elem) |
| | | { |
| | | pair_type tmp(elem); |
| | | return put(std::move(tmp)); |
| | | } |
| | | value_type &put(const key_type &k, const value_type &v) { return put(pair_type(k, v)); } |
| | | value_type &put(const key_type &k, value_type &&v) { return put(pair_type(k, std::move(v))); } |
| | | value_type &put(key_type &&k, const value_type &v) { return put(pair_type(std::move(k), v)); } |
| | | value_type &put(key_type &&k, value_type &&v) { return put(pair_type(std::move(k), std::move(v))); } |
| | | |
| | | private: |
| | | template <class C> |
| | | const C &ref_val() const |
| | | { |
| | | try { |
| | | return boost::get<C>(data_); |
| | | } catch (std::exception &) { |
| | | throw std::runtime_error("json get error, " + name() + " is not a " + JValue(C()).name()); |
| | | } |
| | | } |
| | | const value_type &ref_child(const key_type &path) const |
| | | { |
| | | auto sep = path.find('.'); |
| | | if (sep == key_type::npos) { |
| | | return at(path); |
| | | } else { |
| | | return at(path.substr(0, sep)).ref_child(path.substr(sep + 1)); |
| | | } |
| | | } |
| | | data_type data_; |
| | | }; |
| | | |
| | | typedef JValue Json; |
| | | |
| | | } // namespace ssjson |
| | | |
| | | #endif // end of include guard: JSON_B360EKF1 |
| | |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | | MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) |
| | | |
| | | MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id) |
| | | { |
| | | typedef MsgQueryProcReply Reply; |
| | | auto query = [&](Node self) -> Reply { |
| | | auto Add1 = [](Reply &reply, Node node) { |
| | | auto info = reply.add_proc_list(); |
| | | *info->mutable_proc() = node->proc_; |
| | | info->set_online(node->state_.flag_ == kStateNormal); |
| | | for (auto &addr_topics : node->services_) { |
| | | for (auto &topic : addr_topics.second) { |
| | | info->mutable_topics()->add_topic_list(topic); |
| | | } |
| | | auto Add1 = [](Reply &reply, Node node) { |
| | | auto info = reply.add_proc_list(); |
| | | *info->mutable_proc() = node->proc_; |
| | | info->mutable_proc()->clear_private_info(); |
| | | info->set_online(node->state_.flag_ == kStateNormal); |
| | | for (auto &addr_topics : node->services_) { |
| | | for (auto &topic : addr_topics.second) { |
| | | info->mutable_topics()->add_topic_list(topic); |
| | | } |
| | | }; |
| | | |
| | | if (!req.proc_id().empty()) { |
| | | auto pos = online_node_addr_map_.find(req.proc_id()); |
| | | if (pos == online_node_addr_map_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc not found."); |
| | | } else { |
| | | auto node_pos = nodes_.find(pos->second); |
| | | if (node_pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc node not found."); |
| | | } else { |
| | | auto reply = MakeReply<Reply>(eSuccess); |
| | | Add1(reply, node_pos->second); |
| | | return reply; |
| | | } |
| | | } |
| | | } else { |
| | | Reply reply(MakeReply<Reply>(eSuccess)); |
| | | for (auto &kv : nodes_) { |
| | | Add1(reply, kv.second); |
| | | } |
| | | return reply; |
| | | } |
| | | }; |
| | | |
| | | if (!proc_id.empty()) { |
| | | auto pos = online_node_addr_map_.find(proc_id); |
| | | if (pos == online_node_addr_map_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc not found."); |
| | | } else { |
| | | auto node_pos = nodes_.find(pos->second); |
| | | if (node_pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotFound, "proc node not found."); |
| | | } else { |
| | | auto reply = MakeReply<Reply>(eSuccess); |
| | | Add1(reply, node_pos->second); |
| | | return reply; |
| | | } |
| | | } |
| | | } else { |
| | | Reply reply(MakeReply<Reply>(eSuccess)); |
| | | for (auto &kv : nodes_) { |
| | | Add1(reply, kv.second); |
| | | } |
| | | return reply; |
| | | } |
| | | } |
| | | // Message-based entry point for a proc query: runs the proc_id-based
| | | // QueryProc overload through HandleMsg, using the proc_id carried in `req`.
| | | MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
| | | {
| | |     using Reply = MsgQueryProcReply;
| | |     // The Node argument supplied by HandleMsg is not used by the lookup.
| | |     auto lookup_by_id = [&](Node self) -> Reply {
| | |         return this->QueryProc(req.proc_id());
| | |     };
| | |     return HandleMsg<Reply>(head, lookup_by_id);
| | | }
| | | |
| | |
| | | MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg); |
| | | MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg); |
| | | MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req); |
| | | MsgQueryProcReply QueryProc(const std::string &proc_id); |
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req); |
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg); |
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg); |
| | |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | | // Prints a MsgQueryProcReply to stdout: the number of procs, then for
| | | // each proc its index, online/offline state, proc id and name, followed
| | | // by one line per topic.
| | | auto PrintProcs = [](MsgQueryProcReply const &result) {
| | |     const auto &procs = result.proc_list();
| | |     printf("query proc result: %d\n", procs.size());
| | |     for (int idx = 0; idx < procs.size(); ++idx) {
| | |         auto &entry = result.proc_list(idx);
| | |         const char *state = entry.online() ? "online" : "offline";
| | |         printf("proc [%d] %s, %s, %s\n\ttopics\n", idx,
| | |                state,
| | |                entry.proc().proc_id().c_str(), entry.proc().name().c_str());
| | |         for (auto &topic : entry.topics().topic_list()) {
| | |             printf("\t\t %s\n", topic.c_str());
| | |         }
| | |         printf("\n");
| | |     }
| | |     printf("\n");
| | | };
| | | { |
| | | // query procs |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | |
| | | DEFER1(BHFree(reply, reply_len)); |
| | | MsgQueryProcReply result; |
| | | if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) { |
| | | printf("query proc result: %d\n", result.proc_list().size()); |
| | | for (int i = 0; i < result.proc_list().size(); ++i) { |
| | | auto &info = result.proc_list(i); |
| | | printf("proc [%d] %s, %s, %s\n\ttopics\n", i, |
| | | (info.online() ? "online" : "offline"), |
| | | info.proc().proc_id().c_str(), info.proc().name().c_str()); |
| | | for (auto &t : info.topics().topic_list()) { |
| | | printf("\t\t %s", t.c_str()); |
| | | } |
| | | } |
| | | PrintProcs(result); |
| | | } else { |
| | | printf("query proc error\n"); |
| | | } |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | // Sleep(1s); |
| | | } |
| | | {
| | |     // query procs with normal topic request
| | |     // Sends a MsgRequestTopic for "@center_query_procs" to a
| | |     // default-constructed BHAddress via BHRequest and prints the reply.
| | |     MsgRequestTopic req;
| | |     req.set_topic("@center_query_procs");
| | |     std::string s(req.SerializeAsString());
| | |     // Sleep(10ms, false);
| | |     std::string dest(BHAddress().SerializeAsString());
| | |     // Output buffers filled in by BHRequest; each is handed to BHFree
| | |     // through DEFER1 (presumably at scope exit - confirm against the macro).
| | |     void *proc_id = 0;
| | |     int proc_id_len = 0;
| | |     DEFER1(BHFree(proc_id, proc_id_len););
| | |     void *reply = 0;
| | |     int reply_len = 0;
| | |     DEFER1(BHFree(reply, reply_len));
| | |     // NOTE(review): the trailing 100 looks like a timeout in milliseconds - confirm.
| | |     bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
| | |     if (!r) {
| | |         int ec = 0;
| | |         std::string msg;
| | |         GetLastError(ec, msg);
| | |         printf("topic query proc error: %s\n", msg.c_str());
| | |     } else {
| | |         MsgRequestTopicReply ret;
| | |         // Only print the payload when the reply actually parsed; a
| | |         // malformed reply is reported instead of being shown as data.
| | |         if (ret.ParseFromArray(reply, reply_len)) {
| | |             printf("topic query proc : %s\n", ret.data().c_str());
| | |         } else {
| | |             printf("topic query proc error: reply could not be parsed\n");
| | |         }
| | |         // MsgQueryProcReply result;
| | |         // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
| | |         // PrintProcs(result);
| | |         // } else {
| | |         // printf("topic query proc error\n");
| | |         // }
| | |     }
| | | }
| | | // return; |
| | | |
| | | { // Subscribe |
| | | MsgTopicList topics; |
| | |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const int64_t nreq = 1000 * 100; |
| | | const int64_t nreq = 1; //000 * 100; |
| | | |
| | | for (int i = 0; i < 100; ++i) { |
| | | for (int i = 0; i < 10; ++i) { |
| | | SyncRequest(i); |
| | | } |
| | | |