#include "nng_wrap.h" #include #include #include "common.h" using namespace std; #include "nng/compat/nanomsg/reqrep.h" #include "nng/compat/nanomsg/pubsub.h" #include "nng/compat/nanomsg/survey.h" namespace nng_wrap { static int client_socket(const string& url, const int protocol, int family=AF_SP){ int sock = nn_socket(family, protocol); if (sock < 0) return sock; int rc = nn_connect(sock, url.c_str()); if (rc < 0) { nn_close(sock); return rc; } return sock; } static void set_socket_timeout(int sock, const int to_ms){ nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &to_ms, sizeof(to_ms)); nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to_ms, sizeof(to_ms)); } static int send_and_recv(int sock, const void* in, const int in_len, void** out, int *out_len) { int rc = nn_send(sock, in, in_len, 0); if (rc != in_len) { nn_close(sock); return -1; } char *msg = NULL; rc = nn_recv(sock, &msg, NN_MSG, 0); if (rc < 0) { nn_close(sock); return rc; } nn_close(sock); copy_memory(out, out_len, msg, rc); nn_freemsg(msg); return 0; } ///////////////////////////////////////////////// // simple interface void free_nng(void* data, const int data_len){ if (data){ free(data); } } void copy_memory(void** dest, int *dest_len, const void* src, const int src_len){ char* tmp = (char*)malloc(src_len); memcpy(tmp, src, src_len); *dest = tmp; if(dest_len) *dest_len = src_len; } void get_last_error(int* ec, void** emsg, int* emsg_len){ *emsg = NULL; *emsg_len = 0; *ec = nn_errno(); const char* msg = nn_strerror(*ec); string strMsg(msg); strMsg = strMsg + "{" + verbose_info + "}"; copy_memory(emsg, emsg_len, strMsg.data(), strMsg.size()); verbose_info.clear(); } /////////////////////////////////////////////////////// // simple request waiting reply int simple_request(const std::string& url, const void* in, const int in_len, void** out, int *out_len, const int to_ms){ TAG; *out = NULL; *out_len = 0; // printf("------>> simple_request url %s\n", (char*)url.data); int sock = client_socket(url, NN_REQ); if (sock < 0) { PRNTVITAG("client_socket faild"); return false; } int tm = to_ms < timeout_req_rep ? timeout_req_rep : to_ms; set_socket_timeout(sock, tm); auto ret = send_and_recv(sock, in, in_len, out, out_len); // printf("------>> simple_request out data address %p len %d\n", out->data, out->data_len); if (ret < 0) { PRNTVITAG("send_and_recv faild"); return false; } return true; } ///////////////////////////////////////////////////// // publish static int pub_connect_to_center(const string& topic, _ps* pub){ if (pub->socket_ > 0) return pub->socket_; pub->url_ = topic; TAG; int sock = client_socket(topic, NN_REQ); if (sock < 0){ PRNTVITAG("client_socket faild"); return -1; } set_socket_timeout(sock, timeout_req_rep); pub->socket_ = sock; pub->t_ = get_thread([](const auto pub){ while (!pub->t_quit_.load()) { _ps::psmsg *msg{NULL}; { unique_lock l{pub->mtx_msg_}; pub->cv_msg_.wait(l, [pub]{ return !pub->msg_.empty() || pub->t_quit_.load(); }); if(pub->t_quit_.load()) break; msg = &pub->msg_.front(); if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;} } string sndmsg(msg->topic_ + msg->data_); int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0); if (rc == (int)sndmsg.size()){ char* tmp{}; rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0); if (rc > 0){ nn_freemsg(tmp); printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size()); lock_guard l{pub->mtx_msg_}; pub->msg_.pop_front(); continue; }else{ PRNTVITAG("publish req-rep thread nn_recv faild"); } }else{ PRNTVITAG("publish req-rep thread nn_send faild"); } } }, pub); return sock; } int publish(const std::string& topic, const void* data, const int data_len, void* arg/*=NULL*/){ _ps* pub = (_ps*)arg; if (!pub) pub = singleton<_ps>(); if (!data && data_len == 0){ // printf("======>> publish start url %s\n", topic.c_str()); return pub_connect_to_center(topic, pub); } if (pub->socket_ < 0){ pub_connect_to_center(pub->url_, pub); } if(pub->socket_ < 0) { PRNTVITAG("publish socket_ < 0"); return -1; } // printf("======>> publish topic %s\n", topic.c_str()); lock_guard l{pub->mtx_msg_}; pub->msg_.emplace_back(topic, string{(const char*)data, (const size_t)data_len}); pub->cv_msg_.notify_one(); return pub->msg_.size(); } /////////////////////////////////////////////// // subscribe int subscribe_center(const std::string& url, void* arg/*=NULL*/){ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); if (sub->socket_ > 0) return 0; sub->url_ = url; TAG; int sock = client_socket(url, NN_SUB); if (sock < 0){ PRNTVITAG("client_socket faild\n"); return -1; } // set_socket_timeout(sock, timeout_req_rep); sub->socket_ = sock; sub->t_ = get_thread([](const auto sub){ while (!sub->t_quit_.load()) { char* m; int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT); if (m_len > 0){ string tmp_msg{m, (size_t)m_len}; nn_freemsg(m); string topic{}, msg{}; { lock_guard l{sub->mtx_topics_}; for(auto && i : sub->topics_){ if (tmp_msg.size() < i.size()) continue; topic = move(tmp_msg.substr(0, i.size())); if (topic == i){ msg = move(tmp_msg.substr(i.size())); break; } } } printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length()); if (!msg.empty()){ lock_guard l(sub->mtx_msg_); sub->msg_.emplace_back(topic, move(msg)); sub->cv_msg_.notify_all(); } }else { { lock_guard l{sub->mtx_failed_topics_}; if (!sub->failed_topics_.empty()){ for(auto iter = sub->failed_topics_.begin(); iter != sub->failed_topics_.end();){ if (nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, iter->c_str(), iter->length()) >= 0){ iter = sub->failed_topics_.erase(iter); }else{ iter++; } } } } this_thread::sleep_for(chrono::milliseconds{6}); // printf("======>> subscribe nn_recv failed %s\n", nn_strerror(nn_errno())); } } }, sub); return 0; } int subscribe_topic(const std::string& topic, void* arg/*=NULL*/){ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); TAG; if (sub->socket_ < 0){ subscribe_center(sub->url_, sub); } if (sub->socket_ < 0) { PRNTVITAG("socket_ < 0"); return -1; } auto ret = nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_SUBSCRIBE, topic.c_str(), topic.length()); // printf("set NN_SUB_SUBSCRIBE topic %s ret %d\n", topic.c_str(), ret); if (ret < 0){ PRNTVITAG("nn_setsockopt failed"); lock_guard l{sub->mtx_failed_topics_}; sub->failed_topics_.insert(topic); } lock_guard l{sub->mtx_topics_}; sub->topics_.insert(topic); return 0; } int unsubscribe_topic(const std::string& topic, void* arg/*=NULL*/){ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); lock_guard l(sub->mtx_topics_); auto iter = sub->topics_.find(topic); if (iter != sub->topics_.end()){ nn_setsockopt(sub->socket_, NN_SUB, NN_SUB_UNSUBSCRIBE, topic.c_str(), topic.length()); sub->topics_.erase(iter); } return 0; } int subscribe_read(std::string* topic, std::string* msg, const int to_ms, void* arg/*=NULL*/){ _ps_sub* sub = (_ps_sub*)arg; if (!sub) sub = singleton<_ps_sub>(); TAG; int tm = to_ms > 0 ? to_ms : 30; unique_lock l(sub->mtx_msg_); auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{ return !sub->msg_.empty(); }); if (!status){ PRNTVITAG("subscribe_read timeout"); return -1; } const auto& tmp = sub->msg_.front(); *topic = tmp.topic_; *msg = tmp.data_; sub->msg_.pop_front(); return 0; } /////////////////////////////////////////////////////////// // survey respondent for heartbeat int respond_survey(const std::string& url, std::string&& fixed_msg, void* arg/*=NULL*/){ _sv* sv = (_sv*)arg; if (!sv) sv = singleton<_sv>(); sv->url_ = url; sv->fixed_msg_ = move(fixed_msg); sv->t_ = get_thread([](const auto sv){ TAG; int& sock = sv->socket_; const auto& msg = sv->fixed_msg_; while (!sv->t_quit_.load()) { if (sock < 0){ sock = client_socket(sv->url_, NN_RESPONDENT); if (sock > 0) set_socket_timeout(sock, 126); } if (sock < 0) continue; char* tmp{}; int rc = nn_recv(sock, &tmp, NN_MSG, 0); if (rc > 0){ nn_freemsg(tmp); rc = nn_send(sock, msg.data(), msg.size(), 0); if (rc < 0){ PRNTVITAG("heartbeat survey failed"); PRNTVITAG(nn_strerror(nn_errno())); } } } }, sv); return 0; } }