lichao
2021-04-13 153375e3b152768cbffce715d049499945834c29
add api code, use proto msg as data.
3个文件已添加
8个文件已修改
544 ■■■■ 已修改文件
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 263 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -11,7 +11,7 @@
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "HeartbeatTest"
                "ApiTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
.vscode/settings.json
@@ -67,5 +67,8 @@
        "build/": true,
        "debug/": true
    },
    "cmake.configureOnOpen": false
    "cmake.configureOnOpen": false,
    "C_Cpp.default.includePath": [
        "build/proto"
    ]
}
src/bh_api.cpp
New file
@@ -0,0 +1,263 @@
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <memory>
using namespace bhome_shm;
using namespace bhome_msg;
namespace
{
TopicNode &ProcNode()
{
    static TopicNode node(BHomeShm());
    return node;
}
class TmpPtr : private boost::noncopyable
{
    void *ptr_ = 0;
    size_t size_ = 0;
public:
    explicit TmpPtr(const size_t size) :
        ptr_(malloc(size)), size_(size) {}
    explicit TmpPtr(const std::string &str) :
        TmpPtr(str.size())
    {
        if (ptr_) {
            memcpy(ptr_, str.data(), str.size());
        }
    }
    ~TmpPtr() { free(ptr_); }
    void *get() const { return ptr_; }
    void *release()
    {
        void *tmp = ptr_;
        ptr_ = 0;
        return tmp;
    }
    size_t size() const { return size_; }
    operator bool() const { return ptr_; }
};
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
    auto size = msg.ByteSizeLong();
    TmpPtr p(size);
    if (!p) {
        SetLastError(ENOMEM, "not enough memory.");
        return false;
    }
    msg.SerializePartialToArray(p.get(), size);
    *out = p.release();
    *out_len = size;
    return true;
}
} // namespace
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
}
bool BHHeartBeatEasy(const int timeout_ms)
{
    return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
}
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms)
{
    MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().Publish(pub, timeout_ms);
}
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms)
{
    std::string proc;
    MsgPublish pub;
    if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string proc;
    MsgRequestTopicReply out_msg;
    if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, reply, reply_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms)
{
    void *src_info = 0;
    std::string proc;
    MsgRequestTopic out_msg;
    if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, request, request_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
            *src = src_info;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().ServerSendReply(src, rep, timeout_ms);
}
int BHCleanUp()
{
    return 0;
}
namespace
{
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
{
    TopicNode::ServerCB on_req;
    TopicNode::SubDataCB on_sub;
    if (server_cb) {
        on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            std::string sreq(request.SerializeAsString());
            bool r = false;
            ServerSender sender = [&](const void *p, const int len) {
                r = reply.ParseFromArray(p, len);
                return r;
            };
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
            return r;
        };
    }
    if (sub_cb) {
        on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
            std::string s(pub.SerializeAsString());
            sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
        };
    }
    ProcNode().Start(on_req, on_sub);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len)
{
    auto &sender = *(const ServerSender *) (tag);
    return sender(data, data_len);
}
void BHFree(void *data, int size)
{
    free(data);
}
int BHGetLastError(void **msg, int *msg_len)
{
    int ec = 0;
    if (msg && msg_len) {
        std::string err_msg;
        GetLastError(ec, err_msg);
        TmpPtr p(err_msg);
        if (p) {
            *msg = p.release();
            *msg_len = p.size();
        }
    }
    return ec;
}
#undef BH_SOCKET_MEMF_CALL
src/bh_api.h
New file
@@ -0,0 +1,79 @@
#ifndef BH_API_WRAPPER_O81WKNXI
#define BH_API_WRAPPER_O81WKNXI
#ifdef __cplusplus
extern "C" {
#endif
struct BHSrcInfo;
struct BHServerCallbackTag;
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
                                 const int proc_id_len,
                                 const void *data,
                                 const int data_len);
typedef void (*FServerCallback)(const void *proc_id,
                                const int proc_id_len,
                                const void *data,
                                const int data_len,
                                BHServerCallbackTag *tag);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb);
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len);
bool BHHeartBeatEasy(const int timeout_ms);
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms);
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms);
bool BHReadSub(const void *proc_id,
               const int proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms);
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms);
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   BHSrcInfo **src,
                   const int timeout_ms);
