| | |
| | | if (rc == (int)sndmsg.size()){ |
| | | char* tmp{}; |
| | | rc = nn_recv(pub_.socket_, &tmp, NN_MSG, 0); |
| | | nn_freemsg(tmp); |
| | | if (rc > 0){ |
| | | nn_freemsg(tmp); |
| | | printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size()); |
| | | lock_guard<mutex> l{pub_.mtx_msg_}; |
| | | pub_.msg_.pop_front(); |
| | |
| | | |
| | | char* tmp{}; |
| | | int rc = nn_recv(sock, &tmp, NN_MSG, 0); |
| | | nn_freemsg(tmp); |
| | | if (rc > 0){ |
| | | nn_freemsg(tmp); |
| | | rc = nn_send(sock, msg.data(), msg.size(), 0); |
| | | if (rc < 0){ |
| | | PRNTVITAG("heartbeat survey failed"); |
| | |
| | | return (w); |
| | | } |
| | | |
| | | static constexpr int PARALLEL = 62; |
| | | static struct work* works_local[PARALLEL]{}; |
| | | static struct work* works_remote[PARALLEL]{}; |
| | | |
| | | static int create_server(nng_socket* sock, const string& url, work** works){ |
| | | static int create_server(nng_socket* sock, const string& url, const int count){ |
| | | TAG; |
| | | if (sock->id > 0) return 0; |
| | | |
| | |
| | | PRNTVITAG(url); |
| | | return rv; |
| | | } |
| | | for (int i = 0; i < PARALLEL; i++) { |
| | | |
| | | work** works = (work**)malloc(sizeof(work*) * count); |
| | | for (int i = 0; i < count; i++) { |
| | | works[i] = alloc_work(*sock); |
| | | } |
| | | |
| | | remove_exist(url); |
| | | rv = nng_listen(*sock, url.c_str(), NULL, 0); |
| | | if (rv < 0){ |
| | | free(works); |
| | | PRNTVITAG("create_server nng_listen failed"); |
| | | PRNTVITAG(url); |
| | | return rv; |
| | | } |
| | | |
| | | for (int i = 0; i < PARALLEL; i++) { |
| | | for (int i = 0; i < count; i++) { |
| | | server_cb(works[i]); // this starts them going (INIT state) |
| | | } |
| | | |
| | | free(works); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | ipc = url; |
| | | } |
| | | reply_.url_ = ipc; |
| | | if(create_server(&reply_.sock_local_, ipc, works_local) != 0) return -1; |
| | | if(create_server(&reply_.sock_local_, ipc, 62) != 0) return -1; |
| | | |
| | | if (port > 0){ |
| | | reply_.port_ = port; |
| | | ipc = "tcp://0.0.0.0:" + to_string(port); |
| | | if(create_server(&reply_.sock_remote_, ipc, works_remote) != 0) return -1; |
| | | if(create_server(&reply_.sock_remote_, ipc, 62) != 0) return -1; |
| | | }else { |
| | | reply_.sock_remote_.id = numeric_limits<int32_t>::max(); |
| | | } |