zhangmeng
2022-12-12 ecf23f882ca1b8aaf0863980fc4781c515da1695
main.cpp
@@ -26,48 +26,40 @@
    return pinfo;
}
static unique_ptr<thread> pub(const vector<string>& topics){
template <class F> void ignoref(F&& f){}
    auto ptr = unique_ptr<thread>(new thread([&]{
        creg reg;
        memset(&reg, 0, sizeof(reg));
        reg.pinfo = make_proc("pub", "pubid");
        reg.topic_pub = cstr_arr_new(topics.size());
        size_t i = 0;
        for(; i < topics.size(); i++){
            cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
static void pub(const vector<string>& topics){
    ignoref(pub);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("pub", "pubid");
    reg.topic_pub = cstr_arr_new(topics.size());
    size_t i = 0;
    for(; i < topics.size(); i++){
        cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
    }
    void* handle = bus_client_init(NULL, 0, &reg);
    size_t count = 0;
    string base_msg("test_pub_sub==");
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        for(auto && i : topics){
            auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
            MsgPublish pbmsg;
            pbmsg.set_topic(i);
            pbmsg.set_data(msg);
            auto data = pbmsg.SerializeAsString();
            // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
            int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
            printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
            this_thread::sleep_for(chrono::seconds{2});
        }
        void* handle = bus_client_init(NULL, 0, &reg);
        size_t count = 0;
        string base_msg("test_pub_sub==");
        this_thread::sleep_for(chrono::seconds(3));
        while (true) {
            for(auto && i : topics){
                auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
                MsgPublish pbmsg;
                pbmsg.set_topic(i);
                pbmsg.set_data(msg);
                auto data = pbmsg.SerializeAsString();
                // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
                int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
                printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
                this_thread::sleep_for(chrono::seconds{2});
            }
        }
    }));
    ptr->detach();
    return ptr;
    }
}
int main(int argc, char const *argv[])
{
    vector<string> topics{
        "cbhomeclient_test_pubsub"
    };
    auto p = pub(topics);
    // while (true) this_thread::sleep_for(chrono::seconds{1});
static void sub(const vector<string>& topics){
    ignoref(sub);
    creg reg;
    memset(&reg, 0, sizeof(reg));
@@ -83,9 +75,81 @@
    while (true) {
        auto msg = bus_client_get_submsg(handle);
        printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
        printf("SUB msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
    }
    bus_client_free(handle);
}
static void req(const char* topic){
    ignoref(req);
    string strtpc(topic);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("request", "requestid");
    // reg.channel = cstr_arr_new(1);
    // size_t i = 0;
    // for(; i < 1; i++){
    //     cstr_arr_add(&reg.topic_pub, topic, strlen(topic), i);
    // }
    void* handle = bus_client_init(NULL, 0, &reg);
    size_t count = 0;
    string base_msg("test_request==");
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        auto msg = base_msg + "request message -> msg-"+to_string(count++);
        auto reqmsg = make_req_msg(strtpc.data(), strtpc.size(), msg.data(), msg.size());
        crepmsg* repmsg = NULL;
        if (bus_client_request(handle, reqmsg, &repmsg)){
            printf("======>> bus_client_reqest [%s] get [%s]\n", msg.c_str(), repmsg->data.str);
        }
        free_reqmsg(reqmsg);
        free_reply_msg(repmsg);
        this_thread::sleep_for(chrono::seconds{2});
    }
}
static void reply(const char* topic){
    ignoref(reply);
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("reply", "replyid");
    reg.channel = cstr_arr_new(1);
    size_t i = 0;
    for(; i < 1; i++){
        cstr_arr_add(&reg.channel, topic, strlen(topic), i);
    }
    void* handle = bus_client_init(NULL, 0, &reg);
    size_t count = 0;
    this_thread::sleep_for(chrono::seconds(3));
    while (true) {
        void* src = NULL;
        auto msg = bus_client_get_reqmsg(handle, &src);
        auto repmsg = make_reply_msg(0, NULL, 0, "recv request", 12);
        bus_client_reply_msg(handle, src, repmsg);
        free_reply_msg(repmsg);
        printf("REPREQ msg [%s] \n", msg->msg.str);
        free_reqmsg(msg);
        this_thread::sleep_for(chrono::seconds{2});
    }
}
int main(int argc, char const *argv[])
{
    vector<string> topics{
        "cbhomeclient_test_pubsub"
    };
    thread([&]{ pub(topics); }).detach();
    thread([&]{ sub(topics); }).detach();
    // sub(topics);
    printf("start RR\n");
    const char* rrtopic = "cbhomeclient_req_rep";
    thread([&]{ req(rrtopic); }).detach();
    reply(rrtopic);
    return 0;
}