bool BHSendReply(BHSrcInfo *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms);
// int BHCleanUp();
void BHFree(void *buf, int size);
int BHGetLastError(void **msg, int &msg_len);
#ifdef __cplusplus
}
#endif
#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */
src/defs.cpp
@@ -23,8 +23,31 @@
const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
struct LastError {
    int ec_ = 0;
    std::string msg_;
};
LastError &LastErrorStore()
{
    thread_local LastError le;
    return le;
}
} // namespace
const MQId &BHTopicBusAddress() { return kBHTopicBus; }
const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
void SetLastError(const int ec, const std::string &msg)
{
    LastErrorStore().ec_ = ec;
    LastErrorStore().msg_ = msg;
}
void GetLastError(int &ec, std::string &msg)
{
    ec = LastErrorStore().ec_;
    msg = LastErrorStore().msg_;
}
src/defs.h
@@ -38,7 +38,8 @@
bhome_shm::SharedMemory &BHomeShm();
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
#endif // end of include guard: DEFS_KP8LKGD0
src/topic_node.cpp
@@ -39,7 +39,7 @@
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
    Start();
    SockNode().Start();
}
TopicNode::~TopicNode()
@@ -47,14 +47,15 @@
    Stop();
}
void TopicNode::Start()
void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb)
{
    SockNode().Start();
    SockClient().Start();
    SockServer().Start();
    ServerStart(server_cb, 1);
    SubscribeStartWorker(sub_cb, 1);
    // SockClient().Start();
}
void TopicNode::Stop()
{
    SockSub().Stop();
    SockServer().Stop();
    SockClient().Stop();
    SockNode().Stop();
@@ -132,10 +133,8 @@
    return r;
}
bool TopicNode::ServerStart(const OnRequest &rcb, int nworker)
bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
{
    //TODO check registered
    auto failed_q = std::make_shared<ServerFailedQ>();
    auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
@@ -144,10 +143,8 @@
        if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
            MsgRequestTopic req;
            if (imsg.ParseBody(req)) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    MsgRequestTopicReply reply_body;
                    reply_body.set_data(std::move(out));
                MsgRequestTopicReply reply_body;
                if (rcb(head.proc_id(), req, reply_body)) {
                    BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
                    for (int i = 0; i < head.route_size() - 1; ++i) {
@@ -173,17 +170,15 @@
    return rcb && sock.Start(onRecv, onIdle, nworker);
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
    auto &sock = SockServer();
    MsgI imsg;
    BHMsgHead head;
    if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
        MsgRequestTopic request;
        if (imsg.ParseBody(request)) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            head.mutable_proc_id()->swap(proc_id);
            SrcInfo *p = new SrcInfo;
            p->route.assign(head.route().begin(), head.route().end());
            p->msg_id = head.msg_id();
@@ -194,7 +189,7 @@
    return false;
}
bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms)
{
    auto &sock = SockServer();
@@ -203,14 +198,10 @@
    if (!p || p->route.empty()) {
        return false;
    }
    MsgRequestTopicReply body;
    body.set_data(data);
    BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
    return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
}
@@ -223,7 +214,7 @@
        if (head.type() == kMsgTypeRequestTopicReply) {
            MsgRequestTopicReply reply;
            if (imsg.ParseBody(reply)) {
                cb(reply.data());
                cb(head.proc_id(), reply);
            }
        }
    };
@@ -231,14 +222,11 @@
    return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        auto &sock = SockRequest();
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data(data, size);
        BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
        AddRoute(head, sock.id());
@@ -247,7 +235,7 @@
                if (head.type() == kMsgTypeRequestTopicReply) {
                    MsgRequestTopicReply reply;
                    if (imsg.ParseBody(reply)) {
                        cb(reply.data());
                        cb(head.proc_id(), reply);
                    }
                }
            };
