zhangmeng
2022-12-12 1d3df8349f8ec3d09efa8f41ec8acc706979f40f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
 
#include <vector>
#include <string>
#include <thread>
#include <memory>
using namespace std;
 
#include "cbhomeclient.h"
#include "message.h"
 
// #include "3dparty/bus_nng/bn_api.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;
 
static cproc* make_proc(const char* name, const char* id){
    cproc* pinfo = (cproc*)calloc(1,sizeof(cproc));
    auto assign = [](char** d, size_t* l, const char* tmp){
        *l = strlen(tmp);
        *d = (char*)malloc(*l);
        memcpy(*d, tmp, *l);
    };
    assign(&pinfo->name.str, &pinfo->name.size, name);
    assign(&pinfo->id.str, &pinfo->id.size, id);
 
    return pinfo;
}
 
static unique_ptr<thread> pub(const vector<string>& topics){
 
    auto ptr = unique_ptr<thread>(new thread([&]{
        creg reg;
        memset(&reg, 0, sizeof(reg));
        reg.pinfo = make_proc("pub", "pubid");
        reg.topic_pub = cstr_arr_new(topics.size());
        size_t i = 0;
        for(; i < topics.size(); i++){
            cstr_arr_add(&reg.topic_pub, topics.at(0).data(), topics.at(0).size(), i);
        }
        void* handle = bus_client_init(NULL, 0, &reg);
 
        size_t count = 0;
        string base_msg("test_pub_sub==");
        this_thread::sleep_for(chrono::seconds(3));
        while (true) {
            for(auto && i : topics){
                auto msg = base_msg + "test_ps pub message "+i+"-->msg-"+to_string(count++);
                MsgPublish pbmsg;
                pbmsg.set_topic(i);
                pbmsg.set_data(msg);
                auto data = pbmsg.SerializeAsString();
                // TestPub(i.c_str(), i.length(), data.c_str(), data.length());
                int pubres = bus_client_pubmsg(handle, (void*)data.data(), data.size());
                printf("======>> bus_client_pubmsg [%s]\n", msg.c_str());
 
                this_thread::sleep_for(chrono::seconds{2});
            }
        }
    }));
    ptr->detach();
    return ptr;
}
 
int main(int argc, char const *argv[])
{
    vector<string> topics{
        "cbhomeclient_test_pubsub"
    };
    auto p = pub(topics);
    // while (true) this_thread::sleep_for(chrono::seconds{1});
 
    creg reg;
    memset(&reg, 0, sizeof(reg));
    reg.pinfo = make_proc("sub", "subid");
 
    reg.topic_sub = cstr_arr_new(topics.size());
    size_t i = 0;
    for(; i < topics.size(); i++){
        cstr_arr_add(&reg.topic_sub, topics.at(0).data(), topics.at(0).size(), i);
    }
 
    void* handle = bus_client_init(NULL, 0, &reg);
 
    while (true) {
        auto msg = bus_client_get_submsg(handle);
        printf("msg topic [%s] data [%s]\n", msg->topic.str, msg->msg.str);
    }
 
    bus_client_free(handle);
    return 0;
}