| | |
| | | // simple interface |
| | | |
/**
 * @brief Release a buffer with free().
 *
 * @param data      Pointer to release; may be null, in which case nothing
 *                  happens (free() on a null pointer is a no-op).
 *                  NOTE(review): presumably this is memory handed out by this
 *                  interface via malloc-family allocation - confirm against callers.
 * @param data_len  Not used by this function; kept so the signature stays
 *                  unchanged for existing callers.
 */
void free_nng(void* data, const int data_len){
    (void)data_len;  // intentionally unused
    // Free exactly once: calling free() both inside a null check and again
    // unconditionally would double-free every non-null pointer.
    free(data);
}
| | | |
| | | void copy_memory(void** dest, int *dest_len, const void* src, const int src_len){ |
| | |
| | | sub->socket_ = sock; |
| | | sub->t_ = get_thread([](const auto sub){ |
| | | while (!sub->t_quit_.load()) { |
| | | char* m; |
| | | char* m{}; |
| | | int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); |
| | | if (m_len > 0){ |
| | | string tmp_msg{m, (size_t)m_len}; |
| | | nn_freemsg(m); |
| | | const auto topic{tmp_msg.c_str()}; |
| | | string topic{tmp_msg.c_str()}; |
| | | string msg{}; |
| | | { |
| | | lock_guard<mutex> l{sub->operator()()}; |
| | |
| | | // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length()); |
| | | if (!msg.empty()){ |
| | | lock_guard<mutex> l(sub->mtx_msg_); |
| | | sub->msg_.emplace_back(topic, move(msg)); |
| | | sub->msg_.emplace_back(move(topic), move(msg)); |
| | | sub->cv_msg_.notify_all(); |
| | | } |
| | | |
| | |
| | | PRNTVITAG("subscribe_read timeout"); |
| | | return -1; |
| | | } |
| | | const auto& tmp = sub->msg_.front(); |
| | | *topic = tmp.topic_; |
| | | *msg = tmp.data_; |
| | | auto& tmp = sub->msg_.front(); |
| | | *topic = std::move(tmp.topic_); |
| | | *msg = std::move(tmp.data_); |
| | | sub->msg_.pop_front(); |
| | | |
| | | return 0; |
| | |
| | | |
| | | _rr* rep = (_rr*)w->user_data; |
| | | |
| | | string msg{(const char*)nng_msg_body(om), nng_msg_len(om)}; |
| | | nng_msg_free(om); |
| | | |
| | | auto t = (*rep)(); |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | rep->works_.emplace(get<0>(t), w); |
| | | get<1>(t).emplace(get<0>(t), move(msg)); |
| | | get<1>(t).emplace(get<0>(t), string{(const char*)nng_msg_body(om), nng_msg_len(om)}); |
| | | get<0>(t)++; |
| | | rep->cv_msg_.notify_all(); |
| | | |
| | | nng_msg_free(om); |
| | | } |
| | | |
| | | static struct work *alloc_work(nng_socket sock, _rr* rep, const int mode) |
| | |
| | | struct work* w{}; |
| | | { |
| | | auto key = *(static_cast<uint64_t*>(const_cast<void*>(src))); |
| | | free(const_cast<void*>(src)); |
| | | |
| | | lock_guard<mutex> l{rep->mtx_msg_}; |
| | | auto iter = rep->works_.find(key); |