// // Copyright 2018 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include "nng/compat/nanomsg/nn.h" // transports #include "nng/compat/nanomsg/inproc.h" #include "nng/compat/nanomsg/ipc.h" #include "nng/compat/nanomsg/tcp.h" #include "nng/compat/nanomsg/ws.h" // protocols #include "nng/compat/nanomsg/bus.h" #include "nng/compat/nanomsg/pair.h" #include "nng/compat/nanomsg/pipeline.h" #include "nng/compat/nanomsg/pubsub.h" #include "nng/compat/nanomsg/reqrep.h" #include "nng/compat/nanomsg/survey.h" // underlying NNG headers #include "nng/nng.h" #include "nng/protocol/bus0/bus.h" #include "nng/protocol/pair0/pair.h" #include "nng/protocol/pipeline0/pull.h" #include "nng/protocol/pipeline0/push.h" #include "nng/protocol/pubsub0/pub.h" #include "nng/protocol/pubsub0/sub.h" #include "nng/protocol/reqrep0/rep.h" #include "nng/protocol/reqrep0/req.h" #include "nng/protocol/survey0/respond.h" #include "nng/protocol/survey0/survey.h" #include "core/nng_impl.h" #include #include // This file supplies the legacy compatibility API. Applications should // avoid using these if at all possible, and instead use the new style APIs. 
// Table translating between NNG error codes (nerr) and the errno values
// (perr) reported by the legacy API.  Lookups in either direction return
// the first matching row, so where several NNG codes share one errno
// (EINVAL, EACCES, EBADF) row order decides which NNG message
// nn_strerror() reports.  The table is terminated by a { 0, 0 } row.
static const struct {
	int nerr;
	int perr;
} nn_errnos[] = {
	// clang-format off
	{ NNG_EINTR,        EINTR        },
	{ NNG_ENOMEM,       ENOMEM       },
	{ NNG_EINVAL,       EINVAL       },
	{ NNG_EBUSY,        EBUSY        },
	{ NNG_ETIMEDOUT,    ETIMEDOUT    },
	{ NNG_ECONNREFUSED, ECONNREFUSED },
	{ NNG_ECLOSED,      EBADF        },
	{ NNG_EAGAIN,       EAGAIN       },
	{ NNG_ENOTSUP,      ENOTSUP      },
	{ NNG_EADDRINUSE,   EADDRINUSE   },
	{ NNG_ESTATE,       EFSM         },
	{ NNG_ENOENT,       ENOENT       },
	{ NNG_EPROTO,       EPROTO       },
	{ NNG_EUNREACHABLE, EHOSTUNREACH },
	{ NNG_EADDRINVAL,   EINVAL       },
	{ NNG_EPERM,        EACCES       },
	{ NNG_EMSGSIZE,     EMSGSIZE     },
	{ NNG_ECONNABORTED, ECONNABORTED },
	{ NNG_ECONNRESET,   ECONNRESET   },
	{ NNG_ECANCELED,    EBADF        },
	{ NNG_EEXIST,       EEXIST       },
	{ NNG_EWRITEONLY,   EACCES       },
	{ NNG_EREADONLY,    EACCES       },
	{ NNG_ECRYPTO,      EACCES       },
	{ NNG_EPEERAUTH,    EACCES       },
	{ NNG_EBADTYPE,     EINVAL       },
	{ NNG_EAMBIGUOUS,   EINVAL       },
	{ NNG_ENOFILES,     EMFILE       },
	{ NNG_ENOSPC,       ENOSPC       },
	{ 0,                0            },
	// clang-format on
};

// nn_strerror returns a human readable message for a legacy errno value.
// Known values are mapped back to an NNG code and described with
// nng_strerror().  For unknown values the message is formatted into a
// single static buffer, so that result is overwritten by the next such
// call and is shared between threads.
const char *
nn_strerror(int err)
{
	int         i;
	static char msgbuf[32];

	for (i = 0; nn_errnos[i].perr != 0; i++) {
		if (nn_errnos[i].perr == err) {
			return (nng_strerror(nn_errnos[i].nerr));
		}
	}
	if (err == EIO) {
		return ("Unknown I/O error");
	}

	// Arguably we could use strerror() here, but we should only
	// be getting errnos we understand at this point.
	(void) snprintf(msgbuf, sizeof(msgbuf), "Unknown error %d", err);
	return (msgbuf);
}

// nn_seterror stores in errno the legacy equivalent of the NNG error
// code err.  Codes without a table entry are reported as EIO.
static void
nn_seterror(int err)
{
	int i;

	for (i = 0; nn_errnos[i].nerr != 0; i++) {
		if (nn_errnos[i].nerr == err) {
			errno = nn_errnos[i].perr;
			return;
		}
	}
	// No idea...
	errno = EIO;
}

// nn_errno returns the current value of errno.
int
nn_errno(void)
{
	return (errno);
}

