| | |
| | | lock_guard<mutex> l{pub->mtx_msg_}; |
| | | pub->msg_.emplace_back(topic, string{(const char*)data, (const size_t)data_len}); |
| | | pub->cv_msg_.notify_one(); |
| | | return pub->msg_.size(); |
| | | return (*pub)(); |
| | | } |
| | | |
| | | /////////////////////////////////////////////// |
| | |
| | | nn_freemsg(m); |
| | | string topic{}, msg{}; |
| | | { |
| | | lock_guard<mutex> l{sub->mtx_topics_}; |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | for(auto && i : sub->topics_){ |
| | | if (tmp_msg.size() < i.size()) continue; |
| | | topic = move(tmp_msg.substr(0, i.size())); |
| | |
| | | lock_guard<mutex> l{sub->mtx_failed_topics_}; |
| | | sub->failed_topics_.insert(topic); |
| | | } |
| | | lock_guard<mutex> l{sub->mtx_topics_}; |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | sub->topics_.insert(topic); |
| | | |
| | | return 0; |
| | |
| | | _ps_sub* sub = (_ps_sub*)arg; |
| | | if (!sub) sub = singleton<_ps_sub>(); |
| | | |
| | | lock_guard<mutex> l(sub->mtx_topics_); |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | auto iter = sub->topics_.find(topic); |
| | | if (iter != sub->topics_.end()){ |
| | | nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, topic.c_str(), topic.length()); |
| | |
| | | TAG; |
| | | |
| | | int& sock = sv->socket_; |
| | | const auto& msg = sv->fixed_msg_; |
| | | |
| | | while (!sv->t_quit_.load()) { |
| | | if (sock < 0){ |
| | | sock = client_socket(sv->url_, NN_RESPONDENT); |
| | |
| | | int rc = nn_recv(sock, &tmp, NN_MSG, 0); |
| | | if (rc > 0){ |
| | | nn_freemsg(tmp); |
| | | rc = nn_send(sock, msg.data(), msg.size(), 0); |
| | | rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0); |
| | | if (rc < 0){ |
| | | PRNTVITAG("heartbeat survey failed"); |
| | | PRNTVITAG(nn_strerror(nn_errno())); |