//
// Copyright 2021 Capitar IT Group BV
// Copyright 2020 Staysail Systems, Inc.
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "core/nng_impl.h"
#include "zthash.h"

#include "nng/transport/zerotier/zerotier.h"

#include <ZeroTierOne.h> // libzerotiercore public API

// ZeroTier Transport. This sits on the ZeroTier L2 network, which itself
// is implemented on top of UDP. This requires the 3rd party
// libzerotiercore library (which is GPLv3!) and platform specific UDP
// functionality to be built in. Note that care must be taken to link
// dynamically if one wishes to avoid making your entire application GPLv3.
// (Alternatively ZeroTier offers commercial licenses which may prevent
// this particular problem.) This implementation does not make use of
// certain advanced capabilities in ZeroTier such as more sophisticated
// route management and TCP fallback. You need to have connectivity
// to the Internet to use this. (Or at least to your Planetary root.)
//
// Because ZeroTier takes a while to establish connectivity, it is even
// more important that applications using the ZeroTier transport not
// assume that a connection will be immediately available. It can take
// quite a few seconds for peer-to-peer connectivity to be established.
//
// The ZeroTier transport was funded by Capitar IT Group, BV.
//
// This transport is highly experimental.

// ZeroTier and UDP are connectionless, but nng is designed around
// connection-oriented paradigms. An "unreliable" connection is created
// on top using our own network protocol. The details of this are
// documented in the RFC.

// Every participant has an "address", which is a 64-bit value constructed
// using the ZT node number in the upper 40 bits and a 24-bit port number
// in the lower bits. We elect to operate primarily on these addresses,
// but the wire protocol relies on just conveying the 24-bit port along
// with the MAC address (from which the ZT node number can be derived,
// given the network ID).

typedef struct zt_pipe     zt_pipe;
typedef struct zt_ep       zt_ep;
typedef struct zt_node     zt_node;
typedef struct zt_frag     zt_frag;
typedef struct zt_fraglist zt_fraglist;

// Port numbers are stored as 24-bit values in network byte order.
#define ZT_GET24(ptr, v)                                \
    v = (((uint32_t) ((uint8_t) (ptr)[0])) << 16) +     \
        (((uint32_t) ((uint8_t) (ptr)[1])) << 8) +      \
        (((uint32_t) (uint8_t) (ptr)[2]))

#define ZT_PUT24(ptr, u)                                   \
    do {                                                   \
        (ptr)[0] = (uint8_t) (((uint32_t) (u)) >> 16);     \
        (ptr)[1] = (uint8_t) (((uint32_t) (u)) >> 8);      \
        (ptr)[2] = (uint8_t) ((uint32_t) (u));             \
    } while (0)

static const uint16_t zt_ethertype  = 0x901;
static const uint8_t  zt_version    = 0x01;
static const uint32_t zt_ephemeral  = 0x800000u; // start of ephemeral ports
static const uint32_t zt_max_port   = 0xffffffu; // largest port
static const uint32_t zt_port_mask  = 0xffffffu; // mask of valid ports
static const uint32_t zt_port_shift = 24;

static const int          zt_conn_tries = 240;   // max connect attempts
static const nng_duration zt_conn_time  = 500;   // between attempts (msec)
static const int          zt_ping_tries = 10;    // max keepalive attempts
static const nng_duration zt_ping_time  = 60000; // keepalive time (msec)
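// As a worked example of the addressing scheme above (values are purely
// illustrative): a node with ZT address 0xabcdefabcd using port 0x800001
// has the composite address
// (0xabcdefabcd << zt_port_shift) | 0x800001 = 0xabcdefabcd800001.
// Only the 24-bit port travels in our headers (via ZT_PUT24); the node
// portion is recovered from the frame's MAC addresses by zt_mac_to_node().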
// These are compile time tunables for now.
enum zt_tunables {
    zt_listenq       = 128,   // backlog queue length
    zt_listen_expire = 10000, // maximum time in backlog (msec)
    zt_rcv_bufsize   = 4096,  // max UDP recv
    zt_udp_sendq     = 16,    // outgoing UDP queue length
    zt_recvq         = 2,     // max pending recv (per pipe)
    zt_recv_stale    = 1000,  // frags older than this are stale (msec)
};

enum zt_op_codes {
    zt_op_data     = 0x00, // data, final fragment
    zt_op_conn_req = 0x10, // connect request
    zt_op_conn_ack = 0x12, // connect accepted
    zt_op_disc_req = 0x20, // disconnect request (no ack)
    zt_op_ping     = 0x30, // ping request
    zt_op_pong     = 0x32, // ping response
    zt_op_error    = 0x40, // error response
};

enum zt_offsets {
    zt_offset_op          = 0x00,
    zt_offset_flags       = 0x01,
    zt_offset_version     = 0x02, // protocol version number (2 bytes)
    zt_offset_zero1       = 0x04, // reserved, must be zero (1 byte)
    zt_offset_dst_port    = 0x05, // destination port (3 bytes)
    zt_offset_zero2       = 0x08, // reserved, must be zero (1 byte)
    zt_offset_src_port    = 0x09, // source port number (3 bytes)
    zt_offset_creq_proto  = 0x0C, // SP protocol number (2 bytes)
    zt_offset_cack_proto  = 0x0C, // SP protocol number (2 bytes)
    zt_offset_err_code    = 0x0C, // error code (1 byte)
    zt_offset_err_msg     = 0x0D, // error message (string)
    zt_offset_data_id     = 0x0C, // message ID (2 bytes)
    zt_offset_data_fragsz = 0x0E, // fragment size
    zt_offset_data_frag   = 0x10, // fragment number, first is 1 (2 bytes)
    zt_offset_data_nfrag  = 0x12, // total fragments (2 bytes)
    zt_offset_data_data   = 0x14, // user payload
    zt_size_headers       = 0x0C, // size of headers
    zt_size_conn_req      = 0x0E, // size of conn_req (connect request)
    zt_size_conn_ack      = 0x0E, // size of conn_ack (connect reply)
    zt_size_disc_req      = 0x0C, // size of disc_req (disconnect)
    zt_size_ping          = 0x0C, // size of ping request
    zt_size_pong          = 0x0C, // size of ping reply
    zt_size_data          = 0x14, // size of data message (w/o payload)
};

enum zt_errors {
    zt_err_refused = 0x01, // Connection refused
    zt_err_notconn = 0x02, // Connection does not exist
    zt_err_wrongsp = 0x03, // SP protocol mismatch
    zt_err_proto   = 0x04, // Other protocol error
    zt_err_msgsize = 0x05, // Message too large
    zt_err_unknown = 0x06, // Other errors
};

// This node structure is wrapped around the ZT_node; this allows us to
// have multiple endpoints referencing the same ZT_node, but also to
// support different nodes (identities) based on different home dirs.
// This means we need to stick these on a global linked list, manage
// them with a reference count, and uniquely identify them using the
// homedir.
struct zt_node {
    char            zn_path[NNG_MAXADDRLEN]; // ought to be sufficient
    nni_file_lockh *zn_flock;
    ZT_Node        *zn_znode;
    uint64_t        zn_self;
    nni_list_node   zn_link;
    bool            zn_closed;
    nni_plat_udp   *zn_udp4;
    nni_plat_udp   *zn_udp6;
    nni_list        zn_eplist;
    nni_list        zn_plist;
    zt_hash        *zn_ports;
    zt_hash        *zn_eps;
    zt_hash        *zn_lpipes;
    zt_hash        *zn_rpipes;
    nni_aio        *zn_rcv4_aio;
    uint8_t        *zn_rcv4_buf;
    nng_sockaddr    zn_rcv4_addr;
    nni_aio        *zn_rcv6_aio;
    uint8_t        *zn_rcv6_buf;
    nng_sockaddr    zn_rcv6_addr;
    nni_thr         zn_bgthr;
    int64_t         zn_bgtime;
    nni_cv          zn_bgcv;
    nni_cv          zn_snd6_cv;
};

// The fragment list is used to keep track of incoming received
// fragments for reassembly into a complete message.
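//
// (Illustration: a message arriving as nfrags == 10 fragments needs a
// 10-bit fl_missing map, stored in (10 + 7) / 8 == 2 bytes. The bits are
// set when the first fragment arrives and cleared one by one; once the
// whole map is zero the message is complete and fl_ready is set.)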
struct zt_fraglist {
    nni_time     fl_time;  // time first frag was received
    uint32_t     fl_msgid; // message id
    int          fl_ready; // we have all messages
    size_t       fl_fragsz;
    unsigned int fl_nfrags;
    uint8_t     *fl_missing;
    size_t       fl_missingsz;
    nni_msg     *fl_msg;
};

struct zt_pipe {
    nni_list_node   zp_link;
    zt_node        *zp_ztn;
    nni_pipe       *zp_npipe;
    uint64_t        zp_nwid;
    uint64_t        zp_laddr;
    uint64_t        zp_raddr;
    uint16_t        zp_peer;
    uint16_t        zp_proto;
    uint16_t        zp_next_msgid;
    size_t          zp_rcvmax;
    size_t          zp_mtu;
    nni_aio        *zp_user_rxaio;
    nni_time        zp_last_recv;
    zt_fraglist     zp_recvq[zt_recvq];
    int             zp_ping_try;
    int             zp_ping_tries;
    bool            zp_closed;
    nni_duration    zp_ping_time;
    nni_aio        *zp_ping_aio;
    uint8_t        *zp_send_buf;
    nni_atomic_flag zp_reaped;
    nni_reap_node   zp_reap;
};

typedef struct zt_creq zt_creq;
struct zt_creq {
    uint64_t cr_expire;
    uint64_t cr_raddr;
    uint16_t cr_proto;
};

struct zt_ep {
    nni_list_node ze_link;
    char          ze_home[NNG_MAXADDRLEN]; // should be enough
    zt_node      *ze_ztn;
    uint64_t      ze_nwid;
    bool          ze_running;
    uint64_t      ze_raddr; // remote node address
    uint64_t      ze_laddr; // local node address
    uint16_t      ze_proto;
    size_t        ze_rcvmax;
    nni_aio      *ze_aio;
    nni_aio      *ze_creq_aio;
    bool          ze_creq_active;
    int           ze_creq_try;
    nni_list      ze_aios;
    int           ze_mtu;
    int           ze_ping_tries;
    nni_duration  ze_ping_time;
    nni_duration  ze_conn_time;
    int           ze_conn_tries;

    // Incoming connection requests (server only). We only
    // have "accepted" requests -- that is we won't have an
    // established connection/pipe unless the application calls
    // accept. Since the "application" is our library, that should
    // be pretty much as fast as we can run.
    zt_creq       ze_creqs[zt_listenq];
    int           ze_creq_head;
    int           ze_creq_tail;
    nni_dialer   *ze_ndialer;
    nni_listener *ze_nlistener;
};

// Locking strategy. At present the ZeroTier core is not reentrant or fully
// threadsafe. (We expect this will be fixed.) Furthermore, there are
// some significant challenges in dealing with locks associated with the
// callbacks, etc. So we take a big-hammer approach, and just use a single
// global lock for everything. We hold this lock when calling into the
// ZeroTier framework. Since ZeroTier has no independent threads, that
// means that it will always hold this lock in its core, and the lock will
// also be held automatically in any of our callbacks. We never hold any
// other locks across ZeroTier core calls. We may not acquire the global
// lock in callbacks (they will already have it held). Any other locks
// can be acquired as long as they are not held during calls into ZeroTier.
//
// This will have a detrimental impact on performance, but to be completely
// honest we don't think anyone will be using the ZeroTier transport in
// performance critical applications; scalability may become a factor for
// large servers sitting in a ZeroTier hub situation. (Then again, since
// only the zerotier processing is single threaded, it may not
// be that much of a bottleneck -- really depends on how expensive these
// operations are. We can use lockstat or other lock-hotness tools to
// check for this later.)
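//
// A minimal sketch of the resulting pattern (every ZeroTier call in this
// file follows it):
//
//     nni_mtx_lock(&zt_lk);
//     // ... ZT_Node_xxx(...) calls; our callbacks may fire inline ...
//     nni_mtx_unlock(&zt_lk);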
static nni_mtx  zt_lk;
static nni_list zt_nodes;

static void zt_ep_send_conn_req(zt_ep *);
static void zt_ep_conn_req_cb(void *);
static void zt_ep_doaccept(zt_ep *);
static void zt_pipe_dorecv(zt_pipe *);
static int  zt_pipe_alloc(zt_pipe **, zt_ep *, uint64_t, uint64_t, bool);
static void zt_pipe_ping_cb(void *);
static void zt_fraglist_clear(zt_fraglist *);
static void zt_fraglist_free(zt_fraglist *);
static void zt_virtual_recv(ZT_Node *, void *, void *, uint64_t, void **,
    uint64_t, uint64_t, unsigned int, unsigned int, const void *,
    unsigned int);
static void zt_pipe_start_ping(zt_pipe *);

static int64_t
zt_now(void)
{
    // We return msec
    return ((int64_t) nni_clock());
}

static void
zt_bgthr(void *arg)
{
    zt_node *ztn = arg;
    int64_t  now;

    nni_mtx_lock(&zt_lk);
    for (;;) {
        now = zt_now();

        if (ztn->zn_closed) {
            break;
        }

        if (now < ztn->zn_bgtime) {
            nni_cv_until(&ztn->zn_bgcv, (nni_time) ztn->zn_bgtime);
            continue;
        }

        ztn->zn_bgtime = 0;
        ZT_Node_processBackgroundTasks(ztn->zn_znode, NULL, now, &now);
        ztn->zn_bgtime = now;
    }
    nni_mtx_unlock(&zt_lk);
}

static void
zt_node_resched(zt_node *ztn, int64_t msec)
{
    if (msec > ztn->zn_bgtime && ztn->zn_bgtime != 0) {
        return;
    }
    ztn->zn_bgtime = msec;
    nni_cv_wake1(&ztn->zn_bgcv);
}

static void
zt_node_rcv4_cb(void *arg)
{
    zt_node                *ztn = arg;
    nni_aio                *aio = ztn->zn_rcv4_aio;
    struct sockaddr_storage sa;
    struct sockaddr_in     *sin;
    nng_sockaddr_in        *nsin;
    int64_t                 now;

    if (nni_aio_result(aio) != 0) {
        // Outside of memory exhaustion, we can't really think
        // of any reason for this to legitimately fail.
        // Arguably we should inject a fallback delay, but for
        // now we just carry on.
        return;
    }

    memset(&sa, 0, sizeof(sa));
    sin                  = (void *) &sa;
    nsin                 = &ztn->zn_rcv4_addr.s_in;
    sin->sin_family      = AF_INET;
    sin->sin_port        = nsin->sa_port;
    sin->sin_addr.s_addr = nsin->sa_addr;

    nni_mtx_lock(&zt_lk);
    now = zt_now();

    // We are not going to perform any validation of the data; we
    // just pass this straight into the ZeroTier core.
    // XXX: CHECK THIS, if it fails then we have a fatal error with
    // the znode, and have to shut everything down.
    ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
        ztn->zn_rcv4_buf, nni_aio_count(aio), &now);

    // Schedule background work
    zt_node_resched(ztn, now);

    // Schedule another receive.
    if (ztn->zn_udp4 != NULL) {
        nni_iov iov;

        iov.iov_buf = ztn->zn_rcv4_buf;
        iov.iov_len = zt_rcv_bufsize;
        nni_aio_set_iov(aio, 1, &iov);
        nni_aio_set_input(aio, 0, &ztn->zn_rcv4_addr);

        nni_plat_udp_recv(ztn->zn_udp4, aio);
    }
    nni_mtx_unlock(&zt_lk);
}
static void
zt_node_rcv6_cb(void *arg)
{
    zt_node                 *ztn = arg;
    nni_aio                 *aio = ztn->zn_rcv6_aio;
    struct sockaddr_storage  sa;
    struct sockaddr_in6     *sin6;
    struct nng_sockaddr_in6 *nsin6;
    int64_t                  now;

    if (nni_aio_result(aio) != 0) {
        // Outside of memory exhaustion, we can't really think
        // of any reason for this to legitimately fail.
        // Arguably we should inject a fallback delay, but for
        // now we just carry on.
        return;
    }

    memset(&sa, 0, sizeof(sa));
    sin6              = (void *) &sa;
    nsin6             = &ztn->zn_rcv6_addr.s_in6;
    sin6->sin6_family = AF_INET6;
    sin6->sin6_port   = nsin6->sa_port;
    memcpy(&sin6->sin6_addr, nsin6->sa_addr, 16);

    nni_mtx_lock(&zt_lk);
    now = zt_now(); // msec

    // We are not going to perform any validation of the data; we
    // just pass this straight into the ZeroTier core.
    ZT_Node_processWirePacket(ztn->zn_znode, NULL, now, 0, (void *) &sa,
        ztn->zn_rcv6_buf, nni_aio_count(aio), &now);

    // Schedule background work
    zt_node_resched(ztn, now);

    // Schedule another receive.
    if (ztn->zn_udp6 != NULL) {
        nni_iov iov;

        iov.iov_buf = ztn->zn_rcv6_buf;
        iov.iov_len = zt_rcv_bufsize;
        nni_aio_set_iov(aio, 1, &iov);
        nni_aio_set_input(aio, 0, &ztn->zn_rcv6_addr);
        nni_plat_udp_recv(ztn->zn_udp6, aio);
    }
    nni_mtx_unlock(&zt_lk);
}

static uint64_t
zt_mac_to_node(uint64_t mac, uint64_t nwid)
{
    uint64_t node;

    // This extracts a node address from a mac address. The
    // network ID is mixed in, and has to be extricated. The
    // node ID is located in the lower 40 bits, and scrambled
    // against the nwid.
    node = mac & 0xffffffffffull;
    node ^= ((nwid >> 8) & 0xff) << 32;
    node ^= ((nwid >> 16) & 0xff) << 24;
    node ^= ((nwid >> 24) & 0xff) << 16;
    node ^= ((nwid >> 32) & 0xff) << 8;
    node ^= (nwid >> 40) & 0xff;
    return (node);
}

static uint64_t
zt_node_to_mac(uint64_t node, uint64_t nwid)
{
    uint64_t mac;

    // We use the LSB of the network ID, and make sure that we clear
    // multicast and set local administration -- this is the first
    // octet of the 48 bit mac address. We also avoid 0x52, which
    // is known to be used in KVM, libvirt, etc.
    mac = ((uint8_t) (nwid & 0xfe) | 0x02);
    if (mac == 0x52) {
        mac = 0x32;
    }
    mac <<= 40;
    mac |= node;
    // The rest of the network ID is XOR'd in, in reverse byte
    // order.
    mac ^= ((nwid >> 8) & 0xff) << 32;
    mac ^= ((nwid >> 16) & 0xff) << 24;
    mac ^= ((nwid >> 24) & 0xff) << 16;
    mac ^= ((nwid >> 32) & 0xff) << 8;
    mac ^= (nwid >> 40) & 0xff;
    return (mac);
}

static int
zt_result(enum ZT_ResultCode rv)
{
    switch (rv) {
    case ZT_RESULT_OK:
        return (0);
    case ZT_RESULT_OK_IGNORED:
        return (0);
    case ZT_RESULT_FATAL_ERROR_OUT_OF_MEMORY:
        return (NNG_ENOMEM);
    case ZT_RESULT_FATAL_ERROR_DATA_STORE_FAILED:
        return (NNG_EPERM);
    case ZT_RESULT_FATAL_ERROR_INTERNAL:
        return (NNG_EINTERNAL);
    case ZT_RESULT_ERROR_NETWORK_NOT_FOUND:
        return (NNG_EADDRINVAL);
    case ZT_RESULT_ERROR_UNSUPPORTED_OPERATION:
        return (NNG_ENOTSUP);
    case ZT_RESULT_ERROR_BAD_PARAMETER:
        return (NNG_EINVAL);
    default:
        return (NNG_ETRANERR + (int) rv);
    }
}

// ZeroTier Node API callbacks
static int
zt_virtual_config(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
    void **netptr, enum ZT_VirtualNetworkConfigOperation op,
    const ZT_VirtualNetworkConfig *config)
{
    zt_node *ztn = userptr;
    zt_ep   *ep;

    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(netptr);

    NNI_ASSERT(node == ztn->zn_znode);

    // Maybe we don't have to create taps or anything like that.
    // We do get our mac and MTUs from this, so there's that.
    switch (op) {
    case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_UP:
    case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_CONFIG_UPDATE:
        // We only really care about changes to the MTU. From
        // an API perspective the MAC could change, but that
        // cannot really happen because the node identity and
        // the nwid are fixed.
        NNI_LIST_FOREACH (&ztn->zn_eplist, ep) {
            NNI_ASSERT(nwid == config->nwid);
            if (ep->ze_nwid != config->nwid) {
                continue;
            }
            ep->ze_mtu = config->mtu;
        }
        break;
    case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DESTROY:
    case ZT_VIRTUAL_NETWORK_CONFIG_OPERATION_DOWN:
        // XXX: tear down endpoints?
    default:
        break;
    }
    return (0);
}

// zt_send modifies the start of the supplied buffer to update the
// message headers with protocol specific details (version, port numbers,
// etc.) and then sends it over the virtual network.
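//
// (Header layout example, with illustrative values: an op of zt_op_data
// sent to remote address 0xabcdefabcd800001 from local address
// 0x123456789a000002 puts 0x800001 in the 3-byte dst_port field,
// 0x000002 in src_port, and derives the Ethernet source and destination
// MACs from the two 40-bit node IDs via zt_node_to_mac().)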
static void
zt_send(zt_node *ztn, uint64_t nwid, uint8_t op, uint64_t raddr,
    uint64_t laddr, uint8_t *data, size_t len)
{
    uint64_t srcmac = zt_node_to_mac(laddr >> 24, nwid);
    uint64_t dstmac = zt_node_to_mac(raddr >> 24, nwid);
    int64_t  now    = zt_now();

    NNI_ASSERT(len >= zt_size_headers);
    data[zt_offset_op]    = op;
    data[zt_offset_flags] = 0;
    data[zt_offset_zero1] = 0;
    data[zt_offset_zero2] = 0;
    NNI_PUT16(data + zt_offset_version, zt_version);
    ZT_PUT24(data + zt_offset_dst_port, raddr & zt_port_mask);
    ZT_PUT24(data + zt_offset_src_port, laddr & zt_port_mask);

    (void) ZT_Node_processVirtualNetworkFrame(ztn->zn_znode, NULL, now,
        nwid, srcmac, dstmac, zt_ethertype, 0, data, len, &now);

    zt_node_resched(ztn, now);
}

static void
zt_send_err(zt_node *ztn, uint64_t nwid, uint64_t raddr, uint64_t laddr,
    uint8_t err, const char *msg)
{
    uint8_t data[128];

    NNI_ASSERT((strlen(msg) + zt_offset_err_msg) < sizeof(data));

    data[zt_offset_err_code] = err;
    nni_strlcpy((char *) data + zt_offset_err_msg, msg,
        sizeof(data) - zt_offset_err_msg);

    zt_send(ztn, nwid, zt_op_error, raddr, laddr, data,
        strlen(msg) + zt_offset_err_msg);
}

static void
zt_pipe_send_err(zt_pipe *p, uint8_t err, const char *msg)
{
    zt_send_err(p->zp_ztn, p->zp_nwid, p->zp_raddr, p->zp_laddr, err, msg);
}

static void
zt_pipe_send_disc_req(zt_pipe *p)
{
    uint8_t data[zt_size_disc_req];

    zt_send(p->zp_ztn, p->zp_nwid, zt_op_disc_req, p->zp_raddr,
        p->zp_laddr, data, sizeof(data));
}

static void
zt_pipe_send_ping(zt_pipe *p)
{
    uint8_t data[zt_size_ping];

    zt_send(p->zp_ztn, p->zp_nwid, zt_op_ping, p->zp_raddr, p->zp_laddr,
        data, sizeof(data));
}

static void
zt_pipe_send_pong(zt_pipe *p)
{
    uint8_t data[zt_size_ping];

    zt_send(p->zp_ztn, p->zp_nwid, zt_op_pong, p->zp_raddr, p->zp_laddr,
        data, sizeof(data));
}

static void
zt_pipe_send_conn_ack(zt_pipe *p)
{
    uint8_t data[zt_size_conn_ack];

    NNI_PUT16(data + zt_offset_cack_proto, p->zp_proto);
    zt_send(p->zp_ztn, p->zp_nwid, zt_op_conn_ack, p->zp_raddr,
        p->zp_laddr, data, sizeof(data));
}

static void
zt_ep_send_conn_req(zt_ep *ep)
{
    uint8_t data[zt_size_conn_req];

    NNI_PUT16(data + zt_offset_creq_proto, ep->ze_proto);
    zt_send(ep->ze_ztn, ep->ze_nwid, zt_op_conn_req, ep->ze_raddr,
        ep->ze_laddr, data, sizeof(data));
}

static void
zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data,
    size_t len)
{
    zt_node *ztn = ep->ze_ztn;
    nni_aio *aio = ep->ze_creq_aio;
    zt_pipe *p;
    int      rv;

    if (ep->ze_ndialer == NULL) {
        zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto,
            "Inappropriate operation");
        return;
    }

    if (len != zt_size_conn_ack) {
        zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto,
            "Bad message length");
        return;
    }

    if (ep->ze_creq_try == 0) {
        return;
    }

    // Do we already have a matching pipe? If so, we can discard
    // the operation. This should not happen, since we normally
    // deregister the endpoint when we create the pipe.
    if ((zt_hash_find(ztn->zn_lpipes, ep->ze_laddr, (void **) &p)) == 0) {
        return;
    }

    if ((rv = zt_pipe_alloc(&p, ep, raddr, ep->ze_laddr, false)) != 0) {
        // We couldn't create the pipe, just drop it.
        nni_aio_finish_error(aio, rv);
        return;
    }
    NNI_GET16(data + zt_offset_cack_proto, p->zp_peer);

    // Reset the address of the endpoint, so that the next call to
    // ep_connect will bind a new one -- we are using this one for the
    // pipe.
    zt_hash_remove(ztn->zn_eps, ep->ze_laddr);
    ep->ze_laddr = 0;

    nni_aio_set_output(aio, 0, p);
    nni_aio_finish(aio, 0, 0);
}

static void
zt_ep_recv_conn_req(zt_ep *ep, uint64_t raddr, const uint8_t *data,
    size_t len)
{
    zt_node *ztn = ep->ze_ztn;
    zt_pipe *p;
    int      i;

    if (ep->ze_nlistener == NULL) {
        zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto,
            "Inappropriate operation");
        return;
    }
    if (len != zt_size_conn_req) {
        zt_send_err(ztn, ep->ze_nwid, raddr, ep->ze_laddr, zt_err_proto,
            "Bad message length");
        return;
    }

    // If we already have created a pipe for this connection
    // then just reply the conn ack.
    if ((zt_hash_find(ztn->zn_rpipes, raddr, (void **) &p)) == 0) {
        zt_pipe_send_conn_ack(p);
        return;
    }

    // We may already have a connection request queued (if this was
    // a resend for example); if that's the case we just ignore
    // this one.
    for (i = ep->ze_creq_tail; i != ep->ze_creq_head; i++) {
        if (ep->ze_creqs[i % zt_listenq].cr_raddr == raddr) {
            return;
        }
    }
    // We may already have filled our listenq, in which case we just drop.
    if ((ep->ze_creq_tail + zt_listenq) == ep->ze_creq_head) {
        // We have taken as many as we can, so just drop it.
        return;
    }

    // Record the connection request, and then process any
    // pending acceptors.
    i = ep->ze_creq_head % zt_listenq;

    NNI_GET16(data + zt_offset_creq_proto, ep->ze_creqs[i].cr_proto);
    ep->ze_creqs[i].cr_raddr  = raddr;
    ep->ze_creqs[i].cr_expire = nni_clock() + zt_listen_expire;
    ep->ze_creq_head++;

    zt_ep_doaccept(ep);
}

static void
zt_ep_recv_error(zt_ep *ep, const uint8_t *data, size_t len)
{
    int code;

    // Most of the time we don't care about errors. The exception here
    // is that when we have an outstanding CON_REQ, we would like to
    // process that appropriately.
    if (ep->ze_ndialer == NULL) {
        // Not a dialer. Drop it.
        return;
    }

    if (len < zt_offset_err_msg) {
        // Malformed error frame.
        return;
    }

    code = data[zt_offset_err_code];
    switch (code) {
    case zt_err_refused:
        code = NNG_ECONNREFUSED;
        break;
    case zt_err_notconn:
        code = NNG_ECLOSED;
        break;
    case zt_err_wrongsp:
        code = NNG_EPROTO;
        break;
    default:
        code = NNG_ETRANERR;
        break;
    }

    if (ep->ze_creq_active) {
        ep->ze_creq_try    = 0;
        ep->ze_creq_active = false;
        nni_aio_finish_error(ep->ze_creq_aio, code);
    }
}

static void
zt_ep_virtual_recv(
    zt_ep *ep, uint8_t op, uint64_t raddr, const uint8_t *data, size_t len)
{
    // Only listeners should be receiving. Dialers receive on the pipe,
    // rather than the endpoint. The only message that endpoints can
    // receive are connection requests.
    switch (op) {
    case zt_op_conn_req:
        zt_ep_recv_conn_req(ep, raddr, data, len);
        return;
    case zt_op_conn_ack:
        zt_ep_recv_conn_ack(ep, raddr, data, len);
        return;
    case zt_op_error:
        zt_ep_recv_error(ep, data, len);
        return;
    default:
        zt_send_err(ep->ze_ztn, ep->ze_nwid, raddr, ep->ze_laddr,
            zt_err_proto, "Bad operation");
        return;
    }
}

static void
zt_pipe_close_err(zt_pipe *p, int err, uint8_t code, const char *msg)
{
    nni_aio *aio;

    if ((aio = p->zp_user_rxaio) != NULL) {
        p->zp_user_rxaio = NULL;
        nni_aio_finish_error(aio, err);
    }
    nni_aio_close(p->zp_ping_aio);
    p->zp_closed = true;
    if (msg != NULL) {
        zt_pipe_send_err(p, code, msg);
    }
}

static void
zt_pipe_recv_data(zt_pipe *p, const uint8_t *data, size_t len)
{
    uint16_t     msgid;
    uint16_t     fragno;
    uint16_t     nfrags;
    size_t       fragsz;
    zt_fraglist *fl;
    int          i;
    int          slot;
    uint8_t      bit;
    uint8_t     *body;

    if (len < zt_size_data) {
        // Runt frame. Drop it and close pipe with a protocol error.
        zt_pipe_close_err(p, NNG_EPROTO, zt_err_proto, "Runt frame");
        return;
    }

    NNI_GET16(data + zt_offset_data_id, msgid);
    NNI_GET16(data + zt_offset_data_fragsz, fragsz);
    NNI_GET16(data + zt_offset_data_frag, fragno);
    NNI_GET16(data + zt_offset_data_nfrag, nfrags);
    len -= zt_offset_data_data;
    data += zt_offset_data_data;

    // Check for cases where message size is clearly too large. Note
    // that we only can catch the case where a message is larger by
    // more than a fragment, since the final fragment may be shorter,
    // and we won't know that until we receive it.
    if ((p->zp_rcvmax > 0) &&
        ((nfrags * fragsz) >= (p->zp_rcvmax + fragsz))) {
        // Discard, as the forwarder might be on the other side
        // of a device. This is gentler than just shutting the pipe
        // down. Sending a remote error might be polite, but since
        // most peers will close the pipe on such an error, we
        // simply silently discard it.
        return;
    }

    // We run the recv logic once, to clear stale fragment entries.
    zt_pipe_dorecv(p);

    // Find a suitable fragment slot.
    slot = -1;
    for (i = 0; i < zt_recvq; i++) {
        fl = &p->zp_recvq[i];
        // This was our message ID, we always use it.
        if (msgid == fl->fl_msgid) {
            slot = i;
            break;
        }

        if (slot < 0) {
            slot = i;
        } else if (fl->fl_time < p->zp_recvq[slot].fl_time) {
            // This has an earlier expiration, so lets choose it.
            slot = i;
        }
    }

    NNI_ASSERT(slot >= 0);
    fl = &p->zp_recvq[slot];

    if (fl->fl_msgid != msgid) {
        // First fragment we've received for this message (though it
        // might not be fragment number one of the message!)
        zt_fraglist_clear(fl);
        if (nni_msg_alloc(&fl->fl_msg, nfrags * fragsz) != 0) {
            // Out of memory. We don't close the pipe, but
            // just fail to receive the message. Bump a stat?
            return;
        }

        fl->fl_nfrags = nfrags;
        fl->fl_fragsz = fragsz;
        fl->fl_msgid  = msgid;
        fl->fl_time   = nni_clock();

        // Set the missing mask.
        memset(fl->fl_missing, 0xff, nfrags / 8);
        fl->fl_missing[nfrags / 8] |= ((1 << (nfrags % 8)) - 1);
    }

    if ((nfrags != fl->fl_nfrags) || (fragsz != fl->fl_fragsz) ||
        (fragno >= nfrags) || (fragsz == 0) || (nfrags == 0) ||
        ((fragno != (nfrags - 1)) && (len != fragsz))) {
        // Protocol error, message parameters changed.
        zt_pipe_close_err(
            p, NNG_EPROTO, zt_err_proto, "Invalid message parameters");
        zt_fraglist_clear(fl);
        return;
    }

    bit = (uint8_t) (1 << (fragno % 8));
    if ((fl->fl_missing[fragno / 8] & bit) == 0) {
        // We've already got this fragment, ignore it. We don't
        // bother to check for changed data.
        return;
    }

    fl->fl_missing[fragno / 8] &= ~(bit);
    body = nni_msg_body(fl->fl_msg);
    body += fragno * fragsz;
    memcpy(body, data, len);

    if (fragno == (nfrags - 1)) {
        // Last frag, maybe shorten the message.
        nni_msg_chop(fl->fl_msg, (fragsz - len));
        if ((nni_msg_len(fl->fl_msg) > p->zp_rcvmax) &&
            (p->zp_rcvmax > 0)) {
            // Strict enforcement of max recv.
            zt_fraglist_clear(fl);
            // Just discard the message.
            return;
        }
    }

    for (i = 0; i < ((nfrags + 7) / 8); i++) {
        if (fl->fl_missing[i]) {
            return;
        }
    }

    // We got all fragments... try to send it up.
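    // (zt_pipe_dorecv() below consumes fl_ready: it hands the assembled
    // message to the user's pending receive aio, if one is posted.)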
    fl->fl_ready = 1;
    zt_pipe_dorecv(p);
}

static void
zt_pipe_recv_ping(zt_pipe *p, const uint8_t *data, size_t len)
{
    NNI_ARG_UNUSED(data);

    if (len != zt_size_ping) {
        zt_pipe_send_err(p, zt_err_proto, "Incorrect ping size");
        return;
    }
    zt_pipe_send_pong(p);
}

static void
zt_pipe_recv_pong(zt_pipe *p, const uint8_t *data, size_t len)
{
    NNI_ARG_UNUSED(data);

    if (len != zt_size_pong) {
        zt_pipe_send_err(p, zt_err_proto, "Incorrect pong size");
    }
}

static void
zt_pipe_recv_disc_req(zt_pipe *p, const uint8_t *data, size_t len)
{
    nni_aio *aio;
    NNI_ARG_UNUSED(data);
    NNI_ARG_UNUSED(len);

    // NB: lock held already.
    // Don't bother to check the length, going to disconnect anyway.
    if ((aio = p->zp_user_rxaio) != NULL) {
        p->zp_user_rxaio = NULL;
        p->zp_closed     = true;
        nni_aio_finish_error(aio, NNG_ECLOSED);
    }
}

static void
zt_pipe_recv_error(zt_pipe *p, const uint8_t *data, size_t len)
{
    nni_aio *aio;
    NNI_ARG_UNUSED(data);
    NNI_ARG_UNUSED(len);

    // Perhaps we should log an error message, but at the end of
    // the day, the details are just not that interesting.
    if ((aio = p->zp_user_rxaio) != NULL) {
        p->zp_user_rxaio = NULL;
        p->zp_closed     = true;
        nni_aio_finish_error(aio, NNG_ETRANERR);
    }
}

// This function is called when we have determined that a frame has
// arrived for a pipe. The remote and local addresses were both
// matched by the caller.
static void
zt_pipe_virtual_recv(zt_pipe *p, uint8_t op, const uint8_t *data, size_t len)
{
    // We got data, so update our recv time.
    p->zp_last_recv = nni_clock();
    p->zp_ping_try  = 0;

    switch (op) {
    case zt_op_data:
        zt_pipe_recv_data(p, data, len);
        return;
    case zt_op_disc_req:
        zt_pipe_recv_disc_req(p, data, len);
        return;
    case zt_op_ping:
        zt_pipe_recv_ping(p, data, len);
        return;
    case zt_op_pong:
        zt_pipe_recv_pong(p, data, len);
        return;
    case zt_op_error:
        zt_pipe_recv_error(p, data, len);
        return;
    case zt_op_conn_req:
        zt_pipe_send_conn_ack(p);
        return;
    }
}

// This function is called when a frame arrives on the
// *virtual* network.
static void
zt_virtual_recv(ZT_Node *node, void *userptr, void *thr, uint64_t nwid,
    void **netptr, uint64_t srcmac, uint64_t dstmac, unsigned int ethertype,
    unsigned int vlanid, const void *payload, unsigned int len)
{
    zt_node       *ztn  = userptr;
    uint8_t        op;
    const uint8_t *data = payload;
    uint16_t       version;
    uint32_t       rport;
    uint32_t       lport;
    zt_ep         *ep;
    zt_pipe       *p;
    uint64_t       raddr;
    uint64_t       laddr;

    NNI_ARG_UNUSED(node);
    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(netptr);

    if ((ethertype != zt_ethertype) || (len < zt_size_headers) ||
        (data[zt_offset_flags] != 0) || (data[zt_offset_zero1] != 0) ||
        (data[zt_offset_zero2] != 0)) {
        return;
    }
    NNI_GET16(data + zt_offset_version, version);
    if (version != zt_version) {
        return;
    }

    if (vlanid != 0) { // for now we only use vlan 0.
        return;
    }

    op = data[zt_offset_op];

    ZT_GET24(data + zt_offset_dst_port, lport);
    ZT_GET24(data + zt_offset_src_port, rport);

    raddr = zt_mac_to_node(srcmac, nwid);
    raddr <<= 24;
    raddr |= rport;

    laddr = zt_mac_to_node(dstmac, nwid);
    laddr <<= 24;
    laddr |= lport;

    // NB: We are holding the zt_lock.

    // Look up a pipe, but also we use this chance to check that
    // the source address matches what the pipe was established with.
    // If the pipe does not match then we nak it. Note that pipes can
    // appear on the znode twice (loopback), so we have to be careful
    // to check the entire set of parameters, and to check for server
    // vs. client pipes separately.
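    //
    // (Loopback example: when a dialer and a listener share one node, a
    // frame could plausibly match both a client pipe in zn_lpipes, keyed
    // by local address, and a server pipe in zn_rpipes, keyed by remote
    // address; so each branch below also compares nwid and the opposite
    // address before accepting the match.)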
    // If it's a local address match on a client pipe, process it.
    if ((zt_hash_find(ztn->zn_lpipes, laddr, (void *) &p) == 0) &&
        (p->zp_nwid == nwid) && (p->zp_raddr == raddr)) {
        zt_pipe_virtual_recv(p, op, data, len);
        return;
    }

    // If it's a remote address match on a server pipe, process it.
    if ((zt_hash_find(ztn->zn_rpipes, raddr, (void *) &p) == 0) &&
        (p->zp_nwid == nwid) && (p->zp_laddr == laddr)) {
        zt_pipe_virtual_recv(p, op, data, len);
        return;
    }

    // No pipe, so look for an endpoint.
    if ((zt_hash_find(ztn->zn_eps, laddr, (void **) &ep) == 0) &&
        (ep->ze_nwid == nwid)) {
        // direct this to an endpoint.
        zt_ep_virtual_recv(ep, op, raddr, data, len);
        return;
    }

    // We have a request for which we have no listener, and no
    // pipe. For some of these we send back a NAK, but for others
    // we just drop the frame.
    switch (op) {
    case zt_op_conn_req:
        // No listener. Connection refused.
        zt_send_err(ztn, nwid, raddr, laddr, zt_err_refused,
            "Connection refused");
        return;
    case zt_op_data:
    case zt_op_ping:
    case zt_op_conn_ack:
        zt_send_err(ztn, nwid, raddr, laddr, zt_err_notconn,
            "Connection not found");
        break;
    case zt_op_error:
    case zt_op_pong:
    case zt_op_disc_req:
    default:
        // Just drop these.
        break;
    }
}

static void
zt_event_cb(ZT_Node *node, void *userptr, void *thr, enum ZT_Event event,
    const void *payload)
{
    NNI_ARG_UNUSED(node);
    NNI_ARG_UNUSED(userptr);
    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(payload);

    switch (event) {
    case ZT_EVENT_ONLINE:  // Connected to the virtual net.
    case ZT_EVENT_UP:      // Node initialized (may not be connected).
    case ZT_EVENT_DOWN:    // Teardown of the node.
    case ZT_EVENT_OFFLINE: // Removal of the node from the net.
    case ZT_EVENT_TRACE:   // Local trace events.
        // printf("TRACE: %s\n", (const char *) payload);
        break;
    case ZT_EVENT_REMOTE_TRACE: // Remote trace, not supported.
    default:
        break;
    }
}

static const char *zt_files[] = {
    // clang-format off
    NULL, // none, i.e. not used at all
    "identity.public",
    "identity.secret",
    "planet",
    "moon.%llx",
    NULL, // peer, e.g. peers.d/ -- we don't persist this
    "network.%llx",
    // clang-format on
};

static struct {
    size_t len;
    void  *data;
} zt_ephemeral_state[ZT_STATE_OBJECT_NETWORK_CONFIG + 1];

static void
zt_state_put(ZT_Node *node, void *userptr, void *thr,
    enum ZT_StateObjectType objtype, const uint64_t objid[2],
    const void *data, int len)
{
    zt_node    *ztn = userptr;
    char       *path;
    const char *template;
    char        fname[32];

    NNI_ARG_UNUSED(node);
    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(objid);

    // only use global files
    if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
        ((template = zt_files[(int) objtype]) == NULL)) {
        return;
    }

    (void) snprintf(fname, sizeof(fname), template,
        (unsigned long long) objid[0], (unsigned long long) objid[1]);

    // If we have no valid path, then we just use ephemeral data.
    // Note that for moons, and so forth, we wind up just storing them
    // all in the same place, but it does not matter since we don't
    // really persist them anyway.
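    // (A negative len from ZeroTier means "delete this object"; both the
    // ephemeral branch and the file-backed branch below honor that.)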
    if (strlen(ztn->zn_path) == 0) {
        void  *ndata = NULL;
        void  *odata = zt_ephemeral_state[objtype].data;
        size_t olen  = zt_ephemeral_state[objtype].len;

        if ((len >= 0) && ((ndata = nni_alloc(len)) != NULL)) {
            memcpy(ndata, data, len);
            zt_ephemeral_state[objtype].data = ndata;
            zt_ephemeral_state[objtype].len  = len;
        } else if (len < 0) {
            zt_ephemeral_state[objtype].data = NULL;
            zt_ephemeral_state[objtype].len  = 0;
        }

        if (olen > 0) {
            nni_free(odata, olen);
        }
        return;
    }

    if ((path = nni_file_join(ztn->zn_path, fname)) == NULL) {
        return;
    }

    if (len < 0) {
        (void) nni_file_delete(path);
    } else {
        (void) nni_file_put(path, data, len);
    }
    nni_strfree(path);
}

static int
zt_state_get(ZT_Node *node, void *userptr, void *thr,
    enum ZT_StateObjectType objtype, const uint64_t objid[2], void *data,
    unsigned int len)
{
    zt_node    *ztn = userptr;
    char       *path;
    char        fname[32];
    const char *template;
    size_t      sz;
    void       *buf;

    NNI_ARG_UNUSED(node);
    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(objid);

    // we only use global files
    if ((objtype > ZT_STATE_OBJECT_NETWORK_CONFIG) ||
        ((template = zt_files[(int) objtype]) == NULL)) {
        return (-1);
    }

    snprintf(fname, sizeof(fname), template, objid[0], objid[1]);

    // If no base directory, we are using ephemeral data.
    if (strlen(ztn->zn_path) == 0) {
        if (zt_ephemeral_state[objtype].data == NULL) {
            return (-1);
        }
        if (zt_ephemeral_state[objtype].len > len) {
            return (-1);
        }
        len = zt_ephemeral_state[objtype].len;
        memcpy(data, zt_ephemeral_state[objtype].data, len);
        return (len);
    }

    if ((path = nni_file_join(ztn->zn_path, fname)) == NULL) {
        return (-1);
    }

    if (nni_file_get(path, &buf, &sz) != 0) {
        nni_strfree(path);
        return (-1);
    }
    nni_strfree(path);
    if (sz > len) {
        nni_free(buf, sz);
        return (-1);
    }
    memcpy(data, buf, sz);
    nni_free(buf, sz);

    return ((int) sz);
}

typedef struct zt_send_hdr {
    nni_sockaddr sa;
    size_t       len;
} zt_send_hdr;

// This function is called when ZeroTier desires to send a
// physical frame. The data is a UDP payload, the rest of the
// payload should be sent over vanilla UDP.
static int
zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
    const struct sockaddr_storage *remaddr, const void *data,
    unsigned int len, unsigned int ttl)
{
    nni_aio             *aio;
    nni_sockaddr         addr;
    struct sockaddr_in  *sin  = (void *) remaddr;
    struct sockaddr_in6 *sin6 = (void *) remaddr;
    zt_node             *ztn  = userptr;
    nni_plat_udp        *udp;
    uint8_t             *buf;
    zt_send_hdr         *hdr;
    nni_iov              iov;

    NNI_ARG_UNUSED(node);
    NNI_ARG_UNUSED(thr);
    NNI_ARG_UNUSED(socket);
    NNI_ARG_UNUSED(ttl);

    // Kind of unfortunate, but we have to convert the
    // sockaddr to a neutral form, and then back again in
    // the platform layer.
    switch (sin->sin_family) {
    case AF_INET:
        addr.s_in.sa_family = NNG_AF_INET;
        addr.s_in.sa_port   = sin->sin_port;
        addr.s_in.sa_addr   = sin->sin_addr.s_addr;
        udp                 = ztn->zn_udp4;
        break;
    case AF_INET6:
        addr.s_in6.sa_family = NNG_AF_INET6;
        addr.s_in6.sa_port   = sin6->sin6_port;
        udp                  = ztn->zn_udp6;
        memcpy(addr.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
        break;
    default:
        // No way to understand the address.
        return (-1);
    }

    if (nni_aio_alloc(&aio, NULL, NULL) != 0) {
        // Out of memory
        return (-1);
    }
    if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) {
        nni_aio_free(aio);
        return (-1);
    }

    hdr = (void *) buf;
    buf += sizeof(*hdr);

    memcpy(buf, data, len);
    hdr->sa  = addr;
    hdr->len = len;

    nni_aio_set_input(aio, 0, &hdr->sa);

    iov.iov_buf = buf;
    iov.iov_len = len;
    nni_aio_set_iov(aio, 1, &iov);

    // This should be non-blocking/best-effort, so while
    // not great that we're holding the lock, also not tragic.
    nni_plat_udp_send(udp, aio);

    // UDP sending is "fast" on all platforms -- given that it's
    // best effort only, this will complete immediately, resulting
    // in either a message on the wire, or a discarded frame. We don't
    // care which. (There may be a few thread context switches, but
    // none of them are going to have to wait for some unbounded time.)
    nni_aio_wait(aio);
    nni_aio_free(aio);
    nni_free(hdr, hdr->len + sizeof(*hdr));

    return (0);
}

static struct ZT_Node_Callbacks zt_callbacks = {
    .version                      = 0,
    .statePutFunction             = zt_state_put,
    .stateGetFunction             = zt_state_get,
    .wirePacketSendFunction       = zt_wire_packet_send,
    .virtualNetworkFrameFunction  = zt_virtual_recv,
    .virtualNetworkConfigFunction = zt_virtual_config,
    .eventCallback                = zt_event_cb,
    .pathCheckFunction            = NULL,
    .pathLookupFunction           = NULL,
};

static void
zt_node_destroy(zt_node *ztn)
{
    nni_aio_stop(ztn->zn_rcv4_aio);
    nni_aio_stop(ztn->zn_rcv6_aio);

    // Wait for background thread to exit!
    nni_thr_fini(&ztn->zn_bgthr);

    if (ztn->zn_znode != NULL) {
        ZT_Node_delete(ztn->zn_znode);
    }

    if (ztn->zn_udp4 != NULL) {
        nni_plat_udp_close(ztn->zn_udp4);
    }
    if (ztn->zn_udp6 != NULL) {
        nni_plat_udp_close(ztn->zn_udp6);
    }

    if (ztn->zn_rcv4_buf != NULL) {
        nni_free(ztn->zn_rcv4_buf, zt_rcv_bufsize);
    }
    if (ztn->zn_rcv6_buf != NULL) {
        nni_free(ztn->zn_rcv6_buf, zt_rcv_bufsize);
    }
    if (ztn->zn_flock != NULL) {
        nni_file_unlock(ztn->zn_flock);
    }
    nni_aio_free(ztn->zn_rcv4_aio);
    nni_aio_free(ztn->zn_rcv6_aio);
    zt_hash_fini(ztn->zn_eps);
    zt_hash_fini(ztn->zn_lpipes);
    zt_hash_fini(ztn->zn_rpipes);
    nni_cv_fini(&ztn->zn_bgcv);
    NNI_FREE_STRUCT(ztn);
}

static int
zt_node_create(zt_node **ztnp, const char *path)
{
    zt_node           *ztn;
    nng_sockaddr       sa4;
    nng_sockaddr       sa6;
    int                rv;
    enum ZT_ResultCode zrv;
    nni_iov            iov;

    // XXX: Right now we depend on having both IPv6 and IPv4 available.
    // Probably we should support coping with the lack of either of them.

    // We want to bind to any address we can (for now).
    memset(&sa4, 0, sizeof(sa4));
    sa4.s_in.sa_family = NNG_AF_INET;
    memset(&sa6, 0, sizeof(sa6));
    sa6.s_in6.sa_family = NNG_AF_INET6;

    if ((ztn = NNI_ALLOC_STRUCT(ztn)) == NULL) {
        return (NNG_ENOMEM);
    }
    NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link);
    NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link);
    nni_cv_init(&ztn->zn_bgcv, &zt_lk);
    nni_aio_alloc(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn);
    nni_aio_alloc(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn);

    if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) ||
        ((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) {
        zt_node_destroy(ztn);
        return (NNG_ENOMEM);
    }
    if (((rv = zt_hash_init(&ztn->zn_ports)) != 0) ||
        ((rv = zt_hash_init(&ztn->zn_eps)) != 0) ||
        ((rv = zt_hash_init(&ztn->zn_lpipes)) != 0) ||
        ((rv = zt_hash_init(&ztn->zn_rpipes)) != 0) ||
        ((rv = nni_thr_init(&ztn->zn_bgthr, zt_bgthr, ztn)) != 0) ||
        ((rv = nni_plat_udp_open(&ztn->zn_udp4, &sa4)) != 0) ||
        ((rv = nni_plat_udp_open(&ztn->zn_udp6, &sa6)) != 0)) {
        zt_node_destroy(ztn);
        return (rv);
    }
    nni_thr_set_name(&ztn->zn_bgthr, "nng:zt");

    if (strlen(path) > 0) {
        char *lkfile;
        if ((lkfile = nni_file_join(path, "lock")) == NULL) {
            zt_node_destroy(ztn);
            return (NNG_ENOMEM);
        }

        if ((rv = nni_file_lock(lkfile, &ztn->zn_flock)) != 0) {
            zt_node_destroy(ztn);
            nni_strfree(lkfile);
            return (rv);
        }
        nni_strfree(lkfile);
    }

    // Setup for dynamic ephemeral port allocations. We
    // set the range to allow for ephemeral ports, but not
    // higher than the max port, and starting with an
    // initial random value. Note that this should give us
    // about 8 million possible ephemeral ports.
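    // (Concretely: zt_max_port - zt_ephemeral = 0xffffff - 0x800000
    // = 8388607 ports, which is where "about 8 million" comes from.)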
    zt_hash_limits(ztn->zn_ports, zt_ephemeral, zt_max_port,
        (nni_random() % (zt_max_port - zt_ephemeral)) + zt_ephemeral);

    nni_strlcpy(ztn->zn_path, path, sizeof(ztn->zn_path));
    zrv = ZT_Node_new(&ztn->zn_znode, ztn, NULL, &zt_callbacks, zt_now());
    if (zrv != ZT_RESULT_OK) {
        zt_node_destroy(ztn);
        return (zt_result(zrv));
    }

    nni_list_append(&zt_nodes, ztn);

    ztn->zn_self = ZT_Node_address(ztn->zn_znode);

    nni_thr_run(&ztn->zn_bgthr);

    // Schedule an initial background run.
    zt_node_resched(ztn, 1);

    // Schedule receive
    iov.iov_buf = ztn->zn_rcv4_buf;
    iov.iov_len = zt_rcv_bufsize;
    nni_aio_set_iov(ztn->zn_rcv4_aio, 1, &iov);
    nni_aio_set_input(ztn->zn_rcv4_aio, 0, &ztn->zn_rcv4_addr);
    iov.iov_buf = ztn->zn_rcv6_buf;
    iov.iov_len = zt_rcv_bufsize;
    nni_aio_set_iov(ztn->zn_rcv6_aio, 1, &iov);
    nni_aio_set_input(ztn->zn_rcv6_aio, 0, &ztn->zn_rcv6_addr);
    nni_plat_udp_recv(ztn->zn_udp4, ztn->zn_rcv4_aio);
    nni_plat_udp_recv(ztn->zn_udp6, ztn->zn_rcv6_aio);

    *ztnp = ztn;
    return (0);
}

static int
zt_walk_moons(const char *path, void *arg)
{
    zt_node    *ztn = arg;
    const char *bn  = nni_file_basename(path);
    char       *end;
    uint64_t    moonid;

    if (strncmp(bn, "moon.", 5) != 0) {
        return (NNI_FILE_WALK_CONTINUE);
    }
    if (((moonid = (uint64_t) strtoull(bn + 5, &end, 16)) != 0) &&
        (*end == '\0')) {
        ZT_Node_orbit(ztn->zn_znode, NULL, moonid, 0);
    }
    return (NNI_FILE_WALK_CONTINUE);
}

static int
zt_node_find(zt_ep *ep)
{
    zt_node                 *ztn;
    int                      rv;
    ZT_VirtualNetworkConfig *cf;

    NNI_LIST_FOREACH (&zt_nodes, ztn) {
        if (strcmp(ep->ze_home, ztn->zn_path) == 0) {
            goto done;
        }
    }

    // We didn't find a node, so make one. And try to
    // initialize it.
    if ((rv = zt_node_create(&ztn, ep->ze_home)) != 0) {
        return (rv);
    }

    // Load moons
    if (strlen(ep->ze_home) != 0) {
        (void) nni_file_walk(ep->ze_home, zt_walk_moons, ztn,
            NNI_FILE_WALK_FILES_ONLY | NNI_FILE_WALK_SHALLOW);
    }

done:

    ep->ze_ztn = ztn;
    if (nni_list_node_active(&ep->ze_link)) {
        nni_list_node_remove(&ep->ze_link);
    }
    nni_list_append(&ztn->zn_eplist, ep);

    (void) ZT_Node_join(ztn->zn_znode, ep->ze_nwid, ztn, NULL);

    if ((cf = ZT_Node_networkConfig(ztn->zn_znode, ep->ze_nwid)) != NULL) {
        NNI_ASSERT(cf->nwid == ep->ze_nwid);
        ep->ze_mtu = cf->mtu;
        ZT_Node_freeQueryResult(ztn->zn_znode, cf);
    }

    return (0);
}

static void
zt_tran_init(void)
{
    nni_mtx_init(&zt_lk);
    NNI_LIST_INIT(&zt_nodes, zt_node, zn_link);
}

static void
zt_tran_fini(void)
{
    zt_node *ztn;

    nni_mtx_lock(&zt_lk);
    while ((ztn = nni_list_first(&zt_nodes)) != NULL) {
        nni_list_remove(&zt_nodes, ztn);
        ztn->zn_closed = true;
        nni_cv_wake(&ztn->zn_bgcv);
        nni_mtx_unlock(&zt_lk);

        zt_node_destroy(ztn);
        nni_mtx_lock(&zt_lk);
    }
    nni_mtx_unlock(&zt_lk);

    for (int i = 0; i <= ZT_STATE_OBJECT_NETWORK_CONFIG; i++) {
        if (zt_ephemeral_state[i].len > 0) {
            nni_free(zt_ephemeral_state[i].data,
                zt_ephemeral_state[i].len);
        }
    }
    NNI_ASSERT(nni_list_empty(&zt_nodes));
    nni_mtx_fini(&zt_lk);
}

static int
zt_check_recvmaxsz(const void *v, size_t sz, nni_type t)
{
    return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
}

static int
zt_check_orbit(const void *v, size_t sz, nni_type t)
{
    NNI_ARG_UNUSED(v);
    if ((t != NNI_TYPE_UINT64) && (t != NNI_TYPE_OPAQUE)) {
        return (NNG_EBADTYPE);
    }
    if (sz != sizeof(uint64_t) && sz != sizeof(uint64_t) * 2) {
        return (NNG_EINVAL);
    }
    return (0);
}

static int
zt_check_deorbit(const void *v, size_t sz, nni_type t)
{
    return (nni_copyin_u64(NULL, v, sz, t));
}

static int
zt_check_string(const void *v, size_t sz, nni_type t)
{
    size_t len;

    if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
        return (NNG_EBADTYPE);
    }
    len = nni_strnlen(v, sz);
    if ((len >= sz) || (len >= NNG_MAXADDRLEN)) {
        return (NNG_EINVAL);
    }
    return (0);
}

static int
zt_check_time(const void *v, size_t sz, nni_type t)
{
    return (nni_copyin_ms(NULL, v, sz, t));
}

static int
zt_check_tries(const void *v, size_t sz, nni_type t)
{
    return (nni_copyin_int(NULL, v, sz, 0, 1000000, t));
}

static void
zt_pipe_close(void *arg)
{
    zt_pipe *p = arg;
    nni_aio *aio;

    nni_mtx_lock(&zt_lk);
    p->zp_closed = true;
    nni_aio_close(p->zp_ping_aio);
    if ((aio = p->zp_user_rxaio) != NULL) {
        p->zp_user_rxaio = NULL;
        nni_aio_finish_error(aio, NNG_ECLOSED);
    }
    zt_pipe_send_disc_req(p);
    nni_mtx_unlock(&zt_lk);
}

static int
zt_pipe_init(void *arg, nni_pipe *npipe)
{
    zt_pipe *p  = arg;
    p->zp_npipe = npipe;
    return (0);
}

static void
zt_pipe_fini(void *arg)
{
    zt_pipe *p   = arg;
    zt_node *ztn = p->zp_ztn;

    nni_aio_free(p->zp_ping_aio);

    // This tosses the connection details and all state.
    nni_mtx_lock(&zt_lk);
    zt_hash_remove(ztn->zn_ports, p->zp_laddr & zt_port_mask);
    zt_hash_remove(ztn->zn_lpipes, p->zp_laddr);
    zt_hash_remove(ztn->zn_rpipes, p->zp_raddr);
    nni_mtx_unlock(&zt_lk);

    for (int i = 0; i < zt_recvq; i++) {
        zt_fraglist_free(&p->zp_recvq[i]);
    }
    nni_free(p->zp_send_buf, ZT_MAX_MTU);
    NNI_FREE_STRUCT(p);
}

static nni_reap_list zt_reap_list = {
    .rl_offset = offsetof(zt_pipe, zp_reap),
    .rl_func   = zt_pipe_fini,
};

static void
zt_pipe_reap(zt_pipe *p)
{
    if (!nni_atomic_flag_test_and_set(&p->zp_reaped)) {
        nni_reap(&zt_reap_list, p);
    }
}

static int
zt_pipe_alloc(
    zt_pipe **pipep, zt_ep *ep, uint64_t raddr, uint64_t laddr, bool listener)
{
    zt_pipe *p;
    int      rv;
    zt_node *ztn = ep->ze_ztn;
    int      i;
    size_t   maxfrag;
    size_t   maxfrags = 0;

    if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
        return (NNG_ENOMEM);
    }
    if ((p->zp_send_buf = nni_alloc(ZT_MAX_MTU)) == NULL) {
        NNI_FREE_STRUCT(p);
        return (NNG_ENOMEM);
    }
    p->zp_ztn        = ztn;
    p->zp_raddr      = raddr;
    p->zp_laddr      = laddr;
    p->zp_proto      = ep->ze_proto;
    p->zp_nwid       = ep->ze_nwid;
    p->zp_mtu        = ep->ze_mtu;
    p->zp_rcvmax     = ep->ze_rcvmax;
    p->zp_ping_tries = ep->ze_ping_tries;
    p->zp_ping_time  = ep->ze_ping_time;
    p->zp_next_msgid = (uint16_t) nni_random();
    p->zp_ping_try   = 0;
    nni_atomic_flag_reset(&p->zp_reaped);

    if (listener) {
        // listener
        rv = zt_hash_insert(ztn->zn_rpipes, raddr, p);
    } else {
        // dialer
        rv = zt_hash_insert(ztn->zn_lpipes, laddr, p);
    }
    if ((rv != 0) ||
        ((rv = nni_aio_alloc(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
        zt_pipe_reap(p);
        return (rv);
    }

    // The largest fragment we can accept on this pipe. The MTU is
    // configurable by the network administrator. Probably ZT would
    // pass a larger one (up to MAXMTU), but we honor the network
    // administration's configuration.
    maxfrag = p->zp_mtu - zt_offset_data_data;

    // The largest fragment count we can accept on this pipe.
    // This is rounded up to account for alignment.
    if (p->zp_rcvmax > 0) {
        maxfrags = (p->zp_rcvmax + (maxfrag - 1)) / maxfrag;
    }
    if ((maxfrags > 0xffff) || (maxfrags == 0)) {
        maxfrags = 0xffff;
    }

    for (i = 0; i < zt_recvq; i++) {
        zt_fraglist *fl  = &p->zp_recvq[i];
        fl->fl_time      = NNI_TIME_ZERO;
        fl->fl_msgid     = 0;
        fl->fl_ready     = 0;
        fl->fl_missingsz = (maxfrags + 7) / 8;
        fl->fl_missing   = nni_alloc(fl->fl_missingsz);
        if (fl->fl_missing == NULL) {
            zt_pipe_reap(p);
            return (NNG_ENOMEM);
        }
    }

    *pipep = p;
    return (0);
}

static void
zt_pipe_send(void *arg, nni_aio *aio)
{
    // As we are sending UDP, and there is no callback to worry
    // about, we just go ahead and send out a stream of messages
    // synchronously.
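    //
    // (Worked example with illustrative numbers: if zp_mtu is 2800,
    // then fragsz = 2800 - zt_offset_data_data = 2780 bytes of payload
    // per fragment, and a 6000-byte message -- header plus body -- goes
    // out as nfrags = 3 fragments, the last one only 440 bytes.)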
    zt_pipe *p    = arg;
    uint8_t *data = p->zp_send_buf;
    size_t   offset;
    uint16_t id;
    uint16_t nfrags;
    uint16_t fragno;
    size_t   fragsz;
    size_t   bytes;
    size_t   msg_header_len;
    size_t   msg_len;
    nni_msg *m;

    if (nni_aio_begin(aio) != 0) {
        return;
    }
    if ((m = nni_aio_get_msg(aio)) == NULL) {
        nni_aio_finish_error(aio, NNG_EINVAL);
        return;
    }

    nni_mtx_lock(&zt_lk);

    if (p->zp_closed) {
        nni_mtx_unlock(&zt_lk);
        nni_aio_finish_error(aio, NNG_ECLOSED);
        return;
    }

    fragsz = p->zp_mtu - zt_offset_data_data;
    NNI_ASSERT(fragsz < 0x10000); // Because zp_mtu is 16 bits

    msg_header_len = nni_msg_header_len(m);
    msg_len        = nni_msg_len(m);
    bytes          = msg_header_len + msg_len;

    if (bytes >= (0xfffe * fragsz)) {
        nni_aio_finish_error(aio, NNG_EMSGSIZE);
        nni_mtx_unlock(&zt_lk);
        return;
    }
    // above check means nfrags will fit in 16-bits.
    nfrags = (uint16_t) ((bytes + (fragsz - 1)) / fragsz);

    // get the next message ID, but skip 0
    if ((id = p->zp_next_msgid++) == 0) {
        id = p->zp_next_msgid++;
    }

    offset = 0;
    fragno = 0;
    do {
        uint8_t *dest    = data + zt_offset_data_data;
        size_t   room    = fragsz;
        size_t   fraglen = 0;
        size_t   len;

        // Prepend the header first.
        if ((!offset) && (msg_header_len > 0)) {
            if (msg_header_len > fragsz) {
                // This shouldn't happen! SP headers are
                // supposed to be quite small.
                nni_aio_finish_error(aio, NNG_EMSGSIZE);
                nni_mtx_unlock(&zt_lk);
                return;
            }
            memcpy(dest, nni_msg_header(m), msg_header_len);
            dest += msg_header_len;
            room -= msg_header_len;
            offset += msg_header_len;
            fraglen += msg_header_len;
        }

        len = msg_header_len + msg_len - offset;
        if (len > room) {
            len = room;
        }
        memcpy(dest, nni_msg_body(m) + offset - msg_header_len, len);

        NNI_PUT16(data + zt_offset_data_id, id);
        NNI_PUT16(data + zt_offset_data_fragsz, (uint16_t) fragsz);
        NNI_PUT16(data + zt_offset_data_frag, fragno);
        NNI_PUT16(data + zt_offset_data_nfrag, nfrags);

        offset += len;
        fraglen += len;
        fragno++;
        zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
            p->zp_laddr, data, fraglen + zt_offset_data_data);
    } while (msg_header_len + msg_len - offset != 0);

    nni_mtx_unlock(&zt_lk);

    // NB, we never bothered to call nni_aio_schedule, because we run
    // this synchronously, relying on UDP to simply discard messages if
    // we cannot deliver them. This means that pipe send operations with
    // this transport are not cancellable.
    nni_aio_set_msg(aio, NULL);
    nni_msg_free(m);
    nni_aio_finish(aio, 0, offset);
}

static void
zt_pipe_cancel_recv(nni_aio *aio, void *arg, int rv)
{
    zt_pipe *p = arg;

    nni_mtx_lock(&zt_lk);
    if (p->zp_user_rxaio == aio) {
        p->zp_user_rxaio = NULL;
        nni_aio_finish_error(aio, rv);
    }
    nni_mtx_unlock(&zt_lk);
}

static void
zt_fraglist_clear(zt_fraglist *fl)
{
    nni_msg *msg;

    fl->fl_ready = 0;
    fl->fl_msgid = 0;
    fl->fl_time  = NNI_TIME_ZERO;
    if ((msg = fl->fl_msg) != NULL) {
        fl->fl_msg = NULL;
        nni_msg_free(msg);
    }
    memset(fl->fl_missing, 0, fl->fl_missingsz);
}

static void
zt_fraglist_free(zt_fraglist *fl)
{
    zt_fraglist_clear(fl);
    nni_free(fl->fl_missing, fl->fl_missingsz);
    fl->fl_missing = NULL;
}

static void
zt_pipe_dorecv(zt_pipe *p)
{
    nni_aio *aio = p->zp_user_rxaio;
    nni_time now = nni_clock();

    if (aio == NULL) {
        return;
    }

    for (int i = 0; i < zt_recvq; i++) {
        zt_fraglist *fl = &p->zp_recvq[i];
        nni_msg     *msg;

        if (now > (fl->fl_time + zt_recv_stale)) {
            // fragment list is stale, clean it.
            zt_fraglist_clear(fl);
            continue;
        }
        if (!fl->fl_ready) {
            continue;
        }

        // Got data. Let's pass it up.
        msg        = fl->fl_msg;
        fl->fl_msg = NULL;
        NNI_ASSERT(msg != NULL);

        p->zp_user_rxaio = NULL;
        nni_aio_finish_msg(aio, msg);
        zt_fraglist_clear(fl);
        return;
    }
}

static void
zt_pipe_recv(void *arg, nni_aio *aio)
{
    zt_pipe *p = arg;
    int      rv;

    if (nni_aio_begin(aio) != 0) {
        return;
    }
    nni_mtx_lock(&zt_lk);
    if (p->zp_closed) {
        nni_mtx_unlock(&zt_lk);
        nni_aio_finish_error(aio, NNG_ECLOSED);
        return;
    }
    if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) {
        nni_mtx_unlock(&zt_lk);
        nni_aio_finish_error(aio, rv);
        return;
    }
    p->zp_user_rxaio = aio;
    zt_pipe_dorecv(p);
    nni_mtx_unlock(&zt_lk);
}

static uint16_t
zt_pipe_peer(void *arg)
{
    zt_pipe *pipe = arg;

    return (pipe->zp_peer);
}

static int
zt_get_nw_status(zt_node *ztn, uint64_t nwid, int *statusp)
{
    ZT_VirtualNetworkConfig *vcfg;
    int                      status;

    vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid);
    if (vcfg == NULL) {
        return (NNG_ECLOSED);
    }

    switch (vcfg->status) {
    case ZT_NETWORK_STATUS_REQUESTING_CONFIGURATION:
        status = NNG_ZT_STATUS_CONFIG;
        break;
    case ZT_NETWORK_STATUS_OK:
        status = NNG_ZT_STATUS_UP;
        break;
    case ZT_NETWORK_STATUS_ACCESS_DENIED:
        status = NNG_ZT_STATUS_DENIED;
        break;
    case ZT_NETWORK_STATUS_NOT_FOUND:
        status = NNG_ZT_STATUS_NOTFOUND;
        break;
    case ZT_NETWORK_STATUS_PORT_ERROR:
        status = NNG_ZT_STATUS_ERROR;
        break;
    case ZT_NETWORK_STATUS_CLIENT_TOO_OLD:
        status = NNG_ZT_STATUS_OBSOLETE;
        break;
    default:
        status = NNG_ZT_STATUS_UNKNOWN;
        break;
    }
    ZT_Node_freeQueryResult(ztn->zn_znode, vcfg);

    *statusp = status;
    return (0);
}

static int
zt_get_nw_name(zt_node *ztn, uint64_t nwid, void *buf, size_t *szp, nni_type t)
{
    ZT_VirtualNetworkConfig *vcfg;
    int                      rv;

    vcfg = ZT_Node_networkConfig(ztn->zn_znode, nwid);
    if (vcfg == NULL) {
        return (NNG_ECLOSED);
    }

    rv = nni_copyout_str(vcfg->name, buf, szp, t);
    ZT_Node_freeQueryResult(ztn->zn_znode, vcfg);

    return (rv);
}

static int
zt_pipe_get_recvmaxsz(void *arg, void *buf, size_t *szp, nni_type t)
{
    zt_pipe *p = arg;
    return (nni_copyout_size(p->zp_rcvmax, buf, szp, t));
}

static int
zt_pipe_get_nwid(void *arg, void *buf, size_t *szp, nni_type t)
{
    zt_pipe *p = arg;
    return (nni_copyout_u64(p->zp_nwid, buf, szp, t));
}

static int
zt_pipe_get_node(void *arg, void *buf, size_t *szp, nni_type t)
{
    zt_pipe *p = arg;
    return (nni_copyout_u64(p->zp_laddr >> 24, buf, szp, t));
}

static void
zt_pipe_ping_cb(void *arg)
{
    zt_pipe *p   = arg;
    nni_aio *aio = p->zp_ping_aio;
    int      rv;

    if ((rv = nni_aio_result(aio)) != 0) {
        // We were canceled. That means we're done.
        return;
    }
    nni_mtx_lock(&zt_lk);
    if (p->zp_closed || aio == NULL || (p->zp_ping_tries == 0) ||
        (p->zp_ping_time == NNG_DURATION_INFINITE) ||
        (p->zp_ping_time == NNG_DURATION_ZERO)) {
        nni_mtx_unlock(&zt_lk);
        return;
    }
    if (p->zp_ping_try > p->zp_ping_tries) {
        // Ping count exceeded; the other side is AFK.
        // Close the pipe, but no need to send a reason to the peer.
        zt_pipe_close_err(p, NNG_ECLOSED, 0, NULL);
        nni_mtx_unlock(&zt_lk);
        return;
    }

    if (nni_clock() > (p->zp_last_recv + p->zp_ping_time)) {
        p->zp_ping_try++;
        zt_pipe_send_ping(p);
    }

    nni_sleep_aio(p->zp_ping_time, aio); // Schedule a recheck.
    nni_mtx_unlock(&zt_lk);
}

static void
zt_pipe_start_ping(zt_pipe *p)
{
    // send a gratuitous ping, and start the ping interval timer.
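    //
    // (With the compile-time defaults, zt_ping_time = 60000 msec and
    // zt_ping_tries = 10, a peer that goes completely silent is detected
    // and the pipe closed after roughly ten minutes.)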
    if ((p->zp_ping_tries > 0) && (p->zp_ping_time != NNG_DURATION_ZERO) &&
        (p->zp_ping_time != NNG_DURATION_INFINITE)) {
        p->zp_ping_try = 0;
        zt_pipe_send_ping(p);
        nni_sleep_aio(p->zp_ping_time, p->zp_ping_aio);
    }
}

static void
zt_ep_fini(void *arg)
{
    zt_ep *ep = arg;
    nni_aio_stop(ep->ze_creq_aio);
    nni_aio_free(ep->ze_creq_aio);
    NNI_FREE_STRUCT(ep);
}

static int
zt_parsehex(const char **sp, uint64_t *valp, bool wildok)
{
    int         n;
    const char *s = *sp;
    char        c;
    uint64_t    v;

    if (wildok && *s == '*') {
        *valp = 0;
        s++;
        *sp = s;
        return (0);
    }
    for (v = 0, n = 0; (n < 16) && isxdigit(c = tolower(*s)); n++, s++) {
        v *= 16;
        if (isdigit(c)) {
            v += (c - '0');
        } else {
            v += ((c - 'a') + 10);
        }
    }

    *sp   = s;
    *valp = v;
    return (n ? 0 : NNG_EINVAL);
}

static int
zt_parsedec(const char **sp, uint64_t *valp)
{
    int         n;
    const char *s = *sp;
    char        c;
    uint64_t    v;

    for (v = 0, n = 0; (n < 20) && isdigit(c = *s); n++, s++) {
        v *= 10;
        v += (c - '0');
    }
    *sp   = s;
    *valp = v;
    return (n ? 0 : NNG_EINVAL);
}

static int
zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer,
    nni_listener *nlistener)
{
    zt_ep      *ep;
    uint64_t    node;
    uint64_t    port;
    int         rv;
    const char *h;

    if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
        return (NNG_ENOMEM);
    }

    ep->ze_mtu        = ZT_MIN_MTU;
    ep->ze_aio        = NULL;
    ep->ze_ping_tries = zt_ping_tries;
    ep->ze_ping_time  = zt_ping_time;
    ep->ze_conn_time  = zt_conn_time;
    ep->ze_conn_tries = zt_conn_tries;
    ep->ze_proto      = nni_sock_proto_id(sock);
    ep->ze_ndialer    = ndialer;
    ep->ze_nlistener  = nlistener;

    nni_aio_list_init(&ep->ze_aios);

    rv = nni_aio_alloc(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep);
    if (rv != 0) {
        zt_ep_fini(ep);
        return (rv);
    }

    // Our URL format is:
    //
    //    zt://<nodeid>.<nwid>:<port>
    //
    // The port must be specified, but may be zero. The nodeid
    // may be '*' to refer to ourself. There may be a trailing slash
    // which will be ignored.

    h = url->u_hostname;
    if (((strlen(url->u_path) == 1) && (url->u_path[0] != '/')) ||
        (strlen(url->u_path) > 1) || (url->u_fragment != NULL) ||
        (url->u_query != NULL) || (url->u_userinfo != NULL) ||
        (zt_parsehex(&h, &node, true) != 0) || (*h++ != '.') ||
        (zt_parsehex(&h, &ep->ze_nwid, false) != 0) ||
        (node > 0xffffffffffull)) {
        return (NNG_EADDRINVAL);
    }
    h = url->u_port;
    if ((zt_parsedec(&h, &port) != 0) || (port > zt_max_port)) {
        return (NNG_EADDRINVAL);
    }

    // Parse the URL.
    if (nlistener != NULL) {
        // listener
        ep->ze_laddr = node;
        ep->ze_laddr <<= 24;
        ep->ze_laddr |= port;
        ep->ze_raddr     = 0;
        ep->ze_nlistener = nlistener;
    } else {
        // dialer
        if (port == 0) {
            return (NNG_EADDRINVAL);
        }
        ep->ze_raddr = node;
        ep->ze_raddr <<= 24;
        ep->ze_raddr |= port;
        ep->ze_laddr   = 0;
        ep->ze_ndialer = ndialer;
    }

    *epp = ep;
    return (0);
}

static int
zt_dialer_init(void **epp, nni_url *url, nni_dialer *d)
{
    return (zt_ep_init(epp, url, nni_dialer_sock(d), d, NULL));
}

static int
zt_listener_init(void **epp, nni_url *url, nni_listener *l)
{
    return (zt_ep_init(epp, url, nni_listener_sock(l), NULL, l));
}

static void
zt_ep_close(void *arg)
{
    zt_ep   *ep = arg;
    zt_node *ztn;
    nni_aio *aio;

    nni_aio_abort(ep->ze_creq_aio, NNG_ECLOSED);

    // Cancel any outstanding user operation(s) - they should have
    // been aborted by the above cancellation, but we need to be
    // sure, as the cancellation callback may not have run yet.
    nni_mtx_lock(&zt_lk);
    while ((aio = nni_list_first(&ep->ze_aios)) != NULL) {
        nni_aio_list_remove(aio);
        nni_aio_finish_error(aio, NNG_ECLOSED);
    }

    // Endpoint framework guarantees to only call us once,
    // and to not call other things while we are closed.
    ztn = ep->ze_ztn;
    // If we're on the ztn node list, pull us off.
    if (ztn != NULL) {
        nni_list_node_remove(&ep->ze_link);
        zt_hash_remove(ztn->zn_ports, ep->ze_laddr & zt_port_mask);
        zt_hash_remove(ztn->zn_eps, ep->ze_laddr);
    }
    nni_mtx_unlock(&zt_lk);
}

static int
zt_ep_bind_locked(zt_ep *ep)
{
    int      rv;
    uint64_t port;
    uint64_t node;
    zt_node *ztn;

    // If we haven't already got a ZT node, get one.
    if ((ztn = ep->ze_ztn) == NULL) {
        if ((rv = zt_node_find(ep)) != 0) {
            return (rv);
        }
        ztn = ep->ze_ztn;
    }

    node = ep->ze_laddr >> 24;
    if ((node != 0) && (node != ztn->zn_self)) {
        // User requested a node id, but it doesn't match our own.
        return (NNG_EADDRINVAL);
    }

    if ((ep->ze_laddr & zt_port_mask) == 0) {
        // ask for an ephemeral port
        if ((rv = zt_hash_alloc(ztn->zn_ports, &port, ep)) != 0) {
            return (rv);
        }
        NNI_ASSERT(port & zt_ephemeral);
    } else {
        void *conflict;
        // make sure port requested is free.
        port = ep->ze_laddr & zt_port_mask;

        if (zt_hash_find(ztn->zn_ports, port, &conflict) == 0) {
            return (NNG_EADDRINUSE);
        }
        if ((rv = zt_hash_insert(ztn->zn_ports, port, ep)) != 0) {
            return (rv);
        }
    }
    NNI_ASSERT(port <= zt_max_port);
    NNI_ASSERT(port > 0);

    ep->ze_laddr = ztn->zn_self;
    ep->ze_laddr <<= 24;
    ep->ze_laddr |= port;
    ep->ze_running = true;

    if ((rv = zt_hash_insert(ztn->zn_eps, ep->ze_laddr, ep)) != 0) {
        zt_hash_remove(ztn->zn_ports, port);
        return (rv);
    }

    return (0);
}

static int
zt_ep_bind(void *arg)
{
    int    rv;
    zt_ep *ep = arg;

    nni_mtx_lock(&zt_lk);
    rv = zt_ep_bind_locked(ep);
    nni_mtx_unlock(&zt_lk);

    return (rv);
}

static void
zt_ep_cancel(nni_aio *aio, void *arg, int rv)
{
    zt_ep *ep = arg;

    nni_mtx_lock(&zt_lk);
    if (nni_aio_list_active(aio)) {
        if (ep->ze_aio != NULL) {
            nni_aio_abort(ep->ze_aio, rv);
        }
        nni_aio_list_remove(aio);
        nni_aio_finish_error(aio, rv);
    }
    nni_mtx_unlock(&zt_lk);
}

static void
zt_ep_doaccept(zt_ep *ep)
{
    // Call with ep lock held.
    nni_time now;
    zt_pipe *p;
    int      rv;

    now = nni_clock();
    // Consume any timedout connect requests.
    while (ep->ze_creq_tail != ep->ze_creq_head) {
        zt_creq  creq;
        nni_aio *aio;

        creq = ep->ze_creqs[ep->ze_creq_tail % zt_listenq];

        // Discard old connection requests.
        if (creq.cr_expire < now) {
            ep->ze_creq_tail++;
            continue;
        }

        if ((aio = nni_list_first(&ep->ze_aios)) == NULL) {
            // No outstanding accept. We're done.
            break;
        }

        // We have both a connection request, and a place to accept it.

        // Advance the tail.
        ep->ze_creq_tail++;

        // We remove this AIO. This keeps it from being canceled.
        nni_aio_list_remove(aio);

        rv = zt_pipe_alloc(&p, ep, creq.cr_raddr, ep->ze_laddr, true);
        if (rv != 0) {
            zt_send_err(ep->ze_ztn, ep->ze_nwid, creq.cr_raddr,
                ep->ze_laddr, zt_err_unknown,
                "Failed creating pipe");
            nni_aio_finish_error(aio, rv);
            continue;
        }
        p->zp_peer = creq.cr_proto;
        zt_pipe_send_conn_ack(p);
        zt_pipe_start_ping(p);
        nni_aio_set_output(aio, 0, p);
        nni_aio_finish(aio, 0, 0);
    }
}

static void
zt_ep_accept(void *arg, nni_aio *aio)
{
    zt_ep *ep = arg;
    int    rv;

    if (nni_aio_begin(aio) != 0) {
        return;
    }
    nni_mtx_lock(&zt_lk);
    if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
        nni_mtx_unlock(&zt_lk);
        nni_aio_finish_error(aio, rv);
        return;
    }
    nni_aio_list_append(&ep->ze_aios, aio);
    zt_ep_doaccept(ep);
    nni_mtx_unlock(&zt_lk);
}

static void
zt_ep_conn_req_cancel(nni_aio *aio, void *arg, int rv)
{
    zt_ep *ep = arg;
    // We don't have much to do here. The AIO will have been
    // canceled as a result of the "parent" AIO canceling.
static void
zt_ep_conn_req_cancel(nni_aio *aio, void *arg, int rv)
{
	zt_ep *ep = arg;
	// There isn't much to do here.  The AIO will already have been
	// canceled as a result of the "parent" AIO canceling.
	nni_mtx_lock(&zt_lk);
	if (ep->ze_creq_active) {
		ep->ze_creq_active = false;
		nni_aio_finish_error(aio, rv);
	}
	nni_mtx_unlock(&zt_lk);
}

static void
zt_ep_conn_req_cb(void *arg)
{
	zt_ep   *ep = arg;
	zt_pipe *p;
	nni_aio *aio = ep->ze_creq_aio;
	nni_aio *uaio;
	int      rv;

	nni_mtx_lock(&zt_lk);

	ep->ze_creq_active = false;
	switch ((rv = nni_aio_result(aio))) {
	case 0:
		p = nni_aio_get_output(aio, 0);
		// Already canceled, or already handled?
		if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
			nni_aio_list_remove(uaio);
			zt_pipe_start_ping(p);
			nni_aio_set_output(uaio, 0, p);
			nni_aio_finish(uaio, 0, 0);
		} else {
			// We have a pipe, but nowhere to stick it.
			// Just discard it.
			zt_pipe_fini(p);
		}
		ep->ze_creq_try = 0;
		break;

	case NNG_ETIMEDOUT:
		if ((ep->ze_creq_try > ep->ze_conn_tries) &&
		    (ep->ze_conn_tries > 0)) {
			// The final attempt timed out; fail the user's
			// connect request.
			if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
				nni_aio_list_remove(uaio);
				nni_aio_finish_error(uaio, rv);
				// Reset the counter.
				ep->ze_creq_try = 0;
			}
		}
		break;

	default:
		// Failed hard?
		if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
			nni_aio_list_remove(uaio);
			nni_aio_finish_error(uaio, rv);
		}
		ep->ze_creq_try = 0;
		break;
	}

	if (nni_list_first(&ep->ze_aios) != NULL) {
		nni_aio_set_timeout(aio, ep->ze_conn_time);
		if (nni_aio_begin(aio) == 0) {
			rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
			if (rv != 0) {
				nni_aio_finish_error(aio, rv);
			} else {
				ep->ze_creq_active = true;
				ep->ze_creq_try++;
				zt_ep_send_conn_req(ep);
			}
		}
	}

	nni_mtx_unlock(&zt_lk);
}
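// Worked example: with the defaults declared above (zt_conn_time = 500
// msec, zt_conn_tries = 240), a dialer resends the conn_req roughly
// every half second and gives up after about 240 * 500 msec = 120
// seconds, which leaves time for ZeroTier peer-to-peer connectivity to
// be established before the connect attempt is abandoned.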
static void
zt_ep_connect(void *arg, nni_aio *aio)
{
	zt_ep *ep = arg;
	int    rv;

	if (nni_aio_begin(aio) != 0) {
		return;
	}

	// We bind locally.  We'll use the address later when we give
	// it to the pipe, but this allows us to receive the initial
	// ack back from the server.  (This gives us an ephemeral
	// address to work with.)

	nni_mtx_lock(&zt_lk);

	// Clear the port so we get an ephemeral port.
	ep->ze_laddr &= ~((uint64_t) zt_port_mask);

	if ((rv = zt_ep_bind_locked(ep)) != 0) {
		nni_aio_finish_error(aio, rv);
		nni_mtx_unlock(&zt_lk);
		return;
	}

	if ((ep->ze_raddr >> 24) == 0) {
		ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
	}
	if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
		nni_aio_finish_error(aio, rv);
		nni_mtx_unlock(&zt_lk);
		return;
	}
	nni_aio_list_append(&ep->ze_aios, aio);
	ep->ze_running = true;

	nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
	if (nni_aio_begin(ep->ze_creq_aio) == 0) {
		rv = nni_aio_schedule(
		    ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
		if (rv != 0) {
			nni_aio_finish_error(ep->ze_creq_aio, rv);
		} else {
			// Send out the first connect message; if we are
			// not yet attached to the network, the message
			// will simply be dropped.
			ep->ze_creq_try    = 1;
			ep->ze_creq_active = true;
			zt_ep_send_conn_req(ep);
		}
	}
	nni_mtx_unlock(&zt_lk);
}

static int
zt_ep_set_recvmaxsz(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep *ep = arg;
	size_t val;
	int    rv;

	if ((rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		ep->ze_rcvmax = val;
		nni_mtx_unlock(&zt_lk);
	}
	return (rv);
}

static int
zt_ep_get_recvmaxsz(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_size(ep->ze_rcvmax, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_set_home(void *arg, const void *data, size_t sz, nni_type t)
{
	int    rv;
	zt_ep *ep = arg;

	if ((rv = zt_check_string(data, sz, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		if (ep->ze_running) {
			rv = NNG_ESTATE;
		} else {
			nni_strlcpy(ep->ze_home, data, sizeof(ep->ze_home));
			if ((rv = zt_node_find(ep)) != 0) {
				ep->ze_ztn = NULL;
			}
		}
		nni_mtx_unlock(&zt_lk);
	}

	return (rv);
}

static int
zt_ep_get_home(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_str(ep->ze_home, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_get_url(void *arg, void *data, size_t *szp, nni_type t)
{
	char     ustr[64]; // more than plenty
	zt_ep   *ep = arg;
	uint64_t addr;

	nni_mtx_lock(&zt_lk);
	addr = ep->ze_nlistener != NULL ? ep->ze_laddr : ep->ze_raddr;
	snprintf(ustr, sizeof(ustr), "zt://%llx.%llx:%u",
	    (unsigned long long) (addr >> zt_port_shift),
	    (unsigned long long) ep->ze_nwid,
	    (unsigned) (addr & zt_port_mask));
	nni_mtx_unlock(&zt_lk);
	return (nni_copyout_str(ustr, data, szp, t));
}

static int
zt_ep_set_orbit(void *arg, const void *data, size_t sz, nni_type t)
{
	uint64_t           moonid;
	uint64_t           peerid;
	zt_ep             *ep = arg;
	int                rv;
	enum ZT_ResultCode zrv;

	if ((t != NNI_TYPE_UINT64) && (t != NNI_TYPE_OPAQUE)) {
		return (NNG_EBADTYPE);
	}
	if (sz == sizeof(uint64_t)) {
		memcpy(&moonid, data, sizeof(moonid));
		peerid = 0;
	} else if (sz == sizeof(uint64_t) * 2) {
		memcpy(&moonid, data, sizeof(moonid));
		memcpy(&peerid, ((char *) data) + sizeof(uint64_t),
		    sizeof(peerid));
	} else {
		return (NNG_EINVAL);
	}

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	zrv = ZT_Node_orbit(ep->ze_ztn->zn_znode, NULL, moonid, peerid);
	nni_mtx_unlock(&zt_lk);

	return (zt_result(zrv));
}

static int
zt_ep_set_deorbit(void *arg, const void *data, size_t sz, nni_type t)
{
	uint64_t moonid;
	zt_ep   *ep = arg;
	int      rv;

	if ((rv = nni_copyin_u64(&moonid, data, sz, t)) == 0) {
		enum ZT_ResultCode zrv;

		nni_mtx_lock(&zt_lk);
		if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
			nni_mtx_unlock(&zt_lk);
			return (rv);
		}
		zrv = ZT_Node_deorbit(ep->ze_ztn->zn_znode, NULL, moonid);
		nni_mtx_unlock(&zt_lk);
		rv = zt_result(zrv);
	}
	return (rv);
}
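// Illustrative usage sketch (commented out; not part of the build):
// orbiting a moon from application code via the public nng API.  The
// dialer handle and the moon id value here are hypothetical.  Passing
// two uint64_t values supplies both the moon id and a seed peer id;
// passing a single uint64_t leaves the peer id at zero, per the size
// checks in zt_ep_set_orbit above.
//
//   uint64_t ids[2] = { 0x622514484aull, 0 }; // moon id, optional peer
//   rv = nng_dialer_set(dialer, NNG_OPT_ZT_ORBIT, ids, sizeof(ids));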
static int
zt_ep_set_add_local_addr(void *arg, const void *data, size_t sz, nni_type t)
{
	nng_sockaddr sa;
	zt_ep       *ep = arg;
	int          rv;

	if ((rv = nni_copyin_sockaddr(&sa, data, sz, t)) == 0) {
		enum ZT_ResultCode      zrv;
		zt_node                *ztn;
		struct sockaddr_storage ss;
		struct sockaddr_in     *sin;
		struct sockaddr_in6    *sin6;

		memset(&ss, 0, sizeof(ss));
		switch (sa.s_family) {
		case NNG_AF_INET:
			sin                  = (void *) &ss;
			sin->sin_family      = AF_INET;
			sin->sin_addr.s_addr = sa.s_in.sa_addr;
			sin->sin_port        = 0;
			break;
		case NNG_AF_INET6:
			sin6              = (void *) &ss;
			sin6->sin6_family = AF_INET6;
			sin6->sin6_port   = 0;
			memcpy(&sin6->sin6_addr, sa.s_in6.sa_addr, 16);
			break;
		default:
			return (NNG_EINVAL);
		}

		nni_mtx_lock(&zt_lk);
		if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
			nni_mtx_unlock(&zt_lk);
			return (rv);
		}
		ztn = ep->ze_ztn;
		zrv = ZT_Node_addLocalInterfaceAddress(ztn->zn_znode, &ss);
		nni_mtx_unlock(&zt_lk);
		rv = zt_result(zrv);
	}
	return (rv);
}

static int
zt_ep_set_clear_local_addrs(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep   *ep = arg;
	int      rv;
	ZT_Node *zn;
	NNI_ARG_UNUSED(data);
	NNI_ARG_UNUSED(sz);
	NNI_ARG_UNUSED(t);

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	zn = ep->ze_ztn->zn_znode;
	ZT_Node_clearLocalInterfaceAddresses(zn);
	nni_mtx_unlock(&zt_lk);
	return (0);
}

static int
zt_ep_get_node(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	rv = nni_copyout_u64(ep->ze_ztn->zn_self, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_get_nwid(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	rv = nni_copyout_u64(ep->ze_nwid, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_get_nw_name(void *arg, void *buf, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	rv = zt_get_nw_name(ep->ze_ztn, ep->ze_nwid, buf, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_get_nw_status(void *arg, void *buf, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;
	int    status;

	nni_mtx_lock(&zt_lk);
	if ((ep->ze_ztn == NULL) && ((rv = zt_node_find(ep)) != 0)) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	if ((rv = zt_get_nw_status(ep->ze_ztn, ep->ze_nwid, &status)) != 0) {
		nni_mtx_unlock(&zt_lk);
		return (rv);
	}
	nni_mtx_unlock(&zt_lk);
	return (nni_copyout_int(status, buf, szp, t));
}

static int
zt_ep_set_ping_time(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep       *ep = arg;
	nng_duration val;
	int          rv;

	if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		ep->ze_ping_time = val;
		nni_mtx_unlock(&zt_lk);
	}
	return (rv);
}

static int
zt_ep_get_ping_time(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_ms(ep->ze_ping_time, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_set_ping_tries(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep *ep = arg;
	int    val;
	int    rv;

	if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		ep->ze_ping_tries = val;
		nni_mtx_unlock(&zt_lk);
	}
	return (rv);
}

static int
zt_ep_get_ping_tries(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_int(ep->ze_ping_tries, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_set_conn_time(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep       *ep = arg;
	nng_duration val;
	int          rv;

	if ((rv = nni_copyin_ms(&val, data, sz, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		ep->ze_conn_time = val;
		nni_mtx_unlock(&zt_lk);
	}
	return (rv);
}

static int
zt_ep_get_conn_time(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_ms(ep->ze_conn_time, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}
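// Worked example: with the defaults (zt_ping_time = 60000 msec,
// zt_ping_tries = 10), an idle pipe pings its peer once a minute and
// treats the peer as dead after about ten unanswered pings, i.e.
// roughly ten minutes.  Setting the ping time to NNG_DURATION_ZERO or
// NNG_DURATION_INFINITE, or the tries to zero, disables keepalives
// entirely (see zt_pipe_start_ping above).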
static int
zt_ep_set_conn_tries(void *arg, const void *data, size_t sz, nni_type t)
{
	zt_ep *ep = arg;
	int    val;
	int    rv;

	if ((rv = nni_copyin_int(&val, data, sz, 0, 1000000, t)) == 0) {
		nni_mtx_lock(&zt_lk);
		ep->ze_conn_tries = val;
		nni_mtx_unlock(&zt_lk);
	}
	return (rv);
}

static int
zt_ep_get_conn_tries(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep *ep = arg;
	int    rv;

	nni_mtx_lock(&zt_lk);
	rv = nni_copyout_int(ep->ze_conn_tries, data, szp, t);
	nni_mtx_unlock(&zt_lk);
	return (rv);
}

static int
zt_ep_get_locaddr(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_ep       *ep = arg;
	nng_sockaddr sa;

	memset(&sa, 0, sizeof(sa));
	sa.s_zt.sa_family = NNG_AF_ZT;
	nni_mtx_lock(&zt_lk);
	sa.s_zt.sa_nwid   = ep->ze_nwid;
	sa.s_zt.sa_nodeid = ep->ze_laddr >> zt_port_shift;
	sa.s_zt.sa_port   = ep->ze_laddr & zt_port_mask;
	nni_mtx_unlock(&zt_lk);
	return (nni_copyout_sockaddr(&sa, data, szp, t));
}

static int
zt_pipe_get_locaddr(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_pipe     *p = arg;
	nng_sockaddr sa;

	memset(&sa, 0, sizeof(sa));
	sa.s_zt.sa_family = NNG_AF_ZT;
	sa.s_zt.sa_nwid   = p->zp_nwid;
	sa.s_zt.sa_nodeid = p->zp_laddr >> zt_port_shift;
	sa.s_zt.sa_port   = p->zp_laddr & zt_port_mask;
	return (nni_copyout_sockaddr(&sa, data, szp, t));
}

static int
zt_pipe_get_remaddr(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_pipe     *p = arg;
	nng_sockaddr sa;

	memset(&sa, 0, sizeof(sa));
	sa.s_zt.sa_family = NNG_AF_ZT;
	sa.s_zt.sa_nwid   = p->zp_nwid;
	sa.s_zt.sa_nodeid = p->zp_raddr >> zt_port_shift;
	sa.s_zt.sa_port   = p->zp_raddr & zt_port_mask;
	return (nni_copyout_sockaddr(&sa, data, szp, t));
}

static int
zt_pipe_get_mtu(void *arg, void *data, size_t *szp, nni_type t)
{
	zt_pipe *p = arg;
	return (nni_copyout_size(p->zp_mtu, data, szp, t));
}

static const nni_option zt_pipe_options[] = {
	{
	    .o_name = NNG_OPT_LOCADDR,
	    .o_get  = zt_pipe_get_locaddr,
	},
	{
	    .o_name = NNG_OPT_REMADDR,
	    .o_get  = zt_pipe_get_remaddr,
	},
	{
	    .o_name = NNG_OPT_ZT_MTU,
	    .o_get  = zt_pipe_get_mtu,
	},
	{
	    .o_name = NNG_OPT_ZT_NWID,
	    .o_get  = zt_pipe_get_nwid,
	},
	{
	    .o_name = NNG_OPT_ZT_NODE,
	    .o_get  = zt_pipe_get_node,
	},
	{
	    .o_name = NNG_OPT_RECVMAXSZ,
	    .o_get  = zt_pipe_get_recvmaxsz,
	},
	// terminate list
	{
	    .o_name = NULL,
	},
};

static int
zt_pipe_getopt(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
	zt_pipe *p = arg;
	return (nni_getopt(zt_pipe_options, name, p, buf, szp, t));
}

static nni_tran_pipe_ops zt_pipe_ops = {
	.p_init   = zt_pipe_init,
	.p_fini   = zt_pipe_fini,
	.p_send   = zt_pipe_send,
	.p_recv   = zt_pipe_recv,
	.p_close  = zt_pipe_close,
	.p_peer   = zt_pipe_peer,
	.p_getopt = zt_pipe_getopt,
};
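// Illustrative usage sketch (commented out): reading a pipe-level
// option from application code, e.g. the ZeroTier MTU for the pipe
// that delivered a message.  This uses the public nng API; the
// variable names are hypothetical.
//
//   nng_pipe p = nng_msg_get_pipe(msg);
//   size_t   mtu;
//   rv = nng_pipe_get_size(p, NNG_OPT_ZT_MTU, &mtu);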
static nni_option zt_dialer_options[] = {
	{
	    .o_name = NNG_OPT_RECVMAXSZ,
	    .o_get  = zt_ep_get_recvmaxsz,
	    .o_set  = zt_ep_set_recvmaxsz,
	},
	{
	    .o_name = NNG_OPT_URL,
	    .o_get  = zt_ep_get_url,
	},
	{
	    .o_name = NNG_OPT_ZT_HOME,
	    .o_get  = zt_ep_get_home,
	    .o_set  = zt_ep_set_home,
	},
	{
	    .o_name = NNG_OPT_ZT_NODE,
	    .o_get  = zt_ep_get_node,
	},
	{
	    .o_name = NNG_OPT_ZT_NWID,
	    .o_get  = zt_ep_get_nwid,
	},
	{
	    .o_name = NNG_OPT_ZT_NETWORK_STATUS,
	    .o_get  = zt_ep_get_nw_status,
	},
	{
	    .o_name = NNG_OPT_ZT_NETWORK_NAME,
	    .o_get  = zt_ep_get_nw_name,
	},
	{
	    .o_name = NNG_OPT_ZT_PING_TIME,
	    .o_get  = zt_ep_get_ping_time,
	    .o_set  = zt_ep_set_ping_time,
	},
	{
	    .o_name = NNG_OPT_ZT_PING_TRIES,
	    .o_get  = zt_ep_get_ping_tries,
	    .o_set  = zt_ep_set_ping_tries,
	},
	{
	    .o_name = NNG_OPT_ZT_CONN_TIME,
	    .o_get  = zt_ep_get_conn_time,
	    .o_set  = zt_ep_set_conn_time,
	},
	{
	    .o_name = NNG_OPT_ZT_CONN_TRIES,
	    .o_get  = zt_ep_get_conn_tries,
	    .o_set  = zt_ep_set_conn_tries,
	},
	{
	    .o_name = NNG_OPT_ZT_ORBIT,
	    .o_set  = zt_ep_set_orbit,
	},
	{
	    .o_name = NNG_OPT_ZT_DEORBIT,
	    .o_set  = zt_ep_set_deorbit,
	},
	{
	    .o_name = NNG_OPT_ZT_ADD_LOCAL_ADDR,
	    .o_set  = zt_ep_set_add_local_addr,
	},
	{
	    .o_name = NNG_OPT_ZT_CLEAR_LOCAL_ADDRS,
	    .o_set  = zt_ep_set_clear_local_addrs,
	},
	// terminate list
	{
	    .o_name = NULL,
	},
};

static nni_option zt_listener_options[] = {
	{
	    .o_name = NNG_OPT_RECVMAXSZ,
	    .o_get  = zt_ep_get_recvmaxsz,
	    .o_set  = zt_ep_set_recvmaxsz,
	},
	{
	    .o_name = NNG_OPT_URL,
	    .o_get  = zt_ep_get_url,
	},
	{
	    .o_name = NNG_OPT_ZT_HOME,
	    .o_get  = zt_ep_get_home,
	    .o_set  = zt_ep_set_home,
	},
	{
	    .o_name = NNG_OPT_ZT_NODE,
	    .o_get  = zt_ep_get_node,
	},
	{
	    .o_name = NNG_OPT_ZT_NWID,
	    .o_get  = zt_ep_get_nwid,
	},
	{
	    .o_name = NNG_OPT_ZT_NETWORK_STATUS,
	    .o_get  = zt_ep_get_nw_status,
	},
	{
	    .o_name = NNG_OPT_ZT_NETWORK_NAME,
	    .o_get  = zt_ep_get_nw_name,
	},
	{
	    .o_name = NNG_OPT_ZT_PING_TIME,
	    .o_get  = zt_ep_get_ping_time,
	    .o_set  = zt_ep_set_ping_time,
	},
	{
	    .o_name = NNG_OPT_ZT_PING_TRIES,
	    .o_get  = zt_ep_get_ping_tries,
	    .o_set  = zt_ep_set_ping_tries,
	},
	{
	    .o_name = NNG_OPT_ZT_ORBIT,
	    .o_set  = zt_ep_set_orbit,
	},
	{
	    .o_name = NNG_OPT_ZT_DEORBIT,
	    .o_set  = zt_ep_set_deorbit,
	},
	{
	    .o_name = NNG_OPT_LOCADDR,
	    .o_get  = zt_ep_get_locaddr,
	},
	// terminate list
	{
	    .o_name = NULL,
	},
};

static nni_tran_dialer_ops zt_dialer_ops = {
	.d_init    = zt_dialer_init,
	.d_fini    = zt_ep_fini,
	.d_connect = zt_ep_connect,
	.d_close   = zt_ep_close,
	.d_options = zt_dialer_options,
};

static nni_tran_listener_ops zt_listener_ops = {
	.l_init    = zt_listener_init,
	.l_fini    = zt_ep_fini,
	.l_bind    = zt_ep_bind,
	.l_accept  = zt_ep_accept,
	.l_close   = zt_ep_close,
	.l_options = zt_listener_options,
};

// This is the ZeroTier transport linkage, and should be the
// only global symbol in this entire file.
static struct nni_tran zt_tran = {
	.tran_scheme   = "zt",
	.tran_dialer   = &zt_dialer_ops,
	.tran_listener = &zt_listener_ops,
	.tran_pipe     = &zt_pipe_ops,
	.tran_init     = zt_tran_init,
	.tran_fini     = zt_tran_fini,
};

#ifndef NNG_ELIDE_DEPRECATED
int
nng_zt_register(void)
{
	return (nni_init());
}
#endif

void
nni_sp_zt_register(void)
{
	nni_tran_register(&zt_tran);
}
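// Illustrative usage sketch (commented out): once the transport is
// registered (nni_sp_zt_register is invoked by the library itself;
// older applications called nng_zt_register explicitly), a socket can
// dial a zt:// URL directly.  The protocol, node, network and port
// values here are hypothetical, and NNG_FLAG_NONBLOCK matters because
// ZeroTier connectivity can take many seconds to come up.
//
//   nng_socket s;
//   nng_dialer d;
//   rv = nng_pair0_open(&s);
//   rv = nng_dial(s, "zt://b27c29c6fa.8056c2e21c000001:5555", &d,
//       NNG_FLAG_NONBLOCK);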