// Table of the legacy protocol numbers we can open, with the NNG
// constructors for the normal (p_open) and raw (p_open_raw) variants.
// Entries are compiled in only when the matching protocol is built.
// The table is terminated by an entry whose p_id is zero (and whose
// constructors are therefore NULL).
static const struct {
	uint16_t p_id;
	int (*p_open)(nng_socket *);
	int (*p_open_raw)(nng_socket *);
} nn_protocols[] = {
#ifdef NNG_HAVE_BUS0
	{
	    .p_id       = NN_BUS,
	    .p_open     = nng_bus0_open,
	    .p_open_raw = nng_bus0_open_raw,
	},
#endif
#ifdef NNG_HAVE_PAIR0
	{
	    .p_id       = NN_PAIR,
	    .p_open     = nng_pair0_open,
	    .p_open_raw = nng_pair0_open_raw,
	},
#endif
#ifdef NNG_HAVE_PULL0
	{
	    .p_id       = NN_PULL,
	    .p_open     = nng_pull0_open,
	    .p_open_raw = nng_pull0_open_raw,
	},
#endif
#ifdef NNG_HAVE_PUSH0
	{
	    .p_id       = NN_PUSH,
	    .p_open     = nng_push0_open,
	    .p_open_raw = nng_push0_open_raw,
	},
#endif
#ifdef NNG_HAVE_PUB0
	{
	    .p_id       = NN_PUB,
	    .p_open     = nng_pub0_open,
	    .p_open_raw = nng_pub0_open_raw,
	},
#endif
#ifdef NNG_HAVE_SUB0
	{
	    .p_id       = NN_SUB,
	    .p_open     = nng_sub0_open,
	    .p_open_raw = nng_sub0_open_raw,
	},
#endif
#ifdef NNG_HAVE_REQ0
	{
	    .p_id       = NN_REQ,
	    .p_open     = nng_req0_open,
	    .p_open_raw = nng_req0_open_raw,
	},
#endif
#ifdef NNG_HAVE_REP0
	{
	    .p_id       = NN_REP,
	    .p_open     = nng_rep0_open,
	    .p_open_raw = nng_rep0_open_raw,
	},
#endif
#ifdef NNG_HAVE_SURVEYOR0
	{
	    .p_id       = NN_SURVEYOR,
	    .p_open     = nng_surveyor0_open,
	    .p_open_raw = nng_surveyor0_open_raw,
	},
#endif
#ifdef NNG_HAVE_RESPONDENT0
	{
	    .p_id       = NN_RESPONDENT,
	    .p_open     = nng_respondent0_open,
	    .p_open_raw = nng_respondent0_open_raw,
	},
#endif
	{
	    .p_id = 0,
	},
};

// nn_socket opens a socket for the given legacy protocol number.  The
// domain must be AF_SP (normal) or AF_SP_RAW (raw); anything else fails
// with EAFNOSUPPORT.  A protocol that is unknown or not compiled in fails
// with ENOTSUP.  Returns the socket id on success, or -1 with errno set.
int
nn_socket(int domain, int protocol)
{
	nng_socket sock;
	int        rv;
	int        i;

	if ((domain != AF_SP) && (domain != AF_SP_RAW)) {
		errno = EAFNOSUPPORT;
		return (-1);
	}

	for (i = 0; nn_protocols[i].p_id != 0; i++) {
		if (nn_protocols[i].p_id == protocol) {
			break;
		}
	}
	// If nothing matched, i indexes the terminator, whose constructor
	// pointers are NULL.
	if (nn_protocols[i].p_open == NULL) {
		errno = ENOTSUP;
		return (-1);
	}

	if (domain == AF_SP_RAW) {
		rv = nn_protocols[i].p_open_raw(&sock);
	} else {
		rv = nn_protocols[i].p_open(&sock);
	}
	if (rv != 0) {
		nn_seterror(rv);
		return (-1);
	}

	// Legacy sockets have nodelay disabled.
	(void) nng_socket_set_bool(sock, NNG_OPT_TCP_NODELAY, false);
	return ((int) sock.id);
}

