| | |
| | | void* bus{nullptr};
|
| | | cproc* pinfo{nullptr};
|
// Destructor: releases the process-info record, asks the subscriber thread to
// stop and joins it, empties both message queues through freeMsg, and finally
// releases the bus handle.
// NOTE(review): this span carries two revisions of the same statements back to
// back (free_proc_info / internal_cproc_free, guarded / unguarded bus_cleanup).
// It looks like diff residue - confirm which row of each pair is live, since
// executing both would release the same resource twice.
| | | ~client(){
|
| | | free_proc_info(pinfo);
|
| | | internal_cproc_free(pinfo);
|
| | |
|
// A store may only use relaxed, release or seq_cst ordering; acq_rel on a
// store is undefined behaviour (assuming thrd_quit is a std::atomic, as its
// load/store calls suggest). release pairs with the acquire loads of
// thrd_quit used elsewhere in this file.
| | | thrd_quit.store(true, memory_order_release);
|
// Join only when a subscriber thread was actually started.
| | | if (thrd_sub) thrd_sub->join();
|
| | |
// Queues are cleared after the join; each clear is skipped when the queue
// pointer is null.
| | | if (sub_q) sub_q->clear(freeMsg);
|
| | | if (readreq_q) readreq_q->clear(freeMsg);
|
| | |
|
| | | if (bus) bus_cleanup(bus);
|
| | | bus_cleanup(bus);
|
| | | }
|
| | | };
|
| | |
|
| | |
| | | }
|
| | |
|
// to_topic: packs a list of topics into a MsgTopicList, serializes it, and
// hands the bytes to to_center<2,3> together with the bus function `f`.
//   cli   - client the request is issued for (forwarded to to_center).
//   f     - callable forwarded unchanged to to_center.
//   topic - the topics to send (a cstrarr in one revision, a char** plus
//           `count` in the other).
// Returns the MsgCR produced by to_center; `msg` starts out as MsgCR(dummy()).
// NOTE(review): two revisions of this function are interleaved below (two
// signatures, two bodies). Looks like diff residue - confirm which one is live.
| | | template <class F>
|
| | | MsgCR to_topic(client* cli, F&& f, const struct cstrarr& topic){
|
| | | MsgCR to_topic(client* cli, F&& f, char** topic, const size_t count){
|
| | | MsgCR msg(dummy());
|
// cstrarr variant: nothing is sent when the array pointer is null or empty.
| | | if (topic.arr && topic.count){
|
| | | MsgTopicList tlist;
|
| | | for(size_t i = 0; i < topic.count; i++)
|
| | | tlist.add_topic_list(topic.arr[i].str, topic.arr[i].size);
|
| | | const auto& tpc = tlist.SerializeAsString();
|
// replymsg / replysize are passed by address to to_center and are not read
// again in this block.
| | | void* replymsg = NULL;
|
| | | int replysize = 0;
|
| | | msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
|
| | | tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
|
| | | }
|
// char** variant: the same steps without the emptiness guard, so the call to
// to_center is made even when count is 0.
| | | MsgTopicList tlist;
|
| | | for(size_t i = 0; i < count; i++)
|
| | | tlist.add_topic_list(topic[i]);
|
| | | const auto& tpc = tlist.SerializeAsString();
|
| | | void* replymsg = NULL;
|
| | | int replysize = 0;
|
| | | msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
|
| | | tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
|
| | | return msg;
|
| | | }
|
| | |
|
| | |
// registered: registers the client's process with the bus, then registers and
// subscribes its topics.
// Steps visible in this span:
//   1. Fill a ProcInfo with the process name and id taken from rinfo and
//      serialize it.
//   2. Call bus_register repeatedly until it yields a non-null handle in
//      cli->bus or cli->thrd_quit becomes true.
//   3. Merge two topic arrays into one temporary array of pointers and pass
//      it to bus_register_topics via to_topic.
//   4. Pass the subscribe topic lists to bus_subscribe_topics and
//      bus_subscribe_topics_net via to_topic.
//   5. Start the thread_sub thread when get<0>(submsg) is truthy and no
//      subscriber thread exists yet.
// NOTE(review): two revisions of most statements are interleaved below (the
// cstrarr-based one first, the char**-based one second) and the closing brace
// of the function is not visible here. Looks like diff residue - confirm
// which revision is live.
| | | static void registered(client* cli, const creg* rinfo){
|
| | |
|
| | | ProcInfo pinfo;
|
| | | auto tmp = rinfo->pinfo;
|
| | | pinfo.set_name(tmp->name.str, tmp->name.size);
|
| | | pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
| | | const auto& reg = pinfo.SerializeAsString();
|
| | | auto proc = creg_proc(rinfo);
|
| | | pinfo.set_name(cproc_name(proc));
|
| | | pinfo.set_proc_id(cproc_id(proc));
|
| | | const auto& pbproc = pinfo.SerializeAsString();
|
// Retry loop: ends when bus_register returns a non-null handle or when
// thrd_quit is observed as true. No delay between attempts is visible here.
| | | while (!cli->thrd_quit.load(memory_order_acquire)) {
|
| | | void* replymsg = NULL;
|
| | | int replysize = 0;
|
| | | cli->bus = bus_register(reg.data(), reg.size(), &replymsg, &replysize, sndto);
|
| | | cli->bus = bus_register(pbproc.data(), pbproc.size(), &replymsg, &replysize, sndto);
|
// The reply buffer is released on every iteration, whether or not the
// registration produced a handle.
| | | bus_free(replymsg, replysize);
|
| | | if (cli->bus) break;
|
| | | }
|
| | |
| | |
|
| | | // request/reply and pub topics are handled together
|
| | | // only the string pointers need to be copied; no new string memory has to be allocated
|
| | | auto shallowMerge = [](const cstrarr& arr1, const cstrarr& arr2){
|
| | | auto tmp = cstr_arr_new(arr1.count + arr2.count);
|
| | | auto dst = tmp.arr;
|
| | | auto cp2dst = [&dst](const cstr* src, const size_t cnt){
|
| | | memcpy(dst, src, cnt * sizeof(cstr));
|
| | | auto shallowMerge = [](char** a1, const size_t c1, char** a2, const size_t c2){
|
// NOTE(review): the malloc result is used without a null check.
| | | auto tmp = (char**)malloc((c1 + c2) * sizeof(char*));
|
| | | auto dst = tmp;
|
// cp2dst copies cnt entries to the current write position and advances it,
// so consecutive calls append.
| | | auto cp2dst = [&dst](char** src, const size_t cnt){
|
| | | memcpy(dst, src, cnt * sizeof(char*));
|
| | | dst += cnt;
|
| | | };
|
| | | cp2dst(arr1.arr, arr1.count);
|
| | | cp2dst(arr2.arr, arr2.count);
|
| | | cp2dst(a1, c1);
|
| | | cp2dst(a2, c2);
|
| | | return tmp;
|
| | | };
|
| | | auto tmparr = shallowMerge(rinfo->channel, rinfo->topic_pub);
|
| | | auto tpcmsg = to_topic(cli, bus_register_topics, tmparr);
|
// Only the merged pointer array is freed here; the entries it points to are
// not freed in this block.
| | | free(tmparr.arr);
|
| | | size_t tpcc = 0, pubc = 0;
|
| | | char** tpc = creg_rep_topic(rinfo, &tpcc);
|
| | | char** pub = creg_pub_topic(rinfo, &pubc);
|
| | | auto tmparr = shallowMerge(tpc, tpcc, pub, pubc);
|
| | | auto tpcmsg = to_topic(cli, bus_register_topics, tmparr, tpcc + pubc);
|
// Prints every merged topic to stdout.
// NOTE(review): `i` is a size_t but is printed with %lu; %zu is the portable
// conversion for size_t.
| | | for(size_t i = 0; i < tpcc+pubc; i++){
|
| | | printf("======>> proc [%s] topic %lu -> %s\n", cproc_name(proc), i, tmparr[i]);
|
| | | }
|
| | | free(tmparr);
|
| | |
|
| | | // if topic pub/sub[net] exist, register topics
|
| | | auto submsg = to_topic(cli, bus_subscribe_topics, rinfo->topic_sub);
|
| | | auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, rinfo->topic_sub_net);
|
// tpc / tpcc are reused for the subscribe lists.
| | | tpc = creg_sub_topic(rinfo, &tpcc);
|
| | | auto submsg = to_topic(cli, bus_subscribe_topics, tpc, tpcc);
|
| | | tpc = creg_subnet_topic(rinfo, &tpcc);
|
| | | auto subnetmsg = to_topic(cli, bus_subscribe_topics_net, tpc, tpcc);
|
| | |
|
// The subscriber thread is created at most once per client: only when the
// first element of submsg is truthy and thrd_sub is still empty.
| | | if (get<0>(submsg) && !cli->thrd_sub)
|
| | | cli->thrd_sub.reset(new thread([cli]{ thread_sub(cli); }));
|
| | |
| | |
|
// unregistered: sends an unregister request for the client's own process.
// Builds a ProcInfo from cli->pinfo (name and id), serializes it, and passes
// the bytes to bus_unregister through to_center<2,3>. The value returned by
// to_center is discarded.
// NOTE(review): two revisions of the statements are interleaved below (direct
// field access first, cproc_name / cproc_id accessors second). Looks like
// diff residue - confirm which revision is live.
| | | static void unregistered(client* cli){
|
| | | ProcInfo pinfo;
|
| | | auto tmp = cli->pinfo;
|
| | | pinfo.set_name(tmp->name.str, tmp->name.size);
|
| | | pinfo.set_proc_id(tmp->id.str, tmp->id.size);
|
| | | const auto& reg = pinfo.SerializeAsString();
|
| | | auto proc = cli->pinfo;
|
| | | pinfo.set_name(cproc_name(proc));
|
| | | pinfo.set_proc_id(cproc_id(proc));
|
| | | const auto& pbproc = pinfo.SerializeAsString();
|
| | |
|
// NOTE(review): rep and repl are declared without initializers and are only
// passed by address; nothing in this block reads or releases them afterwards.
| | | void* rep;
|
| | | int repl;
|
| | | to_center<2,3>(cli, bus_unregister, reg.data(), reg.size(), &rep, &repl, sndto);
|
| | | to_center<2,3>(cli, bus_unregister, pbproc.data(), pbproc.size(), &rep, &repl, sndto);
|
| | | }
|
| | |
|
// Converts the opaque handle given to the C-style entry points back into the
// client pointer it was created from. No null or type check is performed.
| | | static inline client* ptr(void* handle){
| | |     auto* const cli = static_cast<client*>(handle);
| | |     return cli;
| | | }
|
| | |
|
| | | void* bus_client_init(const char* srvid, const size_t sidsize, const creg* rinfo){
|
| | | client* cli = new client;
|
| | | cli->pinfo = clone_proc_info(rinfo->pinfo);
|
| | | cli->pinfo = internal_clone_cproc(creg_proc(rinfo));
|
| | |
|
| | | auto pred = [cli]{ return cli->thrd_quit.load(memory_order_relaxed); };
|
| | | const size_t qsize = 5;
|
| | |
| | | void* procid = NULL, *reply = NULL;
|
| | | int pids = 0, replys = 0;
|
| | | auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
|
| | | msg->msg.str, msg->msg.size, &procid, &pids,
|
| | | msg->msg, msg->msgl, &procid, &pids,
|
| | | &reply, &replys, sndto));
|
| | | if (!vmsg.empty()){
|
| | | void* procid = NULL, *data = NULL;
|
| | |
| | | MsgRequestTopicReply msgR;
|
| | | auto err = msgR.mutable_errmsg();
|
| | | err->set_errcode((ErrorCode)msg->errcode);
|
| | | err->set_errstring(msg->errmsg.str, msg->errmsg.size);
|
| | | err->set_errstring(msg->errmsg, msg->errmsgl);
|
| | |
|
| | | msgR.set_data(msg->data.str, msg->data.size);
|
| | | msgR.set_data(msg->data, msg->datal);
|
| | | auto pbstr = msgR.SerializeAsString();
|
| | |
|
| | | auto cli = ptr(handle);
|