zhangmeng
2023-03-01 777333ff834744ac5665fa9abe5ec6373d25cda8
cbhomeclient.cpp
@@ -10,6 +10,7 @@
#include "fixed_q.h"
#include "3rdparty/bus_nng/interface_bus_api.h"
#include "3rdparty/bus_nng/bn_api.h"
#include "bhome_msg.pb.h"
@@ -82,7 +83,7 @@
Msg to_bus(client* cli, F&& f, Args&&... args){
    Msg mesg;
    if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
        mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
        mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)));
    return mesg;
}
@@ -96,7 +97,7 @@
        tie(d, s) = tp;
        if (m.ParseFromArray(d, s)) {
            bus_free(d, s);
            msg = std::move(make_tuple(true, std::move(m)));
            msg = make_tuple(true, std::move(m));
            break;
        }
        m.Clear();
@@ -110,8 +111,8 @@
template <size_t... Is, class F, class... Args>
MsgCR to_center(client* cli, F&& f, Args&&... args){
    MsgCR msg(dummy());
    auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
    if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
    auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...);
    if (!vmsg.empty()) msg = parse(cli, vmsg.at(0));
    return msg;
}
@@ -124,8 +125,8 @@
    const auto& tpc = tlist.SerializeAsString();
    void* replymsg = NULL;
    int replysize = 0;
    msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
        tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
    msg = to_center<2,3>(cli, std::forward<F>(f),
        tpc.data(), tpc.size(), &replymsg, &replysize, sndto);
    return msg;
}
@@ -248,11 +249,7 @@
    delete cli;
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->sub_q->pop());
    if (msg.empty()) return NULL;
static struct csubmsg* parse_submsg(const Msg& msg){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -265,12 +262,23 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = std::move(cli->readreq_q->pop());
    Msg msg = cli->sub_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
struct csubmsg* bus_client_get_submsg(void* handle){
    client* cli = ptr(handle);
    Msg msg = cli->sub_q->pop();
    if (msg.empty()) return NULL;
    return parse_submsg(msg);
}
static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){
    void* procid = NULL, *data = NULL;
    int pids = 0, size = 0;
@@ -284,14 +292,28 @@
    return pmsg;
}
struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop(ms);
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
    client* cli = ptr(handle);
    Msg msg = cli->readreq_q->pop();
    if (msg.empty()) return NULL;
    return parse_reqmsg(msg, src);
}
int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
    void* procid = NULL, *reply = NULL;
    int pids = 0, replys = 0;
    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
    auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                    msg->msg, msg->msgl, &procid, &pids,
                    &reply, &replys, sndto));
                    &reply, &replys, sndto);
    if (!vmsg.empty()){
        void* procid = NULL, *data = NULL;
        int pids = 0, size = 0;
@@ -311,7 +333,7 @@
    return false;
}
int bus_client_reply_msg(void* handle, void* src, const struct crepmsg* msg){
int bus_client_reply(void* handle, void* src, const struct crepmsg* msg){
    MsgRequestTopicReply msgR;
    auto err = msgR.mutable_errmsg();
@@ -334,7 +356,15 @@
    void* rep = NULL;
    int repl = 0;
    auto msg = to_bus<4,5>(ptr(handle), bus_query_procs, pbaddr.data(), pbaddr.size(),
        pbtopic.data(), pbtopic.size(), &rep, &repl, sndto);
       pbtopic.data(), pbtopic.size(), &rep, &repl, sndto);
    if (msg.empty()) return NULL;
    // bug 已修复, 在 bus_nng 中
    // bus_query_procs 获取数据失败, 暂时改用BHQueryProcs
    // if (!BHQueryProcs(pbaddr.data(), pbaddr.size(), pbtopic.data(), pbtopic.size(), &rep, &repl, sndto)) {
    //     return NULL;
    // }
    MsgQueryProcReply msgR;
    msgR.ParseFromArray(rep, repl);
@@ -349,6 +379,7 @@
        memcpy(id, p.proc().proc_id().data(), idl);
        procs[i] = cqueryprocs{ .id = id, .idl = idl, .online = p.online() };
    }
    return procs;
}
@@ -358,11 +389,11 @@
    pbmsg.set_topic(topic, topicl);
    pbmsg.set_data(data, size);
    auto pbstr = pbmsg.SerializeAsString();
    return bus_client_pubmsg(handle, pbstr.data(), pbstr.size());
    return bus_client_publish_msg(handle, pbstr.data(), pbstr.size());
}
// test
int bus_client_pubmsg(void* handle, const char* data, const size_t size){
int bus_client_publish_msg(void* handle, const char* data, const size_t size){
    client* cli = ptr(handle);
    return bus_publish(cli->bus, data, size, 100);
}