// nn_close closes socket s.  Returns 0, or -1 with errno set.
int
nn_close(int s)
{
	int        rv;
	nng_socket sid;

	sid.id = (uint32_t) s;
	if ((rv = nng_close(sid)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_bind starts a listener for addr on socket s.  Returns the listener
// id (usable with nn_shutdown), or -1 with errno set.
int
nn_bind(int s, const char *addr)
{
	int          rv;
	nng_listener l;
	nng_socket   sid;

	sid.id = (uint32_t) s;
	if ((rv = nng_listen(sid, addr, &l, 0)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return ((int) l.id);
}

// nn_connect starts a dialer for addr on socket s, passing
// NNG_FLAG_NONBLOCK to nng_dial.  Returns the dialer id (usable with
// nn_shutdown), or -1 with errno set.
int
nn_connect(int s, const char *addr)
{
	int        rv;
	nng_dialer d;
	nng_socket sid;

	sid.id = (uint32_t) s;
	if ((rv = nng_dial(sid, addr, &d, NNG_FLAG_NONBLOCK)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return ((int) d.id);
}

// nn_shutdown closes the endpoint ep, which may be either a dialer or a
// listener.  The socket argument is not used.  Returns 0, or -1 with
// errno set from the listener close attempt if both attempts fail.
int
nn_shutdown(int s, int ep)
{
	int rv;
	(void) s; // Unused
	nng_dialer   d;
	nng_listener l;

	// Socket is wired into the endpoint... so passing a bad endpoint
	// ID can result in affecting the wrong socket.  But this requires
	// a buggy application, and because we don't recycle endpoints
	// until wrap, it's unlikely to actually come up in practice.
	// Note that listeners and dialers share the same namespace
	// in the core, so we can close either one this way.

	d.id = l.id = (uint32_t) ep;
	if (((rv = nng_dialer_close(d)) != 0) &&
	    ((rv = nng_listener_close(l)) != 0)) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_allocmsg allocates a message buffer of size bytes for use with the
// NN_MSG (zero copy) forms of send and receive.  The type must be zero
// and size must be non-zero.  Returns the buffer, or NULL with errno set.
//
// The buffer is the body of an nng_msg.  A pointer to that nng_msg is
// stored in the sizeof (nng_msg *) bytes immediately before the returned
// address, which is how nn_freemsg, nn_reallocmsg and nn_sendmsg recover
// the message from the buffer pointer.
void *
nn_allocmsg(size_t size, int type)
{
	nng_msg *msg;
	int      rv;

	// Validate type and non-zero size.  This also checks for overflow.
	if ((type != 0) || (size < 1) || ((size + sizeof(msg) < size))) {
		nn_seterror(NNG_EINVAL);
		return (NULL);
	}

	// So our "messages" from nn are really going to be nng messages
	// but to make this work, we use a bit of headroom in the message
	// to stash the message header.
	if ((rv = nng_msg_alloc(&msg, size + (sizeof(msg)))) != 0) {
		nn_seterror(rv);
		return (NULL);
	}

	// This counts on message bodies being aligned sensibly.
	*(nng_msg **) (nng_msg_body(msg)) = msg;

	// We are counting on the implementation of nng_msg_trim to not
	// reallocate the message but just to leave the prefix in place.
	(void) nng_msg_trim(msg, sizeof(msg));

	return (nng_msg_body(msg));
}

// nn_freemsg releases a buffer obtained from nn_allocmsg, nn_reallocmsg
// or an NN_MSG receive.  Always returns 0.
int
nn_freemsg(void *ptr)
{
	nng_msg *msg;

	// The owning message is stashed just ahead of the buffer.
	msg = *(nng_msg **) (((char *) ptr) - sizeof(msg));
	nng_msg_free(msg);
	return (0);
}

// nn_reallocmsg resizes a buffer obtained from nn_allocmsg (or an NN_MSG
// receive) to len bytes, preserving the existing content up to the
// smaller of the old and new sizes.  Returns the (possibly moved) buffer,
// or NULL with errno set; on failure the old message is not freed.
void *
nn_reallocmsg(void *ptr, size_t len)
{
	nng_msg *msg;
	int      rv;

	if ((len + sizeof(msg)) < len) {
		// overflowed!
		nn_seterror(NNG_EINVAL);
		return (NULL);
	}

	// This counts on message bodies being aligned sensibly.
	msg = *(nng_msg **) (((char *) ptr) - sizeof(msg));

	// The body currently starts at the caller's data.  Put the stashed
	// message pointer back in front of it *before* resizing; resizing
	// first and then writing the pointer at the start of the body would
	// overwrite (and then trim away) the first bytes of caller data.
	if ((rv = nng_msg_insert(msg, &msg, sizeof(msg))) != 0) {
		nn_seterror(rv);
		return (NULL);
	}

	// We need to realloc the requested len, plus size for our header.
	if ((rv = nng_msg_realloc(msg, len + sizeof(msg))) != 0) {
		// We don't free the old message.  Code is free to cope
		// as it sees fit.  Hide the stash again so that the
		// message is left as we found it.
		(void) nng_msg_trim(msg, sizeof(msg));
		nn_seterror(rv);
		return (NULL);
	}

	// As in nn_allocmsg, we count on trim leaving the stashed pointer
	// in place just ahead of the body.
	(void) nng_msg_trim(msg, sizeof(msg));
	return (nng_msg_body(msg));
}

// nn_flags converts legacy send/receive flags to NNG flags.  Only 0 and
// NN_DONTWAIT are accepted; anything else sets EINVAL and returns -1.
static int
nn_flags(int flags)
{
	switch (flags) {
	case 0:
		return (0);

	case NN_DONTWAIT:
		return (NNG_FLAG_NONBLOCK);

	default:
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
}

// nn_send sends len bytes from buf on socket s.  It is a thin wrapper
// that builds a single-element iovec and calls nn_sendmsg; pass NN_MSG
// as len for the zero copy form.
int
nn_send(int s, const void *buf, size_t len, int flags)
{
	struct nn_iovec  iov;
	struct nn_msghdr hdr;

	iov.iov_base = (void *) buf;
	iov.iov_len  = len;

	hdr.msg_iov        = &iov;
	hdr.msg_iovlen     = 1;
	hdr.msg_control    = NULL;
	hdr.msg_controllen = 0;

	return (nn_sendmsg(s, &hdr, flags));
}

// nn_recv receives a message from socket s into buf.  It is a thin
// wrapper that builds a single-element iovec and calls nn_recvmsg; pass
// NN_MSG as len for the zero copy form.
int
nn_recv(int s, void *buf, size_t len, int flags)
{
	struct nn_iovec  iov;
	struct nn_msghdr hdr;

	iov.iov_base = buf;
	iov.iov_len  = len;

	hdr.msg_iov        = &iov;
	hdr.msg_iovlen     = 1;
	hdr.msg_control    = NULL;
	hdr.msg_controllen = 0;

	return (nn_recvmsg(s, &hdr, flags));
}

// nn_recvmsg receives one message from socket s.
//
// If mh describes exactly one iovec whose length is NN_MSG, the iovec
// base is treated as the address of a pointer, which is set to a newly
// received buffer that the caller must release with nn_freemsg.
// Otherwise the body is copied across the supplied iovecs, truncating if
// they are too small.
//
// If mh->msg_control is not NULL, the message header is returned as a
// single PROTO_SP/SP_HDR control message: into the caller's buffer when
// it is large enough, or (when msg_controllen is NN_MSG) into a newly
// allocated buffer whose address is stored through msg_control.
//
// Returns the full length of the message body (even when the copy was
// truncated), or -1 with errno set.
int
nn_recvmsg(int s, struct nn_msghdr *mh, int flags)
{
	int        rv;
	nng_msg *  msg;
	size_t     len;
	int        keep = 0;
	nng_socket sid;

	if ((flags = nn_flags(flags)) == -1) {
		return (-1);
	}
	if (mh == NULL) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	if (mh->msg_iovlen < 0) {
		nn_seterror(NNG_EMSGSIZE);
		return (-1);
	}

	sid.id = (uint32_t) s;
	if ((rv = nng_recvmsg(sid, &msg, flags)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	if ((mh->msg_iovlen == 1) && (mh->msg_iov[0].iov_len == NN_MSG)) {
		// Receiver wants to have a dynamically allocated message.
		// There can only be one of these.  Stash the message
		// pointer ahead of the body, as nn_allocmsg does.
		if ((rv = nng_msg_insert(msg, &msg, sizeof(msg))) != 0) {
			nng_msg_free(msg);
			nn_seterror(rv);
			return (-1);
		}
		nng_msg_trim(msg, sizeof(msg));
		*(void **) (mh->msg_iov[0].iov_base) = nng_msg_body(msg);
		len                                  = nng_msg_len(msg);
		keep = 1; // Do not discard message!
	} else {
		// copyout to multiple iovecs.
		char *ptr = nng_msg_body(msg);
		len       = nng_msg_len(msg);

		for (int i = 0; i < mh->msg_iovlen; i++) {
			size_t n;
			if ((n = mh->msg_iov[i].iov_len) == NN_MSG) {
				// This is forbidden!
				nn_seterror(NNG_EINVAL);
				nng_msg_free(msg);
				return (-1);
			}
			if (n > len) {
				n = len;
			}
			memcpy(mh->msg_iov[i].iov_base, ptr, n);
			len -= n;
			ptr += n;
		}

		// If we copied everything, len will be zero, otherwise,
		// it represents the amount of data that we were unable to
		// copyout.  The caller is responsible for noticing this,
		// as there is no API to pass this information out.
		len = nng_msg_len(msg);
	}

	// If the caller has requested control information (header details),
	// we grab it.
	if (mh->msg_control != NULL) {
		char *             cdata;
		size_t             clen;
		size_t             tlen;
		size_t             spsz;
		struct nn_cmsghdr *hdr;

		// The control payload is the header length followed by the
		// header bytes themselves.
		spsz = nng_msg_header_len(msg);
		clen = NN_CMSG_SPACE(sizeof(spsz) + spsz);

		if ((tlen = mh->msg_controllen) == NN_MSG) {
			// Ideally we'd use the same msg, but we would need
			// to set up reference counts on the message, so
			// instead we just make a new message.
			nng_msg *nmsg;

			rv = nng_msg_alloc(&nmsg, clen + sizeof(nmsg));
			if (rv != 0) {
				nng_msg_free(msg);
				nn_seterror(rv);
				return (-1);
			}
			memcpy(nng_msg_body(nmsg), &nmsg, sizeof(nmsg));
			nng_msg_trim(nmsg, sizeof(nmsg));
			cdata                      = nng_msg_body(nmsg);
			*(void **) mh->msg_control = cdata;
			tlen                       = clen;
		} else {
			// Clear (as much as fits of) the header so that the
			// caller does not see stale data if we cannot fill
			// it in below.
			cdata = mh->msg_control;
			memset(cdata, 0,
			    tlen > sizeof(*hdr) ? sizeof(*hdr) : tlen);
		}

		if (clen <= tlen) {
			uint8_t *ptr = NN_CMSG_DATA(cdata);

			hdr             = (void *) cdata;
			hdr->cmsg_len   = clen;
			hdr->cmsg_level = PROTO_SP;
			hdr->cmsg_type  = SP_HDR;

			memcpy(ptr, &spsz, sizeof(spsz));
			ptr += sizeof(spsz);
			memcpy(ptr, nng_msg_header(msg), spsz);
		}
	}

	if (!keep) {
		nng_msg_free(msg);
	}
	return ((int) len);
}

// nn_sendmsg sends one message on socket s.
//
// If mh describes exactly one iovec whose length is NN_MSG, the iovec
// base is treated as the address of a pointer to a buffer from
// nn_allocmsg (or an NN_MSG receive); the buffer is consumed on success
// and left with the caller on failure.  Otherwise the iovecs are
// gathered into a new message; NN_MSG is not permitted as a length in
// that case.
//
// If mh->msg_control is not NULL it is scanned for the first well formed
// PROTO_SP/SP_HDR control message, whose payload is appended to the
// message header.  When msg_controllen is NN_MSG the control buffer is
// itself an allocated message and is freed on success only.
//
// Returns the length of the message body sent, or -1 with errno set.
int
nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
{
	nng_msg *  msg  = NULL;
	nng_msg *  cmsg = NULL;
	nng_socket sid;
	char *     cdata;
	int        keep = 0;
	size_t     sz;
	int        rv;

	sid.id = (uint32_t) s;

	if ((flags = nn_flags(flags)) == -1) {
		return (-1);
	}

	if (mh == NULL) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}

	if (mh->msg_iovlen < 0) {
		nn_seterror(NNG_EMSGSIZE);
		return (-1);
	}

	if ((mh->msg_iovlen == 1) && (mh->msg_iov[0].iov_len == NN_MSG)) {
		char *bufp = *(char **) (mh->msg_iov[0].iov_base);

		msg  = *(nng_msg **) (bufp - sizeof(msg));
		keep = 1; // keep the message on error
	} else {
		char *ptr;
		int   i;

		sz = 0;
		// Get the total message size.  NN_MSG is only meaningful
		// as the length of a sole iovec; here it (or a total that
		// wraps) would make us allocate too little and then copy
		// past the end of it.
		for (i = 0; i < mh->msg_iovlen; i++) {
			size_t n = mh->msg_iov[i].iov_len;

			if ((n == NN_MSG) || ((sz + n) < sz)) {
				nn_seterror(NNG_EINVAL);
				return (-1);
			}
			sz += n;
		}
		if ((rv = nng_msg_alloc(&msg, sz)) != 0) {
			nn_seterror(rv);
			return (-1);
		}
		// Now copy it out.
		ptr = nng_msg_body(msg);
		for (i = 0; i < mh->msg_iovlen; i++) {
			memcpy(ptr, mh->msg_iov[i].iov_base,
			    mh->msg_iov[i].iov_len);
			ptr += mh->msg_iov[i].iov_len;
		}
	}

	// Now suck up the control data...
	// This POSIX-inspired API is one of the most painful for
	// usability we've ever seen.
	cmsg = NULL;
	if ((cdata = mh->msg_control) != NULL) {
		size_t         clen;
		size_t         offs;
		size_t         spsz;
		unsigned char *data;

		if ((clen = mh->msg_controllen) == NN_MSG) {
			// Underlying data is a message.  This is awkward,
			// because we have to copy the data, but we should
			// only free this message on success.  So we save the
			// message now.
			cdata = *(void **) cdata;
			cmsg  = *(nng_msg **) (cdata - sizeof(cmsg));
			clen  = nng_msg_len(cmsg);
		} else {
			clen = mh->msg_controllen;
		}

		offs = 0;
		// Only look at a header if all of its fixed part lies
		// within the control buffer.
		while ((offs < clen) && ((clen - offs) >= NN_CMSG_LEN(0))) {
			struct nn_cmsghdr *chdr = (void *) (cdata + offs);
			size_t             hlen = chdr->cmsg_len;

			// A header claiming to be shorter than its own
			// fixed part (which would stop us advancing) or to
			// run past the end of the buffer is malformed, and
			// we cannot step over it safely.  Stop scanning.
			if ((hlen < NN_CMSG_LEN(0)) ||
			    (hlen > (clen - offs))) {
				break;
			}

			// Skip anything that is not an SP header.
			if ((chdr->cmsg_level != PROTO_SP) ||
			    (chdr->cmsg_type != SP_HDR)) {
				offs += hlen;
				continue;
			}

			// SP header in theory.  Starts with size, then
			// any backtrace details.
			if (hlen < NN_CMSG_LEN(sizeof(spsz))) {
				offs += hlen;
				continue;
			}
			data = NN_CMSG_DATA(chdr);
			memcpy(&spsz, data, sizeof(spsz));
			if (spsz > (hlen - NN_CMSG_LEN(sizeof(spsz)))) {
				// Truncated header?  Ignore it.
				offs += hlen;
				continue;
			}
			data += sizeof(spsz);
			rv = nng_msg_header_append(msg, data, spsz);
			if (rv != 0) {
				if (!keep) {
					nng_msg_free(msg);
				}
				nn_seterror(rv);
				return (-1);
			}

			// Only the first SP header is used.
			break;
		}
	}

	// Capture the length first; the message is no longer ours once it
	// has been sent.
	sz = nng_msg_len(msg);
	if ((rv = nng_sendmsg(sid, msg, flags)) != 0) {
		if (!keep) {
			nng_msg_free(msg);
		}
		nn_seterror(rv);
		return (-1);
	}

	if (cmsg != NULL) {
		// We sent successfully, so free up the control message.
		nng_msg_free(cmsg);
	}
	return ((int) sz);
}

// The nn_get* helpers below share one output convention: at most *szp
// bytes of the value are copied to valp, and *szp is then set to the
// full size of the value.

// nn_getdomain reports AF_SP_RAW or AF_SP according to NNG_OPT_RAW.
static int
nn_getdomain(nng_socket s, void *valp, size_t *szp)
{
	int  i;
	bool b;
	int  rv;

	if ((rv = nng_socket_get_bool(s, NNG_OPT_RAW, &b)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	i = b ? AF_SP_RAW : AF_SP;
	memcpy(valp, &i, *szp < sizeof(int) ? *szp : sizeof(int));
	*szp = sizeof(int);
	return (0);
}

#ifndef NNG_PLATFORM_WINDOWS
#define SOCKET int
#endif

// nn_getfd reports the descriptor held in the integer option opt,
// converted to the platform's SOCKET type.
static int
nn_getfd(nng_socket s, void *valp, size_t *szp, const char *opt)
{
	int    ifd;
	int    rv;
	SOCKET sfd;

	if ((rv = nng_socket_get_int(s, opt, &ifd)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	sfd = (SOCKET) ifd;
	memcpy(valp, &sfd, *szp < sizeof(sfd) ? *szp : sizeof(sfd));
	*szp = sizeof(sfd);
	return (0);
}

// nn_getrecvfd reports the NNG_OPT_RECVFD descriptor (NN_RCVFD).
static int
nn_getrecvfd(nng_socket s, void *valp, size_t *szp)
{
	return (nn_getfd(s, valp, szp, NNG_OPT_RECVFD));
}

// nn_getsendfd reports the NNG_OPT_SENDFD descriptor (NN_SNDFD).
static int
nn_getsendfd(nng_socket s, void *valp, size_t *szp)
{
	return (nn_getfd(s, valp, szp, NNG_OPT_SENDFD));
}

// nn_getzero reports an int zero, for options we do not implement.
static int
nn_getzero(nng_socket s, void *valp, size_t *szp)
{
	int zero = 0;
	NNI_ARG_UNUSED(s);
	memcpy(valp, &zero, *szp < sizeof(zero) ? *szp : sizeof(zero));
	*szp = sizeof(zero);
	return (0);
}

// nn_setignore accepts and discards an int-sized value, for options we
// do not implement.  Any other size fails with EINVAL.
static int
nn_setignore(nng_socket s, const void *valp, size_t sz)
{
	NNI_ARG_UNUSED(valp);
	NNI_ARG_UNUSED(s);
	if (sz != sizeof(int)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	return (0);
}

// nn_getwsmsgtype always reports NN_WS_MSG_TYPE_BINARY.
static int
nn_getwsmsgtype(nng_socket s, void *valp, size_t *szp)
{
	int val = NN_WS_MSG_TYPE_BINARY;
	NNI_ARG_UNUSED(s);
	memcpy(valp, &val, *szp < sizeof(val) ? *szp : sizeof(val));
	*szp = sizeof(val);
	return (0);
}

// nn_setwsmsgtype accepts only NN_WS_MSG_TYPE_BINARY (and changes
// nothing); any other value or size fails with EINVAL.
static int
nn_setwsmsgtype(nng_socket s, const void *valp, size_t sz)
{
	int val;
	NNI_ARG_UNUSED(s);
	if (sz != sizeof(val)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	memcpy(&val, valp, sizeof(val));
	if (val != NN_WS_MSG_TYPE_BINARY) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	return (0);
}

// nn_settcpnodelay sets NNG_OPT_TCP_NODELAY from an int that must be
// exactly 0 or 1.
static int
nn_settcpnodelay(nng_socket s, const void *valp, size_t sz)
{
	bool val;
	int  ival;
	int  rv;

	if (sz != sizeof(ival)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	memcpy(&ival, valp, sizeof(ival));
	switch (ival) {
	case 0:
		val = false;
		break;
	case 1:
		val = true;
		break;
	default:
		nn_seterror(NNG_EINVAL);
		return (-1);
	}

	if ((rv = nng_socket_set_bool(s, NNG_OPT_TCP_NODELAY, val)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_gettcpnodelay reports NNG_OPT_TCP_NODELAY as an int 0 or 1.
static int
nn_gettcpnodelay(nng_socket s, void *valp, size_t *szp)
{
	bool val;
	int  ival;
	int  rv;

	if ((rv = nng_socket_get_bool(s, NNG_OPT_TCP_NODELAY, &val)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	ival = val ? 1 : 0;
	memcpy(valp, &ival, *szp < sizeof(ival) ? *szp : sizeof(ival));
	*szp = sizeof(ival);
	return (0);
}

// nn_getrcvbuf reports NNG_OPT_RECVBUF (a count of messages) as a size
// in bytes, using the same 1kB per message estimate as nn_setrcvbuf.
static int
nn_getrcvbuf(nng_socket s, void *valp, size_t *szp)
{
	int cnt;
	int rv;

	if ((rv = nng_socket_get_int(s, NNG_OPT_RECVBUF, &cnt)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	cnt *= 1024;
	memcpy(valp, &cnt, *szp < sizeof(cnt) ? *szp : sizeof(cnt));
	*szp = sizeof(cnt);
	return (0);
}

// nn_setrcvbuf sets NNG_OPT_RECVBUF from a size expressed in bytes.
static int
nn_setrcvbuf(nng_socket s, const void *valp, size_t sz)
{
	int cnt;
	int rv;

	if (sz != sizeof(cnt)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	memcpy(&cnt, valp, sizeof(cnt));
	// Round up to a whole number of kilobytes, then divide by kB to
	// go from buffer size in bytes to messages.  This is a coarse
	// estimate, and assumes messages are 1kB on average.
	cnt += 1023;
	cnt /= 1024;
	if ((rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, cnt)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_getsndbuf reports NNG_OPT_SENDBUF (a count of messages) as a size
// in bytes, using the same 1kB per message estimate as nn_setsndbuf.
static int
nn_getsndbuf(nng_socket s, void *valp, size_t *szp)
{
	int cnt;
	int rv;

	if ((rv = nng_socket_get_int(s, NNG_OPT_SENDBUF, &cnt)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	cnt *= 1024;
	memcpy(valp, &cnt, *szp < sizeof(cnt) ? *szp : sizeof(cnt));
	*szp = sizeof(cnt);
	return (0);
}

// nn_setsndbuf sets NNG_OPT_SENDBUF from a size expressed in bytes.
static int
nn_setsndbuf(nng_socket s, const void *valp, size_t sz)
{
	int cnt;
	int rv;

	if (sz != sizeof(cnt)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	memcpy(&cnt, valp, sizeof(cnt));
	// Round up to a whole number of kilobytes, then divide by kB to
	// go from buffer size in bytes to messages.  This is a coarse
	// estimate, and assumes messages are 1kB on average.
	cnt += 1023;
	cnt /= 1024;
	if ((rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, cnt)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_setrcvmaxsz sets NNG_OPT_RECVMAXSZ from a legacy int, where -1
// means unlimited.  Values below -1 fail with EINVAL.
static int
nn_setrcvmaxsz(nng_socket s, const void *valp, size_t sz)
{
	int    ival;
	size_t val;
	int    rv;

	if (sz != sizeof(ival)) {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	memcpy(&ival, valp, sizeof(ival));
	if (ival == -1) {
		// Legacy "unlimited" is expressed as zero in the new API.
		val = 0;
	} else if (ival >= 0) {
		// Note that if the user sets 0, it disables the limit.
		// This is a different semantic.
		val = (size_t) ival;
	} else {
		nn_seterror(NNG_EINVAL);
		return (-1);
	}
	if ((rv = nng_socket_set_size(s, NNG_OPT_RECVMAXSZ, val)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_getrcvmaxsz reports NNG_OPT_RECVMAXSZ as a legacy int, where -1
// means unlimited.
static int
nn_getrcvmaxsz(nng_socket s, void *valp, size_t *szp)
{
	int    ival;
	int    rv;
	size_t val;

	if ((rv = nng_socket_get_size(s, NNG_OPT_RECVMAXSZ, &val)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	// Legacy uses -1 to mean unlimited.  New code uses 0.  Note that
	// as a consequence, we can't set a message limit of zero.
	// We report any size beyond 2GB as effectively unlimited.
	// There is an implicit assumption here that ints are 32-bits,
	// but that's generally true of any platform we support.
	if ((val == 0) || (val > 0x7FFFFFFF)) {
		ival = -1;
	} else {
		ival = (int) val;
	}
	memcpy(valp, &ival, *szp < sizeof(ival) ? *szp : sizeof(ival));
	*szp = sizeof(ival);
	return (0);
}

// Table of the legacy socket options we support, keyed by level and
// option number.  An entry with a get (or set) function is handled by
// that function; otherwise, if opt is set, the value is passed straight
// through to the NNG option of that name.  An entry with neither for a
// given direction makes that direction fail with ENOPROTOOPT.
static const struct {
	int         nnlevel;
	int         nnopt;
	const char *opt;
	int (*get)(nng_socket, void *, size_t *);
	int (*set)(nng_socket, const void *, size_t);
} options[] = {
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_LINGER,
	    .get     = nn_getzero,
	    .set     = nn_setignore,
	}, // review
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_DOMAIN,
	    .get     = nn_getdomain,
	    .set     = NULL,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RCVBUF,
	    .get     = nn_getrcvbuf,
	    .set     = nn_setrcvbuf,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_SNDBUF,
	    .get     = nn_getsndbuf,
	    .set     = nn_setsndbuf,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RECONNECT_IVL,
	    .opt     = NNG_OPT_RECONNMINT,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RECONNECT_IVL_MAX,
	    .opt     = NNG_OPT_RECONNMAXT,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_SNDFD,
	    .opt     = NNG_OPT_SENDFD,
	    .get     = nn_getsendfd,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RCVFD,
	    .opt     = NNG_OPT_RECVFD,
	    .get     = nn_getrecvfd,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RCVMAXSIZE,
	    .get     = nn_getrcvmaxsz,
	    .set     = nn_setrcvmaxsz,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_MAXTTL,
	    .opt     = NNG_OPT_MAXTTL,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_RCVTIMEO,
	    .opt     = NNG_OPT_RECVTIMEO,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_SNDTIMEO,
	    .opt     = NNG_OPT_SENDTIMEO,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_PROTOCOL,
	    .opt     = NNG_OPT_PROTO,
	},
	{
	    .nnlevel = NN_SOL_SOCKET,
	    .nnopt   = NN_SOCKET_NAME,
	    .opt     = NNG_OPT_SOCKNAME,
	},
	{
	    .nnlevel = NN_REQ,
	    .nnopt   = NN_REQ_RESEND_IVL,
	    .opt     = NNG_OPT_REQ_RESENDTIME,
	},
	{
	    .nnlevel = NN_SUB,
	    .nnopt   = NN_SUB_SUBSCRIBE,
	    .opt     = NNG_OPT_SUB_SUBSCRIBE,
	},
	{
	    .nnlevel = NN_SUB,
	    .nnopt   = NN_SUB_UNSUBSCRIBE,
	    .opt     = NNG_OPT_SUB_UNSUBSCRIBE,
	},
	{
	    .nnlevel = NN_SURVEYOR,
	    .nnopt   = NN_SURVEYOR_DEADLINE,
	    .opt     = NNG_OPT_SURVEYOR_SURVEYTIME,
	},
	{
	    .nnlevel = NN_TCP,
	    .nnopt   = NN_TCP_NODELAY,
	    .get     = nn_gettcpnodelay,
	    .set     = nn_settcpnodelay,
	},
	{
	    .nnlevel = NN_WS,
	    .nnopt   = NN_WS_MSG_TYPE,
	    .get     = nn_getwsmsgtype,
	    .set     = nn_setwsmsgtype,
	}
	// XXX: IPV4ONLY, SNDPRIO, RCVPRIO
};

// nn_getsockopt retrieves the legacy option (nnlevel, nnopt) of socket s
// into valp, with *szp giving the buffer size on entry and the value
// size on return.  Unsupported options fail with ENOPROTOOPT.  Returns
// 0, or -1 with errno set.
int
nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp)
{
	const char *name                         = NULL;
	int (*get)(nng_socket, void *, size_t *) = NULL;
	int        rv;
	nng_socket sid;

	sid.id = (uint32_t) s;

	for (unsigned i = 0; i < sizeof(options) / sizeof(options[0]); i++) {
		if ((options[i].nnlevel == nnlevel) &&
		    (options[i].nnopt == nnopt)) {
			get  = options[i].get;
			name = options[i].opt;
			break;
		}
	}

	// A dedicated getter takes precedence over pass-through by name.
	if (get != NULL) {
		return (get(sid, valp, szp));
	}

	if (name == NULL) {
		errno = ENOPROTOOPT;
		return (-1);
	}

	if ((rv = nng_socket_get(sid, name, valp, szp)) != 0) {
		nn_seterror(rv);
		return (-1);
	}

	return (0);
}

// nn_setsockopt sets the legacy option (nnlevel, nnopt) of socket s from
// the sz bytes at valp.  Unsupported (or read-only) options fail with
// ENOPROTOOPT.  Returns 0, or -1 with errno set.
int
nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
{
	nng_socket  sid;
	const char *name                             = NULL;
	int (*set)(nng_socket, const void *, size_t) = NULL;
	int rv;

	sid.id = (uint32_t) s;

	for (unsigned i = 0; i < sizeof(options) / sizeof(options[0]); i++) {
		if ((options[i].nnlevel == nnlevel) &&
		    (options[i].nnopt == nnopt)) {
			set  = options[i].set;
			name = options[i].opt;
			break;
		}
	}

	// A dedicated setter takes precedence over pass-through by name.
	if (set != NULL) {
		return (set(sid, valp, sz));
	}

	if (name == NULL) {
		errno = ENOPROTOOPT;
		return (-1);
	}

	if ((rv = nng_socket_set(sid, name, valp, sz)) != 0) {
		nn_seterror(rv);
		return (-1);
	}
	return (0);
}

// nn_cmsg_next implements NN_CMSG_FIRSTHDR / NN_CMSG_NXTHDR.  With first
// NULL it returns the first control header in mh, or NULL if there is no
// control data or it is too small to hold a header.  With first not NULL
// it always returns NULL, as we only ever supply a single header.
struct nn_cmsghdr *
nn_cmsg_next(struct nn_msghdr *mh, struct nn_cmsghdr *first)
{
	size_t clen;
	char * data;

	// We only support SP headers, so there can be at most one header.
	if (first != NULL) {
		return (NULL);
	}
	// No control data at all.
	if ((mh == NULL) || (mh->msg_control == NULL)) {
		return (NULL);
	}
	if ((clen = mh->msg_controllen) == NN_MSG) {
		nng_msg *msg;

		// msg_control holds the address of a pointer to an
		// allocated buffer, whose message is stashed ahead of it.
		if ((data = *((void **) (mh->msg_control))) == NULL) {
			return (NULL);
		}
		msg  = *(nng_msg **) (data - sizeof(msg));
		clen = nng_msg_len(msg);
	} else {
		data = mh->msg_control;
	}

	// The whole header must lie within the control data.
	if (clen < sizeof(struct nn_cmsghdr)) {
		return (NULL);
	}
	return ((struct nn_cmsghdr *) (void *) data);
}

// nn_device forwards messages between sockets s1 and s2 using
// nng_device.  It only returns on failure, so the result is always -1
// with errno set.
int
nn_device(int s1, int s2)
{
	int        rv;
	nng_socket sid1;
	nng_socket sid2;

	sid1.id = (uint32_t) s1;
	sid2.id = (uint32_t) s2;

	rv = nng_device(sid1, sid2);
	// rv must always be nonzero
	nn_seterror(rv);
	return (-1);
}

// Windows stuff.
// NOTE(review): the header names on the #include lines below were missing
// from the text under review and have been restored from what the code
// uses (WSAPoll on Windows, poll and struct pollfd on POSIX) -- confirm
// against the original file.
#ifdef NNG_PLATFORM_WINDOWS
#define poll WSAPoll

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif

#include <windows.h>
#include <winsock2.h>
#elif defined NNG_PLATFORM_POSIX
#include <poll.h>
#endif

// nn_poll waits up to timeout for any of the nfds sockets in fds to
// become ready for the events requested (NN_POLLIN and/or NN_POLLOUT),
// and records the result in each entry's revents.  Returns the number of
// entries with a non-zero revents, or -1 with errno set.  On platforms
// that are neither Windows nor POSIX it fails with ENOTSUP.
int
nn_poll(struct nn_pollfd *fds, int nfds, int timeout)
{
// This function is rather unfortunate.  poll() is available
// on POSIX, and on Windows as WSAPoll.  On other systems it might
// not exist at all.  We could also benefit from using a notification
// that didn't have to access file descriptors... sort of access to
// the pollable element on the socket.  We don't have that, so we
// just use poll.  This function is definitely suboptimal compared to
// using callbacks.
#if defined(NNG_PLATFORM_WINDOWS) || defined(NNG_PLATFORM_POSIX)
	struct pollfd *pfd;
	int            npfd;
	int            rv;

	// A negative count would otherwise turn into the allocation size.
	if (nfds < 0) {
		errno = EINVAL;
		return (-1);
	}

	// Each socket can need up to two descriptors (receive and send).
	if ((pfd = NNI_ALLOC_STRUCTS(pfd, nfds * 2)) == NULL) {
		errno = ENOMEM;
		return (-1);
	}

	// First prepare the master polling structure.
	npfd = 0;
	for (int i = 0; i < nfds; i++) {
		int fd;
		if (fds[i].events & NN_POLLIN) {
			nng_socket s;
			s.id = fds[i].fd;
			if ((rv = nng_socket_get_int(
			         s, NNG_OPT_RECVFD, &fd)) != 0) {
				nn_seterror(rv);
				NNI_FREE_STRUCTS(pfd, nfds * 2);
				return (-1);
			}
#ifdef NNG_PLATFORM_WINDOWS
			pfd[npfd].fd = (SOCKET) fd;
#else
			pfd[npfd].fd = fd;
#endif
			pfd[npfd].events = POLLIN;
			npfd++;
		}
		if (fds[i].events & NN_POLLOUT) {
			nng_socket s;
			s.id = fds[i].fd;
			if ((rv = nng_socket_get_int(
			         s, NNG_OPT_SENDFD, &fd)) != 0) {
				nn_seterror(rv);
				NNI_FREE_STRUCTS(pfd, nfds * 2);
				return (-1);
			}
#ifdef NNG_PLATFORM_WINDOWS
			pfd[npfd].fd = (SOCKET) fd;
#else
			pfd[npfd].fd = fd;
#endif
			// The send descriptor is polled for POLLIN too.
			// NOTE(review): presumably NNG signals that a
			// socket can send by making the NNG_OPT_SENDFD
			// descriptor readable -- confirm against the NNG
			// documentation before changing this to POLLOUT.
			pfd[npfd].events = POLLIN;
			npfd++;
		}
	}

	rv = poll(pfd, npfd, timeout);
	if (rv < 0) {
		// Freeing may disturb errno, so preserve it.
		int e = errno;
		NNI_FREE_STRUCTS(pfd, nfds * 2);
		errno = e;
		return (-1);
	}

	// Now update the nn_poll from the system poll.  We walk the
	// entries in the same order as above so that npfd lines up.
	npfd = 0;
	rv   = 0;
	for (int i = 0; i < nfds; i++) {
		fds[i].revents = 0;
		if (fds[i].events & NN_POLLIN) {
			if (pfd[npfd].revents & POLLIN) {
				fds[i].revents |= NN_POLLIN;
			}
			npfd++;
		}
		if (fds[i].events & NN_POLLOUT) {
			if (pfd[npfd].revents & POLLIN) {
				fds[i].revents |= NN_POLLOUT;
			}
			npfd++;
		}
		if (fds[i].revents) {
			rv++;
		}
	}
	NNI_FREE_STRUCTS(pfd, nfds * 2);
	return (rv);

#else // NNG_PLATFORM_WINDOWS or NNG_PLATFORM_POSIX
	NNI_ARG_UNUSED(fds);
	NNI_ARG_UNUSED(nfds);
	NNI_ARG_UNUSED(timeout);
	errno = ENOTSUP;
	return (-1);
#endif
}

// nn_term is suitable only for shutting down the entire library,
// and is not thread-safe with other functions.
void
nn_term(void)
{
	// This function is relatively toxic, since it can affect
	// all sockets in the process, including those
	// in use by libraries, etc.  Accordingly, do not use this
	// in a library -- only e.g. atexit() and similar.
	nni_sock_closeall();
}

// nn_get_statistic is a stub: statistics are not implemented, so it
// ignores its arguments and always returns zero.
uint64_t
nn_get_statistic(int x, int y)
{
	(void) x;
	(void) y;

	return (0);
}