// // Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include "core/nng_impl.h" typedef struct nni_taskq_thr nni_taskq_thr; struct nni_taskq_thr { nni_taskq *tqt_tq; nni_thr tqt_thread; }; struct nni_taskq { nni_list tq_tasks; nni_mtx tq_mtx; nni_cv tq_sched_cv; nni_cv tq_wait_cv; nni_taskq_thr *tq_threads; int tq_nthreads; bool tq_run; }; static nni_taskq *nni_taskq_systq = NULL; static void nni_taskq_thread(void *self) { nni_taskq_thr *thr = self; nni_taskq * tq = thr->tqt_tq; nni_task * task; nni_thr_set_name(NULL, "nng:task"); nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { nni_list_remove(&tq->tq_tasks, task); nni_mtx_unlock(&tq->tq_mtx); task->task_cb(task->task_arg); nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); nni_mtx_lock(&tq->tq_mtx); continue; } if (!tq->tq_run) { break; } nni_cv_wait(&tq->tq_sched_cv); } nni_mtx_unlock(&tq->tq_mtx); } int nni_taskq_init(nni_taskq **tqp, int nthr) { nni_taskq *tq; if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { return (NNG_ENOMEM); } if ((tq->tq_threads = NNI_ALLOC_STRUCTS(tq->tq_threads, nthr)) == NULL) { NNI_FREE_STRUCT(tq); return (NNG_ENOMEM); } tq->tq_nthreads = nthr; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); nni_mtx_init(&tq->tq_mtx); nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx); nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx); for (int i = 0; i < nthr; i++) { int rv; tq->tq_threads[i].tqt_tq = tq; rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { nni_taskq_fini(tq); return (rv); } } tq->tq_run = true; for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } *tqp = tq; return (0); } void nni_taskq_fini(nni_taskq *tq) { if (tq == NULL) { return; } if (tq->tq_run) { nni_mtx_lock(&tq->tq_mtx); tq->tq_run = false; nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); } for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_fini(&tq->tq_threads[i].tqt_thread); } nni_cv_fini(&tq->tq_wait_cv); nni_cv_fini(&tq->tq_sched_cv); nni_mtx_fini(&tq->tq_mtx); NNI_FREE_STRUCTS(tq->tq_threads, tq->tq_nthreads); NNI_FREE_STRUCT(tq); } void nni_task_exec(nni_task *task) { nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; } else { task->task_busy++; } nni_mtx_unlock(&task->task_mtx); if (task->task_cb != NULL) { task->task_cb(task->task_arg); } nni_mtx_lock(&task->task_mtx); task->task_busy--; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); } void nni_task_dispatch(nni_task *task) { nni_taskq *tq = task->task_tq; // If there is no callback to perform, then do nothing! // The user will be none the wiser. if (task->task_cb == NULL) { nni_task_exec(task); return; } nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; } else { task->task_busy++; } nni_mtx_unlock(&task->task_mtx); nni_mtx_lock(&tq->tq_mtx); nni_list_append(&tq->tq_tasks, task); nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate nni_mtx_unlock(&tq->tq_mtx); } void nni_task_prep(nni_task *task) { nni_mtx_lock(&task->task_mtx); task->task_busy++; task->task_prep = true; nni_mtx_unlock(&task->task_mtx); } void nni_task_abort(nni_task *task) { // This is called when unscheduling the task. nni_mtx_lock(&task->task_mtx); if (task->task_prep) { task->task_prep = false; task->task_busy--; if (task->task_busy == 0) { nni_cv_wake(&task->task_cv); } } nni_mtx_unlock(&task->task_mtx); } void nni_task_wait(nni_task *task) { nni_mtx_lock(&task->task_mtx); while (task->task_busy) { nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); } void nni_task_init(nni_task *task, nni_taskq *tq, nni_cb cb, void *arg) { NNI_LIST_NODE_INIT(&task->task_node); nni_mtx_init(&task->task_mtx); nni_cv_init(&task->task_cv, &task->task_mtx); task->task_prep = false; task->task_busy = 0; task->task_cb = cb; task->task_arg = arg; task->task_tq = tq != NULL ? tq : nni_taskq_systq; } void nni_task_fini(nni_task *task) { nni_mtx_lock(&task->task_mtx); while (task->task_busy) { nni_cv_wait(&task->task_cv); } nni_mtx_unlock(&task->task_mtx); nni_cv_fini(&task->task_cv); nni_mtx_fini(&task->task_mtx); } int nni_taskq_sys_init(void) { int nthrs; #ifndef NNG_NUM_TASKQ_THREADS nthrs = nni_plat_ncpu() * 2; #else nthrs = NNG_NUM_TASKQ_THREADS; #endif #if NNG_MAX_TASKQ_THREADS > 0 if (nthrs > NNG_MAX_TASKQ_THREADS) { nthrs = NNG_MAX_TASKQ_THREADS; } #endif return (nni_taskq_init(&nni_taskq_systq, nthrs)); } void nni_taskq_sys_fini(void) { nni_taskq_fini(nni_taskq_systq); nni_taskq_systq = NULL; }