| | |
| | | }
|
| | |
|
| | | template <class F>
|
| | | MsgCR to_topic(client* cli, F&& f, const struct cstrarr topic){
|
| | | MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){
|
| | | MsgCR msg(dummy());
|
| | | if (topic.arr && topic.count){
|
| | | MsgTopicList tlist;
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | static void registered(client* cli, const creg* rinfo, const bool must_reg=true){
|
| | | static void registered(client* cli, const creg* rinfo){
|
| | |
|
| | | if (must_reg){
|
| | | ProcInfo pinfo;
|
| | | auto tmp = rinfo->pinfo;
|
| | | pinfo.set_name(tmp->name.str, tmp->name.size);
|
| | | pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
| | | const auto& reg = pinfo.SerializeAsString();
|
| | |
|
| | | while (!cli->thrd_quit.load(memory_order_acquire)) {
|
| | | void* replymsg = NULL;
|
| | | int replysize = 0;
|
| | | cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
|
| | | bus_free(replymsg, replysize);
|
| | | if (cli->bus) break;
|
| | | }
|
| | |
|
| | | // register success start read request thread
|
| | | cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
|
| | | ProcInfo pinfo;
|
| | | auto tmp = rinfo->pinfo;
|
| | | pinfo.set_name(tmp->name.str, tmp->name.size);
|
| | | pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
| | | const auto& reg = pinfo.SerializeAsString();
|
| | | while (!cli->thrd_quit.load(memory_order_acquire)) {
|
| | | void* replymsg = NULL;
|
| | | int replysize = 0;
|
| | | cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
|
| | | bus_free(replymsg, replysize);
|
| | | if (cli->bus) break;
|
| | | }
|
| | | // register success start read request thread
|
| | | cli->thrd_readreq.reset(new thread([cli]{ thread_readreq(cli); }));
|
| | |
|
| | | // Handle the request/reply channels and the pub topics together
|
| | | auto tmparr = cstr_arr_new(rinfo->channel.count + rinfo->topic_pub.count);
|
| | | auto addarr = [&tmparr](size_t& start, const struct cstrarr* arr){
|
| | | for(size_t i = 0; i < arr->count; i++){
|
| | | cstr_arr_add(&tmparr, arr->arr[i].str, arr->arr[i].size, start+i);
|
| | | }
|
| | | start += arr->count;
|
| | | // Only the string pointers need to be copied; no new string memory has to be allocated
|
| | | auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){
|
| | | auto tmp = cstr_arr_new(arr1.count + arr2.count);
|
| | | auto dst = tmp.arr;
|
| | | auto cp2dst = [&dst](const cstr* src, const size_t cnt){
|
| | | memcpy(dst, src, cnt * sizeof(cstr));
|
| | | dst += cnt;
|
| | | };
|
| | | cp2dst(arr1.arr, arr1.count);
|
| | | cp2dst(arr2.arr, arr2.count);
|
| | | return tmp;
|
| | | };
|
| | | size_t s = 0;
|
| | | addarr(s, &rinfo->channel);
|
| | | addarr(s, &rinfo->topic_pub);
|
| | | auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub);
|
| | | auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
|
| | | cstr_arr_free(tmparr);
|
| | | // auto channelmsg = to_topic(cli, bus_register_topics, rinfo->channel);
|
| | | // auto pubmsg = to_topic(cli, bus_register_topics, rinfo->topic_pub);
|
| | | free(tmparr.arr);
|
| | |
|
| | | // if topic pub/sub[net] exist, register topics
|
| | | auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
|
| | | auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
|