lichao
2021-05-20 0e31f38fc37216e1376d8101d1bcf7a3779279dc
add center topic node.
4个文件已添加
5个文件已修改
961 ■■■■■ 已修改文件
box/center.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.h 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/json.cpp 380 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/json.h 277 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "center.h"
#include "center_topic_node.h"
#include "node_center.h"
#include <chrono>
@@ -185,7 +186,10 @@
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
    }
    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
}
BHCenter::~BHCenter() { Stop(); }
bool BHCenter::Start()
{
@@ -193,12 +197,13 @@
        auto &info = kv.second;
        sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    topic_node_->Start();
    return true;
}
bool BHCenter::Stop()
{
    topic_node_->Stop();
    for (auto &kv : sockets_) {
        kv.second->Stop();
    }
box/center.h
@@ -22,6 +22,7 @@
#include <functional>
#include <map>
#include <memory>
class CenterTopicNode;
class BHCenter
{
@@ -34,7 +35,7 @@
    static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
    BHCenter(Socket::Shm &shm);
    ~BHCenter() { Stop(); }
    ~BHCenter();
    bool Start();
    bool Stop();
@@ -51,6 +52,7 @@
    static CenterRecords &Centers();
    std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
    std::unique_ptr<CenterTopicNode> topic_node_;
};
#endif // end of include guard: CENTER_TM9OUQTG
box/center_topic_node.cpp
New file
@@ -0,0 +1,117 @@
/*
 * =====================================================================================
 *
 *       Filename:  center_topic_node.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月20日 12时44分31秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "center_topic_node.h"
#include "node_center.h"
#include "topic_node.h"
#include "json.h"
#include <chrono>
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace ssjson;
namespace
{
const std::string &kTopicQueryProc = "@center_query_procs";
std::string ToJson(const MsgQueryProcReply &qpr)
{
    Json json;
    json.put("procCount", qpr.proc_list_size());
    auto &list = json.put("procList", Json::Array());
    // Json list = Json::Array();
    for (auto &info : qpr.proc_list()) {
        Json proc;
        proc.put("id", info.proc().proc_id());
        proc.put("name", info.proc().name());
        proc.put("publicInfo", info.proc().public_info());
        proc.put("online", info.online());
        Json topics = Json::Array();
        for (auto &t : info.topics().topic_list()) {
            topics.push_back(t);
        }
        proc.put("topics", topics);
        list.push_back(proc);
    }
    return json.dump(0);
}
} // namespace
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
    pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
CenterTopicNode::~CenterTopicNode() { Stop(); }
void CenterTopicNode::Stop()
{
    bool cur = true;
    if (run_.compare_exchange_strong(cur, false) && worker_.joinable()) {
        worker_.join();
        pnode_->Stop();
    }
}
bool CenterTopicNode::Start()
{
    Stop();
    int timeout = 3000;
    MsgCommonReply reply;
    ProcInfo info;
    info.set_proc_id("#center.node");
    info.set_name("center node");
    if (!pnode_->Register(info, reply, timeout)) {
        throw std::runtime_error("center node register failed.");
    }
    MsgTopicList topics;
    topics.add_topic_list(kTopicQueryProc);
    if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
        throw std::runtime_error("center node register topics failed.");
    }
    auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
        auto reply = MakeReply<MsgRequestTopicReply>(eSuccess);
        if (request.topic() == kTopicQueryProc) {
            auto data = (*pscenter_)->QueryProc(request.data());
            *reply.mutable_errmsg() = data.errmsg();
            reply.set_data(ToJson(data));
        } else {
            SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
        }
        pnode_->ServerSendReply(src_info, reply);
    };
    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        auto heartbeat = [this]() {
            while (run_) {
                pnode_->Heartbeat(1000);
                std::this_thread::sleep_for(1s);
            }
        };
        std::thread(heartbeat).swap(worker_);
        return pnode_->ServerStart(onRequest);
    } else {
        return false;
    }
}
box/center_topic_node.h
New file
@@ -0,0 +1,46 @@
/*
 * =====================================================================================
 *
 *       Filename:  center_topic_node.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年05月20日 12时44分42秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef CENTER_TOPIC_NODE_YCI0P9KC
#define CENTER_TOPIC_NODE_YCI0P9KC
#include "bh_util.h"
#include "shm.h"
#include <atomic>
#include <memory>
#include <thread>
class TopicNode;
class NodeCenter;
// center as a topic node.
class CenterTopicNode
{
public:
    typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
    CenterTopicNode(CenterPtr center, bhome_shm::SharedMemory &shm);
    ~CenterTopicNode();
    bool Start();
    void Stop();
private:
    CenterPtr pscenter_;
    std::unique_ptr<TopicNode> pnode_;
    std::thread worker_;
    std::atomic<bool> run_;
};
#endif // end of include guard: CENTER_TOPIC_NODE_YCI0P9KC
box/json.cpp
New file
@@ -0,0 +1,380 @@
#include "json.h"
namespace ssjson {
namespace _impl {
    typedef std::string Buffer;
    template <class Buf, class ...T>
    inline void DumpNumber(Buf &res, const char *fmt, T&&...val) {
        char buf[64] = {0};
        int n = snprintf(buf, sizeof(buf)-1, fmt, val...);
        res.append(buf, n);
    }
    inline void DumpFloat(Buffer &res, const long double fval) { DumpNumber(res, "%.*Lg", std::numeric_limits<long double>::digits10, fval); }
    inline void DumpInt(Buffer &res, const long long ival)  { DumpNumber(res, "%lld", ival); }
//    inline void DumpFloat(Buffer &res, const double fval) { DumpNumber(res, "%.*g", std::numeric_limits<double>::digits10, fval); }
//    inline void DumpInt(Buffer &res, const long ival)  { DumpNumber(res, "%ld", ival); }
    // assume utf8 string.
    void DumpString(Buffer &res, const std::string &s, const bool escape_unicode) {
        res += '\"';
        auto EscapeSpecialChar = [&](const int c) { // ( \b, \f, \t, \n, \r, \", \\, / ).
            const char esc_table[] = {
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 , 'b', 't',
                'n', 0, 'f','r',  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,'\"', 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0, '/',  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,  0 , 0 ,  0 , 0,  0,  0 ,  0 ,  0 ,
                 0 , 0,'\\', };
            res += '\\';
            res += esc_table[c];
        };
        auto AddUnicode = [&](unsigned u) {
            auto HexChar = [](unsigned char uc) { return "0123456789abcdef"[uc & 0xF]; };
            char buf[6] = { '\\', 'u', HexChar(u>>12), HexChar(u>>8), HexChar(u>>4), HexChar(u) };
            res.append(buf, buf+6);
        };
        auto IsControl = [](const unsigned char c) { return c < 0x20 || c == 0x7F; };
        if (escape_unicode) {
            for (unsigned i = 0; i < s.size(); ++i) {
                switch(s[i]) {
                    case '\b': case '\f': case '\t': case '\n': case '\r':
                    case '\"': case '\\': case '/' :
                        EscapeSpecialChar(s[i]);
                        break;
                    default :
                        {
                            const unsigned char &uc = (unsigned char &)s[i];
                            const unsigned char *sz = &uc;
                            if (IsControl(uc)) { // control characters
                                AddUnicode(uc);
                            } else if (uc < 0x80) {
                                res += s[i];
                            } else if (uc < 0xE0) { // 2 bytes
                                AddUnicode(((sz[0]&0x1F)<<6) | (sz[1] & 0x3F));
                                i += 1;
                            } else if (uc < 0xF0) { // 3 bytes
                                AddUnicode(((sz[0]&0x0F)<<12) | ((sz[1]&0x3F)<<6) | (sz[2] & 0x3F));
                                i += 2;
                            } else {
                                res += s[i]; // exceed 0xFFFF.
                            }
                        } break;
                }
            }
        } else {
            for (unsigned i = 0; i < s.size(); ++i) {
                switch(s[i]) {
                    case '\b': case '\f': case '\t': case '\n': case '\r':
                    case '\"': case '\\': case '/' :
                        EscapeSpecialChar(s[i]);
                        break;
                    default :
                        if (IsControl(s[i])) { // control characters
                            AddUnicode(s[i]);
                        } else {
                            res += s[i];
                        }
                        break;
                }
            }
        }
        res += '\"';
    };
    void Dump(const JValue &jv, Buffer &res, JValue::DumpOptions &opt, const bool pretty)
    {
        auto NewLine = [&]() {
            res += '\n';
               res.append(opt.indent_level_ * opt.indent_step_, ' ');
        };
        switch (jv.type()) {
            case JValue::jv_object:
                {
                    res += '{';
                    auto &obj = jv.object();
                    if (!obj.empty()) {
                        if (pretty) {
                            opt.indent_level_++;
                            for (auto &&kv : obj) {
                                NewLine();
                                DumpString(res, kv.first, opt.escape_unicode_);
                                res.append(": ", 2);
                                Dump(kv.second, res, opt, pretty);
                                res += ',';
                            }
                            res.pop_back(); // remove last ',';
                            opt.indent_level_--;
                            NewLine();
                        } else {
                            for (auto &&kv : obj) {
                                DumpString(res, kv.first, opt.escape_unicode_);
                                res += ':';
                                Dump(kv.second, res, opt, pretty);
                                res += ',';
                            }
                            res.pop_back(); // remove last ',';
                        }
                    }
                    res += '}';
                } break;
            case JValue::jv_array:
                {
                    res += '[';
                    auto &arr = jv.array();
                    if (!arr.empty()) {
                        if (pretty) {
                            opt.indent_level_++;
                            for (auto &&v : arr) {
                                NewLine();
                                Dump(v, res, opt, pretty);
                                res += ',';
                            }
                            res.pop_back(); // remove last ',';
                            opt.indent_level_--;
                            NewLine();
                        } else {
                            for (auto &&v : arr) {
                                Dump(v, res, opt, pretty);
                                res += ',';
                            }
                            res.pop_back(); // remove last ',';
                        }
                    }
                    res += ']';
                } break;
            case JValue::jv_float:
                {
                    auto idx = res.size();
                    DumpFloat(res, jv.get_value<JValue::float_type>());
                    while (idx < res.size() && res[idx] != '.') {
                        ++idx;
                    }
                    if (idx == res.size()) {
                        res.append(".0", 2);
                    }
                } break;
            case JValue::jv_null: res.append("null", 4); break;
            case JValue::jv_int: DumpInt(res, jv.get_value<JValue::int_type>()); break;
            case JValue::jv_bool:
                         if (jv.get_value<bool>()) {
                             res.append("true", 4);
                         } else {
                             res.append("false", 5);
                         }
                         break;
            case JValue::jv_string: DumpString(res, jv.get_value<std::string>(), opt.escape_unicode_); break;
            default: std::invalid_argument("invalid json type:" + std::to_string(jv.type())); break;
        }
    }
    auto HexCharVal = [](const unsigned char c) {
        static const unsigned char hex_char_table[] = {
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0,    1,    2,    3,    4,    5,    6,    7,    8,    9, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF,   10,   11,   12,   13,   14,   15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF,   10,   11,   12,   13,   14,   15, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
        };
        return hex_char_table[c];
    };
} // namespace _impl
    int JValue::parse(const char *begin, const char *end)
    {
        using namespace _impl;
        const char *p = begin;
        auto Error = [&](const std::string &msg = "invalid value at"){ throw std::invalid_argument(msg + " : "+ std::string(p, std::min(int(end-p), 30))); };
        auto IsSpace = [](const char c) {
            return c == ' ' || c == '\t' || c == '\r' || c == '\n';
        };
        auto IgnoreSpaces = [&]() { while (p != end && IsSpace(*p)) { ++p; } };
        auto Peek = [&]() { IgnoreSpaces(); if (p == end) { Error("input ends unexpectedly!"); } return *p; };
        auto Expect = [&](const char c) { if (Peek() != c) { Error(std::string("parse error, expecting ") + c); } };
        auto parse_string = [&]() {
            std::string res;
            auto to_utf8 = [&]() {
                auto Hex4 = [](const char *sz) {
                    auto FromHex = [](const unsigned char c)->unsigned char {
                        const unsigned char v = HexCharVal(c);
                        if (v == 0xFF) {
                            throw std::invalid_argument(std::string("invalid unicode input ") + char(c));
                        }
                        return v;
                    };
                    return (FromHex(sz[0]) << 12) | (FromHex(sz[1]) << 8) | (FromHex(sz[2]) << 4) | (FromHex(sz[3]));
                };
                const unsigned int u = Hex4(p+2);
                p += 4;
                if (u < 0x80) {
                    res += char(u);
                } else if (u < 0x800) {
                    res += char((u >> 6) | 0xC0);
                    res += char((u & 0x3F) | 0x80);
                } else if (u < 0x10000) {
                    res += char((u >> 12) | 0xE0);
                    res += char(((u >> 6)& 0x3F) | 0x80);
                    res += char((u & 0x3F) | 0x80);
                } else if (u < 0x110000) {
                    res += char((u >> 18) | 0xF0);
                    res += char(((u >> 12)& 0x3F) | 0x80);
                    res += char(((u >> 6)& 0x3F) | 0x80);
                    res += char((u & 0x3F) | 0x80);
                } else {
                    Error("invalid unicode codepoint");
                }
            };
            auto ParseEscapedChar = [&](const int c) {
                const char unesc_table[] = {
                    0 , '\b',   0 ,   0 ,   0 , '\f',   0 ,
                    0 ,   0 ,   0 ,   0 ,   0 ,   0 , '\n',
                    0 ,   0 ,   0 , '\r',   0 , '\t', };
                res += unesc_table[c-'a'];
            };
            unsigned left = 0;
            auto AddLeft = [&] { if (left != 0) { res.append(p-left, left); left = 0; } };
            while (++p != end) {
                switch(*p) {
                case '\"':
                    AddLeft();
                    ++p;
                    return res;
                case '\\':
                    AddLeft();
                    if (p+1 != end) {
                        switch (p[1]) {
                        case 'b' : case 'f' : case 'n' : case 'r' : case 't' : ParseEscapedChar(p[1]); break;
                        case 'u' : to_utf8(); break;
                        case '\"': case '\\': case '/' : ++left; break;
                        default: left += 2; break;
                        }
                        ++p; // parsed 2 char.
                    } // else, next round will fail.
                    break;
                // and escaped char should not appear.
                case '\t': case '\n': case '\r': case '\b': case '\f': //case '/' :
                    AddLeft();
                    Error("invalid string content") ;
                    break;
                default: ++left; break;
                }
            }
            Error("string ends unexpectedly!");
            return res;
        };
        auto parse_object = [&]() {
            assert(*p == '{');
            ++p;
            if (Peek() == '}') {
                ++p;
                return object_type();
            }
            object_type obj;
            while (true) {
                Expect('\"');
                std::string key(parse_string());
                Expect(':');
                ++p;
                JValue v;
                p += v.parse(p, end);
                obj.emplace(std::move(key), std::move(v));
                switch (Peek()) {
                    case ',': ++p; break;
                    case '}': ++p; return obj;
                    default: Error("parse object error");
                }
            }
        };
        auto parse_array = [&]() {
            assert(*p == '[');
            ++p;
            if (Peek() == ']') {
                ++p;
                return array_type();
            }
            array_type arr;
            while (true) {
                JValue v;
                p += v.parse(p, end);
                arr.emplace_back(std::move(v));
                switch (Peek()) {
                    case ',': ++p; break;
                    case ']': ++p; return arr;
                    default: Error("parse array error");
                }
            }
        };
        auto ExpectText = [&](const char *sz, const size_t len) {
            if(memcmp(p+1, sz+1, len-1) == 0) {
                p += len;
            } else {
                Error("parse literal error, expecting '" + std::string(sz, len) + "', read " + std::string(p, len).c_str());
            }
        };
        switch (Peek()) {
        case '{': *this = parse_object(); break;
        case '[': *this = parse_array(); break;
        case '\"': *this = parse_string(); break;
        case 't': ExpectText("true", 4); *this = true; break;
        case 'f': ExpectText("false", 5); *this = false; break;
        case 'n': ExpectText("null", 4); *this = nullptr; break;
        case '0': case '1': case '2': case '3': case '4':
        case '5': case '6': case '7': case '8': case '9': case '-':
                  {
                      auto IsNumber = [](const char c) { return '0' <= c && c <= '9'; };
                      auto sep = p+1;
                      while (sep != end && IsNumber(*sep)) { ++sep; }
                      char *parse_end = (char*)p;
                      switch (*sep) {
                          case '.' : case 'e' : case 'E': *this = strtold((char*)p, &parse_end); break;
                          default: *this = strtoll((char*)p, &parse_end, 10); break;
                      }
                      p = parse_end;
                  } break;
        default : Error(); break;
        }
        return p - begin;
    }
    std::string JValue::dump(const DumpOptions &opt_in) const {
        using namespace _impl;
        Buffer res;
        DumpOptions opt(opt_in);
        if(opt.indent_step_ > 0) {
               res.append(opt.indent_level_ * opt.indent_step_, ' ');
        }
        Dump(*this, res, opt, opt.indent_step_ > 0);
        return res;
    }
} // jsonpp
box/json.h
New file
@@ -0,0 +1,277 @@
#ifndef JSON_B360EKF1
#define JSON_B360EKF1
#include <algorithm>
#include <boost/variant.hpp>
#include <map>
#include <string>
#include <vector>
namespace ssjson
{
class JValue
{
public:
    typedef std::string key_type;
    typedef long long int_type;
    typedef long double float_type;
    typedef JValue value_type;
    typedef std::map<key_type, value_type> object_type;
    typedef object_type::value_type pair_type;
    typedef std::vector<value_type> array_type;
    enum JVType { jv_null,
                  jv_int,
                  jv_float,
                  jv_bool,
                  jv_string,
                  jv_object,
                  jv_array };
    std::string name() const
    {
        static const char *names[] = {"jv_null", "jv_int", "jv_float", "jv_bool", "jv_string", "jv_object", "jv_array"};
        return names[type()];
    }
private:
    typedef boost::variant<std::nullptr_t, int_type, float_type, bool, std::string, object_type, array_type> data_type;
    template <class C>
    const C &val() const { return ref_val<C>(); }
    template <class C>
    C &val() { return const_cast<C &>(ref_val<C>()); }
    template <class C>
    class TInitValue
    {
        typedef C value_t;
        typedef std::vector<TInitValue> list_t;
        mutable boost::variant<value_t, list_t> data_;
    public:
        template <class T>
        TInitValue(const T &t) :
            data_(value_t(t)) {}
        template <class T>
        TInitValue(T &&t) :
            data_(value_t(std::move(t))) {}
        TInitValue(std::initializer_list<TInitValue> l) :
            data_(list_t(l)){};
        bool is_value() const { return data_.which() == 0; }
        bool is_list() const { return data_.which() == 1; }
        const list_t &list() const { return boost::get<list_t>(data_); }
        const value_t &value() const { return boost::get<value_t>(data_); }
    };
    typedef TInitValue<JValue> InitValue;
    typedef std::initializer_list<InitValue> InitList;
    template <class Iter>
    static object_type MakeObject(Iter begin, Iter end)
    {
        object_type obj;
        for (auto it = begin; it != end; ++it) {
            obj.emplace(std::move(it->list()[0].value().template val<key_type>()), (it->list()[1]));
        }
        return obj;
    }
    template <class Iter>
    static array_type MakeArray(Iter begin, Iter end) { return array_type(begin, end); }
    template <class Iter>
    static JValue FromInit(Iter begin, Iter end)
    {
        auto like_object_item = [](const InitValue &v) {
            return v.is_list() &&
                   v.list().size() == 2 &&
                   v.list()[0].is_value() &&
                   v.list()[0].value().type() == jv_string;
        };
        if (std::all_of(begin, end, like_object_item)) {
            return MakeObject(begin, end);
        } else {
            return MakeArray(begin, end);
        }
    }
    static JValue FromInit(const InitValue &ji)
    {
        if (ji.is_value()) {
            return std::move(ji.value());
        } else {
            return FromInit(ji.list().begin(), ji.list().end());
        }
    }
public:
    struct DumpOptions {
        int indent_step_ = 0;
        int indent_level_ = 0;
        bool escape_unicode_ = false;
    };
    JValue() :
        data_(object_type()) { static_assert(sizeof(JValue) == sizeof(data_type), "JValue should simple wrap data_type with no other fields."); }
#define SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
    JValue(decl) : data_(expr) {}                      \
    JValue &operator=(decl)                            \
    {                                                  \
        data_ = (expr);                                \
        return *this;                                  \
    }
    // operator=() seems faster than JValue(exer).swap(data_).
    // copy
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(const JValue &v, v.data_) // self;
#define MAP_TYPE_TO_JSON_TYPE(decl, expr)          \
public:                                            \
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(decl, expr) \
private:                                           \
    static auto JVTypeAccept(decl)->std::remove_reference<decltype(expr)>::type
#define MAP_TYPE_TO_JSON_TYPE_BIDIR(decl, expr) \
    MAP_TYPE_TO_JSON_TYPE(decl, expr);          \
                                                \
private:                                        \
    static auto JVTypeReturn(decl)->std::remove_reference<decltype(expr)>::type
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::nullptr_t, nullptr);
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const short v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned short v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const int v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned int v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const long v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const long long v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const unsigned long long v, int_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const float v, float_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const double v, float_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const long double v, float_type(v));
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const bool v, v);
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const std::string &v, v);
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const object_type &v, v);
    MAP_TYPE_TO_JSON_TYPE_BIDIR(const array_type &v, v);
    MAP_TYPE_TO_JSON_TYPE(const char *v, std::string(v));
#undef MAP_TYPE_TO_JSON_TYPE_BIDIR
#undef MAP_TYPE_TO_JSON_TYPE
    template <class T>
    static void JVTypeAccept(T); // use void to disable other types.
    template <class T>
    static void JVTypeReturn(T); // use void to disable other types.
public:
    // move
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(JValue &&v, std::move(v.data_)) // self
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(std::string &&v, std::move(v))  // and other classes
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(object_type &&v, std::move(v))
    SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT(array_type &&v, std::move(v))
#undef SUPPORT_CONSTRUCTOR_AND_ASSIGNMENT
    static JValue Object()
    {
        return (object_type());
    }
    static JValue Object(InitList l) { return (MakeObject(l.begin(), l.end())); }
    static JValue Array() { return (array_type()); }
    static JValue Array(InitList l) { return (MakeArray(l.begin(), l.end())); }
    JValue(InitList l) :
        JValue(FromInit(l.begin(), l.end())) {}
    JValue(const InitValue &ji) :
        JValue(FromInit(ji)) {}
    JVType type() const { return static_cast<JVType>(data_.which()); }
    std::string dump(const DumpOptions &opt_in) const;
    std::string dump(const int indent_step = 0, const int indent_level = 0, const bool escape_unicode = false) const
    {
        DumpOptions opt;
        opt.indent_step_ = indent_step;
        opt.indent_level_ = indent_level;
        opt.escape_unicode_ = escape_unicode;
        return dump(opt);
    }
    bool parse(const std::string &s) { return parse(s.c_str(), s.c_str() + s.size()) > 0; }
    int parse(const char *begin, const char *end);
    void swap(JValue &a) { data_.swap(a.data_); }
    template <class T>
    T get_value() const { return val<decltype(JVTypeReturn(T()))>(); }
    template <class T>
    void put_value(T &&v) { *this = v; }
    const object_type &object() const { return val<object_type>(); }
    object_type &object() { return val<object_type>(); }
    const array_type &array() const { return val<array_type>(); }
    array_type &array() { return val<array_type>(); }
    bool is_object() const { return type() == jv_object; }
    bool is_array() const { return type() == jv_array; }
    // array ops
    const value_type &operator[](const size_t idx) const { return array()[idx]; }
    value_type &operator[](const size_t idx) { return array()[idx]; }
    const value_type &at(const size_t idx) const { return array().at(idx); }
    value_type &at(const size_t idx) { return array().at(idx); }
    void push_back(const value_type &v) { array().push_back(v); }
    void push_back(value_type &&v) { array().push_back(std::move(v)); }
    // object ops
    value_type &operator[](const key_type &key) { return object()[key]; }
    const value_type &at(const key_type &key) const { return object().at(key); }
    value_type &at(const key_type &key) { return object().at(key); }
    const value_type &child(const key_type &path) const { return ref_child(path); }
    value_type &child(const key_type &path) { return const_cast<value_type &>(ref_child(path)); }
    template <class T>
    T get(const key_type &path) const { return child(path).val<decltype(JVTypeReturn(T()))>(); }
    template <class T>
    auto get(const key_type &path, const T &def) const -> typename std::remove_reference<decltype(JVTypeAccept(def))>::type
    {
        try {
            return get<decltype(JVTypeAccept(def))>(path);
        } catch (...) {
            return def;
        }
    }
    value_type &put(pair_type &&elem)
    {
        const key_type &key = elem.first;
        auto sep = key.find('.');
        if (sep == key_type::npos) {
            auto r = object().lower_bound(elem.first);
            if (r != object().end() && r->first == elem.first) {
                r->second = std::move(elem.second);
                return r->second;
            } else {
                return object().emplace_hint(r, std::move(elem))->second;
            }
        } else {
            return (*this)[key.substr(0, sep)].put(pair_type(key.substr(sep + 1), std::move(elem.second)));
        }
    }
    value_type &put(const pair_type &elem)
    {
        pair_type tmp(elem);
        return put(std::move(tmp));
    }
    value_type &put(const key_type &k, const value_type &v) { return put(pair_type(k, v)); }
    value_type &put(const key_type &k, value_type &&v) { return put(pair_type(k, std::move(v))); }
    value_type &put(key_type &&k, const value_type &v) { return put(pair_type(std::move(k), v)); }
    value_type &put(key_type &&k, value_type &&v) { return put(pair_type(std::move(k), std::move(v))); }
private:
    template <class C>
    const C &ref_val() const
    {
        try {
            return boost::get<C>(data_);
        } catch (std::exception &) {
            throw std::runtime_error("json get error, " + name() + " is not a " + JValue(C()).name());
        }
    }
    const value_type &ref_child(const key_type &path) const
    {
        auto sep = path.find('.');
        if (sep == key_type::npos) {
            return at(path);
        } else {
            return at(path.substr(0, sep)).ref_child(path.substr(sep + 1));
        }
    }
    data_type data_;
};
typedef JValue Json;
} // namespace ssjson
#endif // end of include guard: JSON_B360EKF1
box/node_center.cpp
@@ -350,44 +350,48 @@
        return MakeReply(eSuccess);
    });
}
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id)
{
    typedef MsgQueryProcReply Reply;
    auto query = [&](Node self) -> Reply {
        auto Add1 = [](Reply &reply, Node node) {
            auto info = reply.add_proc_list();
            *info->mutable_proc() = node->proc_;
            info->set_online(node->state_.flag_ == kStateNormal);
            for (auto &addr_topics : node->services_) {
                for (auto &topic : addr_topics.second) {
                    info->mutable_topics()->add_topic_list(topic);
                }
    auto Add1 = [](Reply &reply, Node node) {
        auto info = reply.add_proc_list();
        *info->mutable_proc() = node->proc_;
        info->mutable_proc()->clear_private_info();
        info->set_online(node->state_.flag_ == kStateNormal);
        for (auto &addr_topics : node->services_) {
            for (auto &topic : addr_topics.second) {
                info->mutable_topics()->add_topic_list(topic);
            }
        };
        if (!req.proc_id().empty()) {
            auto pos = online_node_addr_map_.find(req.proc_id());
            if (pos == online_node_addr_map_.end()) {
                return MakeReply<Reply>(eNotFound, "proc not found.");
            } else {
                auto node_pos = nodes_.find(pos->second);
                if (node_pos == nodes_.end()) {
                    return MakeReply<Reply>(eNotFound, "proc node not found.");
                } else {
                    auto reply = MakeReply<Reply>(eSuccess);
                    Add1(reply, node_pos->second);
                    return reply;
                }
            }
        } else {
            Reply reply(MakeReply<Reply>(eSuccess));
            for (auto &kv : nodes_) {
                Add1(reply, kv.second);
            }
            return reply;
        }
    };
    if (!proc_id.empty()) {
        auto pos = online_node_addr_map_.find(proc_id);
        if (pos == online_node_addr_map_.end()) {
            return MakeReply<Reply>(eNotFound, "proc not found.");
        } else {
            auto node_pos = nodes_.find(pos->second);
            if (node_pos == nodes_.end()) {
                return MakeReply<Reply>(eNotFound, "proc node not found.");
            } else {
                auto reply = MakeReply<Reply>(eSuccess);
                Add1(reply, node_pos->second);
                return reply;
            }
        }
    } else {
        Reply reply(MakeReply<Reply>(eSuccess));
        for (auto &kv : nodes_) {
            Add1(reply, kv.second);
        }
        return reply;
    }
}
MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
{
    typedef MsgQueryProcReply Reply;
    auto query = [&](Node self) -> Reply { return this->QueryProc(req.proc_id()); };
    return HandleMsg<Reply>(head, query);
}
box/node_center.h
@@ -159,6 +159,7 @@
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
    MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg);
    MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req);
    MsgQueryProcReply QueryProc(const std::string &proc_id);
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
utest/api_test.cpp
@@ -169,6 +169,20 @@
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
    auto PrintProcs = [](MsgQueryProcReply const &result) {
        printf("query proc result: %d\n", result.proc_list().size());
        for (int i = 0; i < result.proc_list().size(); ++i) {
            auto &info = result.proc_list(i);
            printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
                   (info.online() ? "online" : "offline"),
                   info.proc().proc_id().c_str(), info.proc().name().c_str());
            for (auto &t : info.topics().topic_list()) {
                printf("\t\t %s\n", t.c_str());
            }
            printf("\n");
        }
        printf("\n");
    };
    {
        // query procs
        std::string dest(BHAddress().SerializeAsString());
@@ -180,22 +194,45 @@
        DEFER1(BHFree(reply, reply_len));
        MsgQueryProcReply result;
        if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
            printf("query proc result: %d\n", result.proc_list().size());
            for (int i = 0; i < result.proc_list().size(); ++i) {
                auto &info = result.proc_list(i);
                printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
                       (info.online() ? "online" : "offline"),
                       info.proc().proc_id().c_str(), info.proc().name().c_str());
                for (auto &t : info.topics().topic_list()) {
                    printf("\t\t %s", t.c_str());
                }
            }
            PrintProcs(result);
        } else {
            printf("query proc error\n");
        }
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
    {
        // query procs with normal topic request
        MsgRequestTopic req;
        req.set_topic("@center_query_procs");
        std::string s(req.SerializeAsString());
        // Sleep(10ms, false);
        std::string dest(BHAddress().SerializeAsString());
        void *proc_id = 0;
        int proc_id_len = 0;
        DEFER1(BHFree(proc_id, proc_id_len););
        void *reply = 0;
        int reply_len = 0;
        DEFER1(BHFree(reply, reply_len));
        bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
        if (!r) {
            int ec = 0;
            std::string msg;
            GetLastError(ec, msg);
            printf("topic query proc error: %s\n", msg.c_str());
        } else {
            MsgRequestTopicReply ret;
            ret.ParseFromArray(reply, reply_len);
            printf("topic query proc : %s\n", ret.data().c_str());
            // MsgQueryProcReply result;
            // if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
            //     PrintProcs(result);
            // } else {
            //     printf("topic query proc error\n");
            // }
        }
    }
    // return;
    { // Subscribe
        MsgTopicList topics;
@@ -335,9 +372,9 @@
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const int64_t nreq = 1000 * 100;
    const int64_t nreq = 1; //000 * 100;
    for (int i = 0; i < 100; ++i) {
    for (int i = 0; i < 10; ++i) {
        SyncRequest(i);
    }