// // Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include "core/nng_impl.h" #include struct nni_aio_expire_q { nni_mtx eq_mtx; nni_cv eq_cv; nni_list eq_list; uint32_t eq_len; nni_thr eq_thr; nni_time eq_next; // next expiration bool eq_exit; }; static nni_aio_expire_q **nni_aio_expire_q_list; static int nni_aio_expire_q_cnt; // Design notes. // // AIOs are only ever "completed" by the provider, which must call // one of the nni_aio_finish variants. Until this occurs, the provider // guarantees that the AIO is valid. The provider must guarantee that // an AIO will be "completed" (with a call to nni_aio_finish & friends) // exactly once. // // Note that the cancellation routine may be called by the framework // several times. The framework (or the consumer) guarantees that the // AIO will remain valid across these calls, so that the provider is // free to examine the aio for list membership, etc. The provider must // not call finish more than once though. // // We use an array of expiration queues, each with it's own lock and // condition variable, and expiration thread. By default this is one // per CPU core present -- the goal being to reduce overall pressure // caused by a single lock. The number of queues (and threads) can // be tuned using the NNG_EXPIRE_THREADS tunable. // // We will not permit an AIO // to be marked done if an expiration is outstanding. // // In order to synchronize with the expiration, we record the aio as // expiring, and wait for that record to be cleared (or at least not // equal to the aio) before destroying it. // // The aio framework is tightly bound up with the task framework. We // "prepare" the task for an aio when a caller marks an aio as starting // (with nni_aio_begin), and that marks the task as busy. Then, all we have // to do is wait for the task to complete (the busy flag to be cleared) // when we want to know if the operation itself is complete. // // In order to guard against aio reuse during teardown, we set the a_stop // flag. Any attempt to initialize for a new operation after that point // will fail and the caller will get NNG_ECANCELED indicating this. The // provider that calls nni_aio_begin() MUST check the return value, and // if it comes back nonzero (NNG_ECANCELED) then it must simply discard the // request and return. // // Calling nni_aio_wait waits for the current outstanding operation to // complete, but does not block another one from being started on the // same aio. To synchronously stop the aio and prevent any further // operations from starting on it, call nni_aio_stop. To prevent the // operations from starting, without waiting for any existing one to // complete, call nni_aio_close. // In some places we want to check that an aio is not in use. // Technically if these checks pass, then they should not need // to be done with a lock, because the caller should have the only // references to them. However, race detectors can't necessarily // know about this semantic, and may complain about potential data // races. To suppress false positives, define NNG_RACE_DETECTOR. // Note that this will cause extra locks to be acquired, affecting // performance, so don't use it in production. #ifdef __has_feature #if __has_feature(thread_sanitizer) #define NNG_RACE_DETECTOR #endif #endif #ifdef NNG_RACE_DETECTOR #define aio_safe_lock(l) nni_mtx_lock(l) #define aio_safe_unlock(l) nni_mtx_unlock(l) #else #define aio_safe_lock(l) #define aio_safe_unlock(l) #endif static nni_reap_list aio_reap_list = { .rl_offset = offsetof(nni_aio, a_reap_node), .rl_func = (nni_cb) nni_aio_free, }; static void nni_aio_expire_add(nni_aio *); static void nni_aio_expire_rm(nni_aio *); void nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { memset(aio, 0, sizeof(*aio)); nni_task_init(&aio->a_task, NULL, cb, arg); aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; aio->a_expire_q = nni_aio_expire_q_list[nni_random() % nni_aio_expire_q_cnt]; } void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; void *arg; nni_aio_expire_q *eq = aio->a_expire_q; // This is like aio_close, but we don't want to dispatch // the task. And unlike aio_stop, we don't want to wait // for the task. (Because we implicitly do task_fini.) // We also wait if the aio is being expired. nni_mtx_lock(&eq->eq_mtx); aio->a_stop = true; while (aio->a_expiring) { nni_cv_wait(&eq->eq_cv); } nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECLOSED); } nni_task_fini(&aio->a_task); } int nni_aio_alloc(nni_aio **aio_p, nni_cb cb, void *arg) { nni_aio *aio; if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { return (NNG_ENOMEM); } nni_aio_init(aio, cb, arg); *aio_p = aio; return (0); } void nni_aio_free(nni_aio *aio) { if (aio != NULL) { nni_aio_fini(aio); NNI_FREE_STRUCT(aio); } } void nni_aio_reap(nni_aio *aio) { if (aio != NULL) { nni_reap(&aio_reap_list, aio); } } int nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov) { if (nio > NNI_NUM_ELEMENTS((aio->a_iov))) { return (NNG_EINVAL); } // Sometimes we are resubmitting our own io vector, with // just a smaller count. We copy them only if we are not. if (iov != &aio->a_iov[0]) { for (unsigned i = 0; i < nio; i++) { aio->a_iov[i] = iov[i]; } } aio->a_nio = nio; return (0); } // nni_aio_stop cancels any outstanding operation, and waits for the // callback to complete, if still running. It also marks the AIO as // stopped, preventing further calls to nni_aio_begin from succeeding. // To correctly tear down an AIO, call stop, and make sure any other // callers are not also stopped, before calling nni_aio_free to release // actual memory. void nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; aio->a_stop = true; nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECANCELED); } nni_aio_wait(aio); } } void nni_aio_close(nni_aio *aio) { if (aio != NULL) { nni_aio_cancel_fn fn; void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; aio->a_stop = true; nni_mtx_unlock(&eq->eq_mtx); if (fn != NULL) { fn(aio, arg, NNG_ECLOSED); } } } void nni_aio_set_timeout(nni_aio *aio, nni_duration when) { aio->a_timeout = when; } void nni_aio_set_msg(nni_aio *aio, nni_msg *msg) { aio->a_msg = msg; } nni_msg * nni_aio_get_msg(nni_aio *aio) { return (aio->a_msg); } void nni_aio_set_input(nni_aio *aio, unsigned index, void *data) { if (index < NNI_NUM_ELEMENTS(aio->a_inputs)) { aio->a_inputs[index] = data; } } void * nni_aio_get_input(nni_aio *aio, unsigned index) { if (index < NNI_NUM_ELEMENTS(aio->a_inputs)) { return (aio->a_inputs[index]); } return (NULL); } void nni_aio_set_output(nni_aio *aio, unsigned index, void *data) { if (index < NNI_NUM_ELEMENTS(aio->a_outputs)) { aio->a_outputs[index] = data; } } void * nni_aio_get_output(nni_aio *aio, unsigned index) { if (index < NNI_NUM_ELEMENTS(aio->a_outputs)) { return (aio->a_outputs[index]); } return (NULL); } int nni_aio_result(nni_aio *aio) { return (aio->a_result); } size_t nni_aio_count(nni_aio *aio) { return (aio->a_count); } void nni_aio_wait(nni_aio *aio) { nni_task_wait(&aio->a_task); } int nni_aio_begin(nni_aio *aio) { // If any of these triggers then the caller has a defect because // it means that the aio is already in use. This is always // a bug in the caller. These checks are not technically thread // safe in the event that they are false. Users of race detectors // checks may wish ignore or suppress these checks. nni_aio_expire_q *eq = aio->a_expire_q; aio_safe_lock(&eq->eq_mtx); NNI_ASSERT(!nni_aio_list_active(aio)); NNI_ASSERT(aio->a_cancel_fn == NULL); NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node)); // Some initialization can be done outside of the lock, because // we must have exclusive access to the aio. for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } aio->a_result = 0; aio->a_count = 0; aio->a_cancel_fn = NULL; aio_safe_unlock(&eq->eq_mtx); nni_mtx_lock(&eq->eq_mtx); // We should not reschedule anything at this point. if (aio->a_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; aio->a_expire_ok = false; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECANCELED); } nni_task_prep(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (0); } int nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { nni_aio_expire_q *eq = aio->a_expire_q; if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: nni_task_abort(&aio->a_task); return (NNG_ETIMEDOUT); case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; break; default: aio->a_expire = nni_clock() + aio->a_timeout; break; } } nni_mtx_lock(&eq->eq_mtx); if (aio->a_stop) { nni_task_abort(&aio->a_task); nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECLOSED); } NNI_ASSERT(aio->a_cancel_fn == NULL); aio->a_cancel_fn = cancel; aio->a_cancel_arg = data; // We only schedule expiration if we have a way for the expiration // handler to actively cancel it. if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { nni_aio_expire_add(aio); } nni_mtx_unlock(&eq->eq_mtx); return (0); } // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); nni_aio_expire_rm(aio); fn = aio->a_cancel_fn; arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; nni_mtx_unlock(&eq->eq_mtx); // Stop any I/O at the provider level. if (fn != NULL) { fn(aio, arg, rv); } } // I/O provider related functions. static void nni_aio_finish_impl( nni_aio *aio, int rv, size_t count, nni_msg *msg, bool sync) { nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); nni_aio_expire_rm(aio); aio->a_result = rv; aio->a_count = count; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; if (msg) { aio->a_msg = msg; } aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; nni_mtx_unlock(&eq->eq_mtx); if (sync) { nni_task_exec(&aio->a_task); } else { nni_task_dispatch(&aio->a_task); } } void nni_aio_finish(nni_aio *aio, int result, size_t count) { nni_aio_finish_impl(aio, result, count, NULL, false); } void nni_aio_finish_sync(nni_aio *aio, int result, size_t count) { nni_aio_finish_impl(aio, result, count, NULL, true); } void nni_aio_finish_error(nni_aio *aio, int result) { nni_aio_finish_impl(aio, result, 0, NULL, false); } void nni_aio_finish_msg(nni_aio *aio, nni_msg *msg) { NNI_ASSERT(msg != NULL); nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg, false); } void nni_aio_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_aio, a_prov_node); } void nni_aio_list_append(nni_list *list, nni_aio *aio) { nni_aio_list_remove(aio); nni_list_append(list, aio); } void nni_aio_list_remove(nni_aio *aio) { nni_list_node_remove(&aio->a_prov_node); } int nni_aio_list_active(nni_aio *aio) { return (nni_list_node_active(&aio->a_prov_node)); } static void nni_aio_expire_add(nni_aio *aio) { nni_aio_expire_q *eq = aio->a_expire_q; nni_list_append(&eq->eq_list, aio); if (eq->eq_next > aio->a_expire) { eq->eq_next = aio->a_expire; nni_cv_wake(&eq->eq_cv); } } static void nni_aio_expire_rm(nni_aio *aio) { nni_list_node_remove(&aio->a_expire_node); // If this item is the one that is going to wake the loop, // don't worry about it. It will wake up normally, or when we // add a new aio to it. Worst case is just one spurious wake up, // which we'd need to do anyway. } static void nni_aio_expire_loop(void *arg) { nni_aio_expire_q *q = arg; nni_mtx *mtx = &q->eq_mtx; nni_cv *cv = &q->eq_cv; nni_time now; uint32_t exp_idx; nni_aio *expires[NNI_EXPIRE_BATCH]; nni_thr_set_name(NULL, "nng:aio:expire"); nni_mtx_lock(mtx); for (;;) { nni_aio *aio; int rv; nni_time next; next = q->eq_next; now = nni_clock(); // Each time we wake up, we scan the entire list of elements. // We scan forward, moving up to NNI_EXPIRE_Q_SIZE elements // (a batch) to a saved array of things we are going to cancel. // This mostly runs in O(n), provided you don't have many // elements (> NNI_EXPIRE_Q_SIZE) all expiring simultaneously. aio = nni_list_first(&q->eq_list); if ((aio == NULL) && (q->eq_exit)) { nni_mtx_unlock(mtx); return; } if (now < next) { // Early wake up (just to reschedule), no need to // rescan the list. This is an optimization. nni_cv_until(cv, next); continue; } q->eq_next = NNI_TIME_NEVER; exp_idx = 0; while (aio != NULL) { if ((aio->a_expire < now) && (exp_idx < NNI_EXPIRE_BATCH)) { nni_aio *nxt; // This one is expiring. expires[exp_idx++] = aio; // save the next node nxt = nni_list_next(&q->eq_list, aio); nni_list_remove(&q->eq_list, aio); // Place a temporary hold on the aio. // This prevents it from being destroyed. aio->a_expiring = true; aio = nxt; continue; } if (aio->a_expire < q->eq_next) { q->eq_next = aio->a_expire; } aio = nni_list_next(&q->eq_list, aio); } for (uint32_t i = 0; i < exp_idx; i++) { aio = expires[i]; rv = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; nni_aio_cancel_fn cancel_fn = aio->a_cancel_fn; void *cancel_arg = aio->a_cancel_arg; aio->a_cancel_fn = NULL; aio->a_cancel_arg = NULL; // We let the cancel function handle the completion. // If there is no cancellation function, then we cannot // terminate the aio - we've tried, but it has to run // to its natural conclusion. if (cancel_fn != NULL) { nni_mtx_unlock(mtx); cancel_fn(aio, cancel_arg, rv); nni_mtx_lock(mtx); } aio->a_expiring = false; } nni_cv_wake(cv); if (now < q->eq_next) { nni_cv_until(cv, q->eq_next); } } } void * nni_aio_get_prov_extra(nni_aio *aio, unsigned index) { return (aio->a_prov_extra[index]); } void nni_aio_set_prov_extra(nni_aio *aio, unsigned index, void *data) { aio->a_prov_extra[index] = data; } void nni_aio_get_iov(nni_aio *aio, unsigned *nio_p, nni_iov **iov_p) { *nio_p = aio->a_nio; *iov_p = aio->a_iov; } void nni_aio_normalize_timeout(nni_aio *aio, nng_duration dur) { if (aio->a_timeout == NNG_DURATION_DEFAULT) { aio->a_timeout = dur; } } void nni_aio_bump_count(nni_aio *aio, size_t n) { aio->a_count += n; } size_t nni_aio_iov_count(nni_aio *aio) { size_t residual = 0; for (unsigned i = 0; i < aio->a_nio; i++) { residual += aio->a_iov[i].iov_len; } return (residual); } size_t nni_aio_iov_advance(nni_aio *aio, size_t n) { size_t residual = n; while (n) { NNI_ASSERT(aio->a_nio != 0); if (aio->a_iov[0].iov_len > n) { aio->a_iov[0].iov_len -= n; NNI_INCPTR(aio->a_iov[0].iov_buf, n); return (0); // we used all of "n" } residual -= aio->a_iov[0].iov_len; n -= aio->a_iov[0].iov_len; aio->a_nio--; for (unsigned i = 0; i < aio->a_nio; i++) { aio->a_iov[i] = aio->a_iov[i + 1]; } } return (residual); // we might not have used all of n for this iov } static void nni_sleep_cancel(nng_aio *aio, void *arg, int rv) { NNI_ARG_UNUSED(arg); nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); if (!aio->a_sleep) { nni_mtx_unlock(&eq->eq_mtx); return; } aio->a_sleep = false; nni_aio_expire_rm(aio); nni_mtx_unlock(&eq->eq_mtx); nni_aio_finish_error(aio, rv); } void nni_sleep_aio(nng_duration ms, nng_aio *aio) { int rv; if (nni_aio_begin(aio) != 0) { return; } aio->a_expire_ok = true; aio->a_sleep = true; switch (aio->a_timeout) { case NNG_DURATION_DEFAULT: case NNG_DURATION_INFINITE: // No premature timeout, honor our expected values. break; default: // If the timeout on the aio is shorter than our sleep time, // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { aio->a_expire_ok = false; ms = aio->a_timeout; } } aio->a_expire = nni_clock() + ms; if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { nni_aio_finish_error(aio, rv); } } static void nni_aio_expire_q_free(nni_aio_expire_q *eq) { if (eq == NULL) { return; } if (!eq->eq_exit) { nni_mtx_lock(&eq->eq_mtx); eq->eq_exit = true; nni_cv_wake(&eq->eq_cv); nni_mtx_unlock(&eq->eq_mtx); } nni_thr_fini(&eq->eq_thr); nni_cv_fini(&eq->eq_cv); nni_mtx_fini(&eq->eq_mtx); NNI_FREE_STRUCT(eq); } static nni_aio_expire_q * nni_aio_expire_q_alloc(void) { nni_aio_expire_q *eq; if ((eq = NNI_ALLOC_STRUCT(eq)) == NULL) { return (NULL); } nni_mtx_init(&eq->eq_mtx); nni_cv_init(&eq->eq_cv, &eq->eq_mtx); NNI_LIST_INIT(&eq->eq_list, nni_aio, a_expire_node); eq->eq_next = NNI_TIME_NEVER; eq->eq_exit = false; if (nni_thr_init(&eq->eq_thr, nni_aio_expire_loop, eq) != 0) { nni_aio_expire_q_free(eq); return (NULL); } nni_thr_run(&eq->eq_thr); return (eq); } void nni_aio_sys_fini(void) { for (int i = 0; i < nni_aio_expire_q_cnt; i++) { nni_aio_expire_q_free(nni_aio_expire_q_list[i]); } nni_free(nni_aio_expire_q_list, sizeof(nni_aio_expire_q *) * nni_aio_expire_q_cnt); nni_aio_expire_q_cnt = 0; nni_aio_expire_q_list = NULL; } int nni_aio_sys_init(void) { int num_thr; // We create a thread per CPU core for expiration by default. #ifndef NNG_EXPIRE_THREADS num_thr = nni_plat_ncpu(); #else num_thr = NNG_EXPIRE_THREADS; #endif if (num_thr > 256) { num_thr = 256; } nni_aio_expire_q_list = nni_zalloc(sizeof(nni_aio_expire_q *) * num_thr); nni_aio_expire_q_cnt = num_thr; for (int i = 0; i < num_thr; i++) { nni_aio_expire_q *eq; if ((eq = nni_aio_expire_q_alloc()) == NULL) { nni_aio_sys_fini(); return (NNG_ENOMEM); } nni_aio_expire_q_list[i] = eq; } return (0); }