| | |
| | | // printf("======>> BHRegisterTopics topic %s\n", mtl.topic_list(i).c_str()); |
| | | mtl2.add_topic_list(mtl.topic_list(i)); |
| | | } |
| | | std::move(mtl); |
| | | |
| | | string msg(mtl2.SerializeAsString()); |
| | | std::move(mtl2); |
| | | |
| | | auto ret = simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms); |
| | | // printf("======>> BHRegisterTopics return value %d msg size %lu\n", ret, msg.size()); |
| | |
| | | { |
| | | if (!topic || topic_len <= 0) return false; |
| | | |
| | | auto url(get_url(URLQueryTopic)); |
| | | const auto& url = get_url(URLQueryTopic); |
| | | if (url.empty()) { |
| | | set_last_error("BHQueryTopicAddress url empty"); |
| | | return false; |
| | |
| | | { |
| | | // if (!query || query_len <= 0) return false; |
| | | |
| | | auto url(get_url(URLQueryProcs)); |
| | | const auto& url = get_url(URLQueryProcs); |
| | | if (url.empty()) { |
| | | set_last_error("BHQueryProcs url empty"); |
| | | return false; |
| | |
| | | newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len}); |
| | | |
| | | string msg(newPub.SerializeAsString()); |
| | | std::move(newPub); |
| | | |
| | | if (pub.topic().empty()) { |
| | | set_last_error("BHPublish topic empty"); |
| | | return false; |
| | |
| | | // printf("BHSubscribeTopics %s\n", t.c_str()); |
| | | subscribe_topic(t); |
| | | } |
| | | std::move(mtl); |
| | | |
| | | auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms); |
| | | if (!ret){ |
| | |
| | | |
| | | bhome_msg::MsgQueryTopic msg; |
| | | msg.set_topic(req.topic()); |
| | | std::move(req); |
| | | |
| | | string s(msg.SerializeAsString()); |
| | | std::move(msg); |
| | | |
| | | void* reply2; |
| | | int reply_len2; |
| | |
| | | int ret = BHQueryTopicAddress(remote.c_str(), remote.length(), s.data(), s.size(), |
| | | &reply2, &reply_len2, timeout_ms); |
| | | if (!ret) return false; |
| | | |
| | | |
| | | bhome_msg::MsgQueryTopicReply mr; |
| | | if (!mr.ParseFromArray(reply2, reply_len2)){ |
| | |
| | | } |
| | | |
| | | auto& node_addr = mr.node_address(0); |
| | | auto& procid = node_addr.proc_id(); |
| | | *proc_id = node_addr.proc_id(); |
| | | |
| | | *proc_id = procid; |
| | | return true; |
| | | } |
| | | |
| | |
| | | bus *b = new bus; |
| | | bhome_msg::ProcInfo pi; |
| | | if (pi.ParseFromArray(proc_info, proc_info_len)) get<8>(*b) = pi.proc_id(); |
| | | std::move(pi); |
| | | |
| | | const auto& url_hb = get_url(URLHeartBeat); |
| | | respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<5>(*b)); |
| | |
| | | for(int i = 0; i < mtl.topic_list_size(); i++){ |
| | | mtl2.add_topic_list(mtl.topic_list(i)); |
| | | } |
| | | std::move(mtl); |
| | | |
| | | string msg(mtl2.SerializeAsString()); |
| | | std::move(mtl2); |
| | | |
| | | const auto& url = get_url(URLRegTopic); |
| | | if (url.empty()) { |
| | | set_last_error("bus_register_topics url empty"); |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto url(get_url(URLQueryTopic)); |
| | | const auto& url = get_url(URLQueryTopic); |
| | | if (url.empty()) { |
| | | set_last_error("bus_query_topic_address url empty"); |
| | | return false; |
| | |
| | | return false; |
| | | } |
| | | |
| | | auto url(get_url(URLQueryProcs)); |
| | | const auto& url = get_url(URLQueryProcs); |
| | | if (url.empty()) { |
| | | set_last_error("bus_query_procs url empty"); |
| | | return false; |
| | |
| | | for(int i = 0; i < mtl.topic_list_size(); i ++){ |
| | | subscribe_topic(mtl.topic_list(i), &get<2>(*b)); |
| | | } |
| | | std::move(mtl); |
| | | |
| | | /////////////////////////////////////////////////// |
| | | auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms); |
| | |
| | | newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len}); |
| | | |
| | | string msg(newPub.SerializeAsString()); |
| | | std::move(newPub); |
| | | |
| | | auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<1>(*b)); |
| | | if (ret > 0) return true; |
| | | return false; |
| | |
| | | // simple interface |
| | | |
/// Releases a heap buffer by passing it to free().
///
/// @param data      Pointer to release. A null pointer is accepted and is a
///                  no-op. Any other value must be a live pointer obtained
///                  from malloc/calloc/realloc, as free() requires.
/// @param data_len  Not used by this function; retained so the signature
///                  stays unchanged for existing callers.
void free_nng(void* data, const int data_len){
    (void)data_len;  // silence unused-parameter warnings
    // free(NULL) is defined to do nothing, so one unconditional call handles
    // both the null and non-null cases. Calling free() a second time on the
    // same non-null pointer would be a double free (undefined behaviour).
    free(data);
}
| | | |
| | | void copy_memory(void** dest, int *dest_len, const void* src, const int src_len){ |
| | |
| | | sub->socket_ = sock; |
| | | sub->t_ = get_thread([](const auto sub){ |
| | | while (!sub->t_quit_.load()) { |
| | | char* m; |
| | | char* m{}; |
| | | int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); |
| | | if (m_len > 0){ |
| | | string tmp_msg{m, (size_t)m_len}; |
| | | nn_freemsg(m); |
| | | const auto topic{tmp_msg.c_str()}; |
| | | string topic{tmp_msg.c_str()}; |
| | | string msg{}; |
| | | { |
| | | lock_guard<mutex> l{sub->operator()()}; |
| | |
| | | // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); |
| | | if (!msg.empty()){ |
| | | lock_guard<mutex> l(sub->mtx_msg_); |
| | | sub->msg_.emplace_back(topic, move(msg)); |
| | | sub->msg_.emplace_back(move(topic), move(msg)); |
| | | sub->cv_msg_.notify_all(); |
| | | } |
| | | |
| | |
| | | PRNTVITAG("subscribe_read timeout"); |
| | | return -1; |
| | | } |
| | | const auto& tmp = sub->msg_.front(); |
| | | *topic = tmp.topic_; |
| | | *msg = tmp.data_; |
| | | auto& tmp = sub->msg_.front(); |
| | | *topic = std::move(tmp.topic_); |
| | | *msg = std::move(tmp.data_); |
| | | sub->msg_.pop_front(); |
| | | |
| | | return 0; |
| | |
| | | |
| | | _rr* rep = (_rr*)w->user_data; |
| | | |
| | | string msg{(const char*)nng_msg_body(om), nng_msg_len(om)}; |
| | | nng_msg_free(om); |
| | | |
| | | auto t = (*rep)(); |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | rep->works_.emplace(get<0>(t), w); |
| | | get<1>(t).emplace(get<0>(t), move(msg)); |
| | | get<1>(t).emplace(get<0>(t), string{(const char*)nng_msg_body(om), nng_msg_len(om)}); |
| | | get<0>(t)++; |
| | | rep->cv_msg_.notify_all(); |
| | | |
| | | nng_msg_free(om); |
| | | } |
| | | |
| | | static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode) |
| | |
| | | struct work* w{}; |
| | | { |
| | | auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); |
| | | free(const_cast<void*>(src)); |
| | | |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | auto iter = rep->works_.find(key); |