// // Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #ifdef _WIN32 #ifndef WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN #endif #include #include #include #else #include #include #include #include #include #include #endif #include #include #include #include #include #define TEST_NO_MAIN #include "nuts.h" void nuts_scratch_addr(const char *scheme, size_t sz, char *addr) { if ((strcmp(scheme, "inproc") == 0) || (strcmp(scheme, "abstract") == 0)) { (void) snprintf(addr, sz, "%s://nuts%04x%04x%04x%04x", scheme, nng_random(), nng_random(), nng_random(), nng_random()); return; } if ((strncmp(scheme, "tcp", 3) == 0) || (strncmp(scheme, "tls", 3) == 0)) { (void) snprintf( addr, sz, "%s://127.0.0.1:%u", scheme, nuts_next_port()); return; } if (strncmp(scheme, "ws", 2) == 0) { (void) snprintf(addr, sz, "%s://127.0.0.1:%u/nuts%04x%04x%04x%04x", scheme, nuts_next_port(), nng_random(), nng_random(), nng_random(), nng_random()); return; } if ((strncmp(scheme, "ipc", 3) == 0) || (strncmp(scheme, "unix", 4) == 0)) { #ifdef _WIN32 // Windows doesn't place IPC names in the filesystem. (void) snprintf(addr, sz, "%s://nuts%04x%04x%04x%04x", scheme, nng_random(), nng_random(), nng_random(), nng_random()); return; #else char *tmpdir; if (((tmpdir = getenv("TMPDIR")) == NULL) && ((tmpdir = getenv("TEMP")) == NULL) && ((tmpdir = getenv("TMP")) == NULL)) { tmpdir = "/tmp"; } (void) snprintf(addr, sz, "%s://%s/nuts%04x%04x%04x%04x", scheme, tmpdir, nng_random(), nng_random(), nng_random(), nng_random()); return; #endif } // We should not be here. abort(); } // nuts_next_port returns a "next" allocation port. // Ports are chosen by starting from a random point within a // range (normally 38000-40000, but other good places to choose // might be 36000-37000, 42000-43000, 45000-47000, 48000-49000. // These are non-ephemeral ports. Successive calls to this function // will return the next port in the range (wrapping). This works even // across process boundaries, as the range is tracked in a file named // by $TEST_PORT_FILE. The range of ports can be configured by using // $TEST_PORT_RANGE (the range is specified as "lo:hi" where the actual // port will be in the range [lo,hi). uint16_t nuts_next_port(void) { char * name; FILE * f; uint16_t port; uint16_t base; uint16_t end; char * str; #ifdef _WIN32 OVERLAPPED olp; HANDLE h; #endif if ((name = getenv("NUTS_PORT_FILE")) == NULL) { name = ".nuts_ports"; } if (((str = getenv("NUTS_PORT_RANGE")) == NULL) || ((sscanf(str, "%hu:%hu", &base, &end)) != 1) || ((int) end - (int) base) < 1) { base = 38000; end = 40000; } if (((f = fopen(name, "r+")) == NULL) && ((f = fopen(name, "w+")) == NULL)) { return (0); } (void) fseek(f, 0, SEEK_SET); #ifdef _WIN32 h = (HANDLE) _get_osfhandle(_fileno(f)); // This contains the offset information for LockFileEx. ZeroMemory(&olp, sizeof(olp)); if (LockFileEx(h, LOCKFILE_EXCLUSIVE_LOCK, 0, MAXDWORD, MAXDWORD, &olp) == FALSE) { fclose(f); return (0); } #else if (lockf(fileno(f), 0, F_LOCK) != 0) { (void) fclose(f); return (0); } #endif if (fscanf(f, "%hu", &port) != 1) { unsigned seed = (unsigned) time(NULL); #ifdef _WIN32 port = base + rand_s(&seed) % (end - base); #else port = base + rand_r(&seed) % (end - base); #endif } port++; if ((port < base) || (port >= (base + end))) { port = base; } #ifdef _WIN32 fseek(f, 0, SEEK_SET); SetEndOfFile(h); (void) fprintf(f, "%u", port); ZeroMemory(&olp, sizeof(olp)); (void) UnlockFileEx(h, 0, MAXDWORD, MAXDWORD, &olp); #else fseek(f, 0, SEEK_SET); if (ftruncate(fileno(f), 0) != 0) { (void) fclose(f); return (0); } (void) fprintf(f, "%u", port); (void) lockf(fileno(f), 0, F_ULOCK); #endif (void) fclose(f); return (port); } struct marriage_notice { nng_mtx *mx; nng_cv * cv; int s1; int s2; int cnt1; int cnt2; nng_pipe p1; nng_pipe p2; }; static void married(nng_pipe p, nng_pipe_ev ev, void *arg) { struct marriage_notice *notice = arg; (void) ev; nng_mtx_lock(notice->mx); if (nng_socket_id(nng_pipe_socket(p)) == notice->s1) { notice->cnt1++; notice->p1 = p; } else if (nng_socket_id(nng_pipe_socket(p)) == notice->s2) { notice->cnt2++; notice->p2 = p; } nng_cv_wake(notice->cv); nng_mtx_unlock(notice->mx); } int nuts_marry(nng_socket s1, nng_socket s2) { return (nuts_marry_ex(s1, s2, NULL, NULL, NULL)); } // NB: This function is always called with sufficient space to // hold the resulting expansion. static void replace_port_zero(const char *addr, char *buf, int port) { int i; int j; bool colon = false; char c; for (i = 0, j = 0; (c = addr[i]) != '\0'; i++) { if (colon && c == '0') { char num[16]; (void) snprintf(num, sizeof(num), "%d", port); memcpy(&buf[j], num, strlen(num)); j += (int) strlen(num); colon = false; continue; } colon = c == ':'; buf[j++] = c; } buf[j] = '\0'; } int nuts_marry_ex( nng_socket s1, nng_socket s2, const char *url, nng_pipe *p1, nng_pipe *p2) { struct marriage_notice note; nng_time timeout; int rv; char addr[64]; nng_listener l; int port; if (url == NULL) { (void) snprintf(addr, sizeof(addr), "inproc://marry%04x%04x%04x%04x", nng_random(), nng_random(), nng_random(), nng_random()); url = addr; } note.cnt1 = 0; note.cnt2 = 0; note.s1 = nng_socket_id(s1); note.s2 = nng_socket_id(s2); timeout = nng_clock() + 1000; // 1 second if (((rv = nng_mtx_alloc(¬e.mx)) != 0) || ((rv = nng_cv_alloc(¬e.cv, note.mx)) != 0) || ((rv = nng_pipe_notify( s1, NNG_PIPE_EV_ADD_POST, married, ¬e)) != 0) || ((rv = nng_pipe_notify( s2, NNG_PIPE_EV_ADD_POST, married, ¬e)) != 0) || ((rv = nng_listen(s1, url, &l, 0)) != 0)) { goto done; } // If a TCP port of zero was selected, let's ask for the actual // port bound. if ((strstr(url, ":0") != NULL) && (nng_listener_get_int(l, NNG_OPT_TCP_BOUND_PORT, &port) == 0) && (port > 0)) { replace_port_zero(url, addr, port); url = addr; } if ((rv = nng_dial(s2, url, NULL, 0)) != 0) { goto done; } nng_mtx_lock(note.mx); while ((note.cnt1 == 0) || (note.cnt2 == 0)) { if ((rv = nng_cv_until(note.cv, timeout)) != 0) { break; } } nng_mtx_unlock(note.mx); if (p1 != NULL) { *p1 = note.p1; } if (p2 != NULL) { *p2 = note.p2; } done: nng_pipe_notify(s1, NNG_PIPE_EV_ADD_POST, NULL, NULL); nng_pipe_notify(s2, NNG_PIPE_EV_ADD_POST, NULL, NULL); if (note.cv != NULL) { nng_cv_free(note.cv); } if (note.mx != NULL) { nng_mtx_free(note.mx); } return (rv); }