zhangmeng
2022-12-26 4d938d149e782a6e2fed668eec4e1a023df9f35c
main.cpp
@@ -13,52 +13,114 @@
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
static cproc* make_proc(const char* name, const char* id){
    cproc* pinfo = (cproc*)calloc(1,sizeof(cproc));
    auto assign = [](char** d, size_t* l, const char* tmp){
        *l = strlen(tmp);
        *d = (char*)malloc(*l);
        memcpy(*d, tmp, *l);
    };
    assign(&pinfo->name.str, &pinfo->name.size, name);
    assign(&pinfo->id.str, &pinfo->id.size, id);
template <class F> void ignoref(F&& f){}
    return pinfo;
static void pub(const vector<string>& topics){
    ignoref(pub);
    vector<const char*> tpc;
    for(auto& t : topics) tpc.push_back(t.c_str());
    creg* reg = make_creg(make_cproc("pub", "pubid"),
        NULL, 0, &tpc[0], tpc.size(), NULL, 0, NULL, 0);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    string base_msg("test_pub_sub==");
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        for(auto && i : topics){
            auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
            // MsgPublish pbmsg;
            // pbmsg.set_topic(i);
            // pbmsg.set_data(msg);
            // auto data = pbmsg.SerializeAsString();
            // int ret = bus_client_pubmsg(handle, data.data(), data.size());
            int ret = bus_client_publish(handle, i.data(), i.size(), msg.data(), msg.size());
            printf("======>> bus_client_pubmsg [%s] ret %d\n", msg.c_str(), ret);
            this_thread::sleep_for(chrono::seconds{1});
        }
    }
}
static unique_ptr<thread> pub(const vector<string>& topics){
static void sub(const vector<string>& topics){
    ignoref(sub);
    auto ptr = unique_ptr<thread>(new thread([&]{
        creg reg;
        memset(&reg, 0, sizeof(reg));
        reg.pinfo = make_proc("pub", "pubid");
        reg.topic_pub = cstr_arr_new(topics.size());
        size_t i = 0;
        for(; i < topics.size(); i++){
            cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
    vector<const char*> tpc;
    for(auto& t : topics) tpc.push_back(t.c_str());
    creg* reg = make_creg(make_cproc("sub", "subid"),
        NULL, 0, NULL, 0, &tpc[0], tpc.size(), NULL, 0);
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    while (true) {
        auto msg = bus_client_get_submsg(handle);
        printf("SUB msg topic [%s] data [%s]\n", msg->topic, msg->msg);
        free_submsg(msg);
    }
    bus_client_free(handle);
}
static void req(const char* topic){
    ignoref(req);
    const auto topicl = strlen(topic);
    creg* reg = make_creg_from_cproc(make_cproc("request", "requestid"));
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    string base_msg("test_request==request message -> msg-");
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        auto msg = base_msg + to_string(count++);
        auto reqmsg = make_req_msg(topic, topicl, msg.data(), msg.size());
        crepmsg* repmsg = NULL;
        if (bus_client_request(handle, reqmsg, &repmsg)){
            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data);
        }
        void* handle = bus_client_init(NULL, 0, &reg);
        free_reqmsg(reqmsg);
        free_reply_msg(repmsg);
        this_thread::sleep_for(chrono::seconds{2});
    }
}
        size_t count = 0;
        string base_msg("test_pub_sub==");
        this_thread::sleep_for(chrono::seconds(3));
        while (true) {
            for(auto && i : topics){
                auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
                MsgPublish pbmsg;
                pbmsg.set_topic(i);
                pbmsg.set_data(msg);
                auto data = pbmsg.SerializeAsString();
                // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
                int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
                printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
static void reply(const char* topic){
    ignoref(reply);
                this_thread::sleep_for(chrono::seconds{2});
            }
        }
    }));
    ptr->detach();
    return ptr;
    const auto topicl = strlen(topic);
    vector<const char*> tpc{topic};
    // creg* reg = make_creg(make_cproc("reply", "replyid"),
    //     &tpc[0], tpc.size(), &tpc[0], tpc.size(), NULL, 0, NULL, 0);
    creg* reg = make_creg_from_cproc(make_cproc("reply", "replyid"));
    creg_add_topic_reply(reg, tpc.data(), tpc.size());
    creg_add_topic_pub(reg, tpc.data(), tpc.size());
    void* handle = bus_client_init(NULL, 0, reg);
    creg_free(reg);
    size_t count = 0;
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        void* src = NULL;
        auto msg = bus_client_get_reqmsg(handle, &src);
        auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
        bus_client_reply_msg(handle, src, repmsg);
        free_reply_msg(repmsg);
        printf("REPREQ msg [%s] \n", msg->msg);
        free_reqmsg(msg);
        // this_thread::sleep_for(chrono::seconds{2});
    }
}
int main(int argc, char const *argv[])
@@ -66,26 +128,14 @@
    vector<string> topics{
        "cbhomeclient_test_pubsub"
    };
    auto p = pub(topics);
    // while (true) this_thread::sleep_for(chrono::seconds{1});
    thread([&]{ pub(topics); }).detach();
    thread([&]{ sub(topics); }).detach();
    // sub(topics);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("sub", "subid");
    printf("start RR\n");
    const char* rrtopic = "cbhomeclient_req_rep";
    thread([&]{ req(rrtopic); }).detach();
    reply(rrtopic);
    reg.topic_sub = cstr_arr_new(topics.size());
    size_t i = 0;
    for(; i < topics.size(); i++){
        cstr_arr_add(&reg.topic_sub, topics.at(0).data(), topics.at(0).size(), i);
    }
    void* handle = bus_client_init(NULL, 0, &reg);
    while (true) {
        auto msg = bus_client_get_submsg(handle);
        printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
    }
    bus_client_free(handle);
    return 0;
}
}