@@ -259,9 +247,10 @@
    try {
        BHAddress addr;
        if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
        if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        } else {
            SetLastError(eNotFound, "remote not found.");
            return false;
        }
    } catch (...) {
@@ -269,37 +258,30 @@
    }
}
bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
    try {
        auto &sock = SockRequest();
        BHAddress addr;
        if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data(data, size);
            BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
            AddRoute(head, sock.id());
            MsgI reply;
            DEFER1(reply.Release(shm_););
            MsgI reply_msg;
            DEFER1(reply_msg.Release(shm_););
            BHMsgHead reply_head;
            if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) {
                MsgRequestTopicReply dr;
                if (reply.ParseBody(dr)) {
                    dr.mutable_data()->swap(out);
            if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) {
                if (reply_msg.ParseBody(out_reply)) {
                    reply_head.mutable_proc_id()->swap(out_proc_id);
                    return true;
                } else {
                    printf("error parse reply.\n");
                }
            } else {
                printf("error recv data. line: %d\n", __LINE__);
            }
        } else {
            printf("error recv data. line: %d\n", __LINE__);
            SetLastError(eNotFound, "remote not found.");
        }
    } catch (...) {
        printf("error recv data. line: %d\n", __LINE__);
@@ -344,14 +326,10 @@
// publish
bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
    try {
        auto &sock = SockPub();
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(data, size);
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
        AddRoute(head, sock.id());
@@ -386,7 +364,7 @@
    }
}
bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker)
bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
{
    auto &sock = SockSub();
@@ -394,7 +372,7 @@
        if (head.type() == kMsgTypePublish) {
            MsgPublish pub;
            if (imsg.ParseBody(pub)) {
                tdcb(head.proc_id(), pub.topic(), pub.data());
                tdcb(head.proc_id(), pub);
            }
        } else {
            // ignored, or dropped
@@ -404,18 +382,16 @@
    return tdcb && sock.Start(AsyncRecvProc, nworker);
}
bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
    auto &sock = SockSub();
    MsgI msg;
    DEFER1(msg.Release(shm()););
    BHMsgHead head;
    //TODO error msg.
    if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
        MsgPublish pub;
        if (msg.ParseBody(pub)) {
            head.mutable_proc_id()->swap(proc_id);
            pub.mutable_topic()->swap(topic);
            pub.mutable_data()->swap(data);
            return true;
        }
    }
src/topic_node.h
@@ -34,45 +34,39 @@
    SharedMemory &shm() { return shm_; }
public:
    typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB;
    TopicNode(SharedMemory &shm);
    ~TopicNode();
    void Start();
    void Stop();
    // topic node
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool ServerStart(OnRequest const &cb, const int nworker = 2);
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB;
    bool ServerStart(ServerCB const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms);
    // topic client
    typedef std::function<void(const std::string &data)> RequestResultCB;
    typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
    {
        return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
    bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
    // publish
    bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
    bool Publish(const MsgPublish &pub, const int timeout_ms);
    // subscribe
    typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
    bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
    typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
    bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
    bool Subscribe(MsgTopicList &topics, const int timeout_ms);
    bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
    bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
    void Start(ServerCB const &server_cb, SubDataCB const &sub_cb);
    void Stop();
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
utest/api_test.cpp
New file
@@ -0,0 +1,22 @@
/*
 * =====================================================================================
 *
 *       Filename:  api_test.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月13日 14时31分46秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "util.h"
BOOST_AUTO_TEST_CASE(ApiTest)
{
}
utest/speed_test.cpp
@@ -171,7 +171,7 @@
            }
            MsgI msg;
            BHMsgHead head;
            if (!cli.SyncRecv(msg, head, 1000)) {
            if (!cli.SyncRecv(msg, head, 100)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(msg.Release(shm));
@@ -192,7 +192,7 @@
        BHMsgHead req_head;
        while (!stop) {
            if (srv.SyncRecv(req, req_head, 100)) {
            if (srv.SyncRecv(req, req_head, 10)) {
                DEFER1(req.Release(shm));
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
utest/utest.cpp
@@ -124,7 +124,7 @@
        std::condition_variable cv;
        std::atomic<uint64_t> n(0);
        auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) {
        auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) {
            ++total_count;
            auto cur = Now();
@@ -149,8 +149,10 @@
        for (unsigned i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            MsgPublish pub;
            pub.set_topic(topic);
            pub.set_data(data);
            bool r = provider.Publish(pub, timeout);
            if (!r) {
                static std::atomic<int> an(0);
                int n = ++an;
@@ -218,8 +220,8 @@
        std::atomic<int> count(0);
        std::string reply;
        auto onRecv = [&](const std::string &rep) {
            reply = rep;
        auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) {
            reply = msg.data();
            if (++count >= nreq) {
                printf("count: %d\n", count.load());
            }
@@ -227,7 +229,10 @@
        client.ClientStartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) {
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data("data " + std::to_string(i));
            if (!client.ClientAsyncRequest(req, 1000)) {
                printf("client request failed\n");
                ++count;
            }
@@ -248,9 +253,9 @@
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        DemoNode server(name, shm);
        auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
        auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            ++server_msg_count;
            reply = topic + ':' + data;
            reply.set_data(request.topic() + ':' + request.data());
            return true;
        };
        server.ServerStart(onData);