// // Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include "core/nng_impl.h" #ifdef NNG_PLATFORM_POSIX #include "platform/posix/posix_pollq.h" #include #include #include #include #include #include // UDP support. // If we can suppress SIGPIPE on send, please do so. #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif struct nni_plat_udp { nni_posix_pfd *udp_pfd; int udp_fd; nni_list udp_recvq; nni_list udp_sendq; nni_mtx udp_mtx; }; static void nni_posix_udp_doerror(nni_plat_udp *udp, int rv) { nni_aio *aio; while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) || ((aio = nni_list_first(&udp->udp_sendq)) != NULL)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } } static void nni_posix_udp_doclose(nni_plat_udp *udp) { nni_posix_udp_doerror(udp, NNG_ECLOSED); } static void nni_posix_udp_dorecv(nni_plat_udp *udp) { nni_aio * aio; nni_list *q = &udp->udp_recvq; // While we're able to recv, do so. while ((aio = nni_list_first(q)) != NULL) { struct iovec iov[4]; unsigned niov; nni_iov * aiov; struct sockaddr_storage ss; nng_sockaddr * sa; struct msghdr hdr = { .msg_name = NULL }; int rv = 0; int cnt = 0; nni_aio_get_iov(aio, &niov, &aiov); for (unsigned i = 0; i < niov; i++) { iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } hdr.msg_iov = iov; hdr.msg_iovlen = niov; hdr.msg_name = &ss; hdr.msg_namelen = sizeof(ss); if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { // No data available at socket. Leave // the AIO at the head of the queue. return; } rv = nni_plat_errno(errno); } else if ((sa = nni_aio_get_input(aio, 0)) != NULL) { // We need to store the address information. // It is incumbent on the AIO submitter to supply // storage for the address. nni_posix_sockaddr2nn( sa, (void *) &ss, hdr.msg_namelen); } nni_list_remove(q, aio); nni_aio_finish(aio, rv, cnt); } } static void nni_posix_udp_dosend(nni_plat_udp *udp) { nni_aio * aio; nni_list *q = &udp->udp_sendq; // While we're able to send, do so. while ((aio = nni_list_first(q)) != NULL) { struct sockaddr_storage ss; int len; int rv = 0; int cnt = 0; len = nni_posix_nn2sockaddr(&ss, nni_aio_get_input(aio, 0)); if (len < 1) { rv = NNG_EADDRINVAL; } else { unsigned niov; nni_iov * aiov; struct iovec iov[16]; nni_aio_get_iov(aio, &niov, &aiov); if (niov > NNI_NUM_ELEMENTS(iov)) { rv = NNG_EINVAL; } if (rv == 0) { struct msghdr hdr = { .msg_name = NULL }; for (unsigned i = 0; i < niov; i++) { iov[i].iov_base = aiov[i].iov_buf; iov[i].iov_len = aiov[i].iov_len; } hdr.msg_iov = iov; hdr.msg_iovlen = niov; hdr.msg_name = &ss; hdr.msg_namelen = len; cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL); if (cnt < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { // Cannot send now, leave. return; } rv = nni_plat_errno(errno); } } } nni_list_remove(q, aio); nni_aio_finish(aio, rv, cnt); } } // This function is called by the poller on activity on the FD. static void nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) { nni_plat_udp *udp = arg; NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); if (events & (unsigned) POLLIN) { nni_posix_udp_dorecv(udp); } if (events & (unsigned) POLLOUT) { nni_posix_udp_dosend(udp); } if (events & ((unsigned) POLLHUP | (unsigned) POLLERR | (unsigned) POLLNVAL)) { nni_posix_udp_doclose(udp); } else { events = 0; if (!nni_list_empty(&udp->udp_sendq)) { events |= (unsigned) POLLOUT; } if (!nni_list_empty(&udp->udp_recvq)) { events |= (unsigned) POLLIN; } if (events) { int rv; rv = nni_posix_pfd_arm(udp->udp_pfd, events); if (rv != 0) { nni_posix_udp_doerror(udp, rv); } } } nni_mtx_unlock(&udp->udp_mtx); } int nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) { nni_plat_udp * udp; int salen; struct sockaddr_storage sa; int rv; if ((salen = nni_posix_nn2sockaddr(&sa, bindaddr)) < 1) { return (NNG_EADDRINVAL); } // UDP opens can actually run synchronously. if ((udp = NNI_ALLOC_STRUCT(udp)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&udp->udp_mtx); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { rv = nni_plat_errno(errno); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); return (rv); } if (bind(udp->udp_fd, (void *) &sa, salen) != 0) { rv = nni_plat_errno(errno); (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); return (rv); } if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) { (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); return (rv); } nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp); nni_aio_list_init(&udp->udp_recvq); nni_aio_list_init(&udp->udp_sendq); *upp = udp; return (0); } void nni_plat_udp_close(nni_plat_udp *udp) { nni_posix_pfd_fini(udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); nni_mtx_unlock(&udp->udp_mtx); (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); } void nni_plat_udp_cancel(nni_aio *aio, void *arg, int rv) { nni_plat_udp *udp = arg; nni_mtx_lock(&udp->udp_mtx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&udp->udp_mtx); } void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { nni_mtx_unlock(&udp->udp_mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&udp->udp_recvq, aio); if (nni_list_first(&udp->udp_recvq) == aio) { if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } } nni_mtx_unlock(&udp->udp_mtx); } void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { nni_mtx_unlock(&udp->udp_mtx); nni_aio_finish_error(aio, rv); return; } nni_list_append(&udp->udp_sendq, aio); if (nni_list_first(&udp->udp_sendq) == aio) { if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } } nni_mtx_unlock(&udp->udp_mtx); } int nni_plat_udp_sockname(nni_plat_udp *udp, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sz; sz = sizeof(ss); if (getsockname(udp->udp_fd, (struct sockaddr *) &ss, &sz) < 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss, sz)); } #endif // NNG_PLATFORM_POSIX