| | |
| | | |
| | | namespace nng_wrap { |
| | | |
| | | // static int server_socket(const string& url, const int protocol, int family=AF_SP){ |
| | | // int sock = nn_socket(family, protocol); |
| | | // if (sock < 0) return sock; |
| | | // remove_exist(url); |
| | | // int rc = nn_bind(sock, url.c_str()); |
| | | // if (rc < 0) { |
| | | // nn_close(sock); |
| | | // return rc; |
| | | // } |
| | | // return sock; |
| | | // } |
| | | |
| | | static int client_socket(const string& url, const int protocol, int family=AF_SP){ |
| | | int sock = nn_socket(family, protocol); |
| | | if (sock < 0) return sock; |
| | |
| | | } |
| | | set_socket_timeout(sock, timeout_req_rep); |
| | | pub->socket_ = sock; |
| | | pub->t_ = thread([pub]{ |
| | | pub->t_ = get_thread([](const auto pub){ |
| | | while (!pub->t_quit_.load()) { |
| | | _ps::psmsg *msg{NULL}; |
| | | { |
| | |
| | | } |
| | | |
| | | } |
| | | }); |
| | | }, pub); |
| | | return sock; |
| | | } |
| | | |
| | |
| | | lock_guard<mutex> l{pub->mtx_msg_}; |
| | | pub->msg_.emplace_back(topic, string{(const char*)data, (const size_t)data_len}); |
| | | pub->cv_msg_.notify_one(); |
| | | return pub->msg_.size(); |
| | | return (*pub)(); |
| | | } |
| | | |
| | | /////////////////////////////////////////////// |
| | |
| | | } |
| | | // set_socket_timeout(sock, timeout_req_rep); |
| | | sub->socket_ = sock; |
| | | sub->t_ = thread([sub]{ |
| | | sub->t_ = get_thread([](const auto sub){ |
| | | while (!sub->t_quit_.load()) { |
| | | char* m; |
| | | int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); |
| | |
| | | nn_freemsg(m); |
| | | string topic{}, msg{}; |
| | | { |
| | | lock_guard<mutex> l{sub->mtx_topics_}; |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | for(auto && i : sub->topics_){ |
| | | if (tmp_msg.size() < i.size()) continue; |
| | | topic = tmp_msg.substr(0, i.size()); |
| | | topic = move(tmp_msg.substr(0, i.size())); |
| | | if (topic == i){ |
| | | msg = tmp_msg.substr(i.size()); |
| | | msg = move(tmp_msg.substr(i.size())); |
| | | break; |
| | | } |
| | | } |
| | |
| | | // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno())); |
| | | } |
| | | } |
| | | }); |
| | | }, sub); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | lock_guard<mutex> l{sub->mtx_failed_topics_}; |
| | | sub->failed_topics_.insert(topic); |
| | | } |
| | | lock_guard<mutex> l{sub->mtx_topics_}; |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | sub->topics_.insert(topic); |
| | | |
| | | return 0; |
| | |
| | | _ps_sub* sub = (_ps_sub*)arg; |
| | | if (!sub) sub = singleton<_ps_sub>(); |
| | | |
| | | lock_guard<mutex> l(sub->mtx_topics_); |
| | | lock_guard<mutex> l{(*sub)()}; |
| | | auto iter = sub->topics_.find(topic); |
| | | if (iter != sub->topics_.end()){ |
| | | nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, topic.c_str(), topic.length()); |
| | |
| | | |
| | | sv->url_ = url; |
| | | sv->fixed_msg_ = move(fixed_msg); |
| | | sv->t_ = thread([sv]{ |
| | | sv->t_ = get_thread([](const auto sv){ |
| | | |
| | | TAG; |
| | | |
| | | int& sock = sv->socket_; |
| | | const auto& msg = sv->fixed_msg_; |
| | | |
| | | while (!sv->t_quit_.load()) { |
| | | if (sock < 0){ |
| | | sock = client_socket(sv->url_, NN_RESPONDENT); |
| | |
| | | int rc = nn_recv(sock, &tmp, NN_MSG, 0); |
| | | if (rc > 0){ |
| | | nn_freemsg(tmp); |
| | | rc = nn_send(sock, msg.data(), msg.size(), 0); |
| | | rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0); |
| | | if (rc < 0){ |
| | | PRNTVITAG("heartbeat survey failed"); |
| | | PRNTVITAG(nn_strerror(nn_errno())); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | }, sv); |
| | | |
| | | return 0; |
| | | } |