The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/contrib/openzfs/module/os/freebsd/spl/spl_taskq.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
    3  * All rights reserved.
    4  *
    5  * Copyright (c) 2012 Spectra Logic Corporation.  All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/ck.h>
   34 #include <sys/epoch.h>
   35 #include <sys/kernel.h>
   36 #include <sys/kmem.h>
   37 #include <sys/lock.h>
   38 #include <sys/mutex.h>
   39 #include <sys/queue.h>
   40 #include <sys/taskq.h>
   41 #include <sys/taskqueue.h>
   42 #include <sys/zfs_context.h>
   43 
   44 #if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
   45 #include <machine/pcb.h>
   46 #endif
   47 
   48 #include <vm/uma.h>
   49 
   50 #if __FreeBSD_version < 1201522
   51 #define taskqueue_start_threads_in_proc(tqp, count, pri, proc, name, ...) \
   52     taskqueue_start_threads(tqp, count, pri, name, __VA_ARGS__)
   53 #endif
   54 
   55 static uint_t taskq_tsd;
   56 static uma_zone_t taskq_zone;
   57 
   58 /*
   59  * Global system-wide dynamic task queue available for all consumers. This
   60  * taskq is not intended for long-running tasks; instead, a dedicated taskq
   61  * should be created.
   62  */
   63 taskq_t *system_taskq = NULL;
   64 taskq_t *system_delay_taskq = NULL;
   65 taskq_t *dynamic_taskq = NULL;
   66 
   67 proc_t *system_proc;
   68 
   69 extern int uma_align_cache;
   70 
   71 static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");
   72 
   73 static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
   74 static unsigned long tqenthash;
   75 static unsigned long tqenthashlock;
   76 static struct sx *tqenthashtbl_lock;
   77 
   78 static taskqid_t tqidnext;
   79 
   80 #define TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash])
   81 #define TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)])
   82 
   83 #define TIMEOUT_TASK 1
   84 #define NORMAL_TASK 2
   85 
   86 static void
   87 system_taskq_init(void *arg)
   88 {
   89         int i;
   90 
   91         tsd_create(&taskq_tsd, NULL);
   92         tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
   93         tqenthashlock = (tqenthash + 1) / 8;
   94         if (tqenthashlock > 0)
   95                 tqenthashlock--;
   96         tqenthashtbl_lock =
   97             malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
   98             M_TASKQ, M_WAITOK | M_ZERO);
   99         for (i = 0; i < tqenthashlock + 1; i++)
  100                 sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
  101         taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
  102             NULL, NULL, NULL, NULL,
  103             UMA_ALIGN_CACHE, 0);
  104         system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
  105             0, 0, 0);
  106         system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
  107             minclsyspri, 0, 0, 0);
  108 }
  109 SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
  110     NULL);
  111 
  112 static void
  113 system_taskq_fini(void *arg)
  114 {
  115         int i;
  116 
  117         taskq_destroy(system_delay_taskq);
  118         taskq_destroy(system_taskq);
  119         uma_zdestroy(taskq_zone);
  120         tsd_destroy(&taskq_tsd);
  121         for (i = 0; i < tqenthashlock + 1; i++)
  122                 sx_destroy(&tqenthashtbl_lock[i]);
  123         for (i = 0; i < tqenthash + 1; i++)
  124                 VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i]));
  125         free(tqenthashtbl_lock, M_TASKQ);
  126         free(tqenthashtbl, M_TASKQ);
  127 }
  128 SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
  129     NULL);
  130 
  131 #ifdef __LP64__
  132 static taskqid_t
  133 __taskq_genid(void)
  134 {
  135         taskqid_t tqid;
  136 
  137         /*
  138          * Assume a 64-bit counter will not wrap in practice.
  139          */
  140         tqid = atomic_add_64_nv(&tqidnext, 1);
  141         VERIFY(tqid);
  142         return (tqid);
  143 }
  144 #else
  145 static taskqid_t
  146 __taskq_genid(void)
  147 {
  148         taskqid_t tqid;
  149 
  150         for (;;) {
  151                 tqid = atomic_add_32_nv(&tqidnext, 1);
  152                 if (__predict_true(tqid != 0))
  153                         break;
  154         }
  155         VERIFY(tqid);
  156         return (tqid);
  157 }
  158 #endif
  159 
  160 static taskq_ent_t *
  161 taskq_lookup(taskqid_t tqid)
  162 {
  163         taskq_ent_t *ent = NULL;
  164 
  165         sx_xlock(TQIDHASHLOCK(tqid));
  166         CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
  167                 if (ent->tqent_id == tqid)
  168                         break;
  169         }
  170         if (ent != NULL)
  171                 refcount_acquire(&ent->tqent_rc);
  172         sx_xunlock(TQIDHASHLOCK(tqid));
  173         return (ent);
  174 }
  175 
  176 static taskqid_t
  177 taskq_insert(taskq_ent_t *ent)
  178 {
  179         taskqid_t tqid;
  180 
  181         tqid = __taskq_genid();
  182         ent->tqent_id = tqid;
  183         ent->tqent_registered = B_TRUE;
  184         sx_xlock(TQIDHASHLOCK(tqid));
  185         CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
  186         sx_xunlock(TQIDHASHLOCK(tqid));
  187         return (tqid);
  188 }
  189 
  190 static void
  191 taskq_remove(taskq_ent_t *ent)
  192 {
  193         taskqid_t tqid = ent->tqent_id;
  194 
  195         if (!ent->tqent_registered)
  196                 return;
  197 
  198         sx_xlock(TQIDHASHLOCK(tqid));
  199         CK_LIST_REMOVE(ent, tqent_hash);
  200         sx_xunlock(TQIDHASHLOCK(tqid));
  201         ent->tqent_registered = B_FALSE;
  202 }
  203 
  204 static void
  205 taskq_tsd_set(void *context)
  206 {
  207         taskq_t *tq = context;
  208 
  209 #if defined(__amd64__) || defined(__aarch64__) 
  210         if (context != NULL && tsd_get(taskq_tsd) == NULL)
  211                 fpu_kern_thread(FPU_KERN_NORMAL);
  212 #endif
  213         tsd_set(taskq_tsd, tq);
  214 }
  215 
  216 static taskq_t *
  217 taskq_create_impl(const char *name, int nthreads, pri_t pri,
  218     proc_t *proc __maybe_unused, uint_t flags)
  219 {
  220         taskq_t *tq;
  221 
  222         if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
  223                 nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
  224 
  225         tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
  226         tq->tq_queue = taskqueue_create(name, M_WAITOK,
  227             taskqueue_thread_enqueue, &tq->tq_queue);
  228         taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
  229             taskq_tsd_set, tq);
  230         taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
  231             taskq_tsd_set, NULL);
  232         (void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
  233             proc, "%s", name);
  234 
  235         return ((taskq_t *)tq);
  236 }
  237 
  238 taskq_t *
  239 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
  240     int maxalloc __unused, uint_t flags)
  241 {
  242         return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
  243 }
  244 
  245 taskq_t *
  246 taskq_create_proc(const char *name, int nthreads, pri_t pri,
  247     int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
  248 {
  249         return (taskq_create_impl(name, nthreads, pri, proc, flags));
  250 }
  251 
  252 void
  253 taskq_destroy(taskq_t *tq)
  254 {
  255 
  256         taskqueue_free(tq->tq_queue);
  257         kmem_free(tq, sizeof (*tq));
  258 }
  259 
  260 int
  261 taskq_member(taskq_t *tq, kthread_t *thread)
  262 {
  263 
  264         return (taskqueue_member(tq->tq_queue, thread));
  265 }
  266 
  267 taskq_t *
  268 taskq_of_curthread(void)
  269 {
  270         return (tsd_get(taskq_tsd));
  271 }
  272 
  273 static void
  274 taskq_free(taskq_ent_t *task)
  275 {
  276         taskq_remove(task);
  277         if (refcount_release(&task->tqent_rc))
  278                 uma_zfree(taskq_zone, task);
  279 }
  280 
  281 int
  282 taskq_cancel_id(taskq_t *tq, taskqid_t tid)
  283 {
  284         uint32_t pend;
  285         int rc;
  286         taskq_ent_t *ent;
  287 
  288         if (tid == 0)
  289                 return (0);
  290 
  291         if ((ent = taskq_lookup(tid)) == NULL)
  292                 return (0);
  293 
  294         ent->tqent_cancelled = B_TRUE;
  295         if (ent->tqent_type == TIMEOUT_TASK) {
  296                 rc = taskqueue_cancel_timeout(tq->tq_queue,
  297                     &ent->tqent_timeout_task, &pend);
  298         } else
  299                 rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
  300         if (rc == EBUSY) {
  301                 taskqueue_drain(tq->tq_queue, &ent->tqent_task);
  302         } else if (pend) {
  303                 /*
  304                  * Tasks normally free themselves when run, but here the task
  305                  * was cancelled so it did not free itself.
  306                  */
  307                 taskq_free(ent);
  308         }
  309         /* Free the extra reference we added with taskq_lookup. */
  310         taskq_free(ent);
  311         return (rc);
  312 }
  313 
  314 static void
  315 taskq_run(void *arg, int pending __unused)
  316 {
  317         taskq_ent_t *task = arg;
  318 
  319         if (!task->tqent_cancelled)
  320                 task->tqent_func(task->tqent_arg);
  321         taskq_free(task);
  322 }
  323 
  324 taskqid_t
  325 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
  326     uint_t flags, clock_t expire_time)
  327 {
  328         taskq_ent_t *task;
  329         taskqid_t tqid;
  330         clock_t timo;
  331         int mflag;
  332 
  333         timo = expire_time - ddi_get_lbolt();
  334         if (timo <= 0)
  335                 return (taskq_dispatch(tq, func, arg, flags));
  336 
  337         if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
  338                 mflag = M_WAITOK;
  339         else
  340                 mflag = M_NOWAIT;
  341 
  342         task = uma_zalloc(taskq_zone, mflag);
  343         if (task == NULL)
  344                 return (0);
  345         task->tqent_func = func;
  346         task->tqent_arg = arg;
  347         task->tqent_type = TIMEOUT_TASK;
  348         task->tqent_cancelled = B_FALSE;
  349         refcount_init(&task->tqent_rc, 1);
  350         tqid = taskq_insert(task);
  351         TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
  352             taskq_run, task);
  353 
  354         taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
  355             timo);
  356         return (tqid);
  357 }
  358 
  359 taskqid_t
  360 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
  361 {
  362         taskq_ent_t *task;
  363         int mflag, prio;
  364         taskqid_t tqid;
  365 
  366         if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
  367                 mflag = M_WAITOK;
  368         else
  369                 mflag = M_NOWAIT;
  370         /*
  371          * If TQ_FRONT is given, we want higher priority for this task, so it
  372          * can go at the front of the queue.
  373          */
  374         prio = !!(flags & TQ_FRONT);
  375 
  376         task = uma_zalloc(taskq_zone, mflag);
  377         if (task == NULL)
  378                 return (0);
  379         refcount_init(&task->tqent_rc, 1);
  380         task->tqent_func = func;
  381         task->tqent_arg = arg;
  382         task->tqent_cancelled = B_FALSE;
  383         task->tqent_type = NORMAL_TASK;
  384         tqid = taskq_insert(task);
  385         TASK_INIT(&task->tqent_task, prio, taskq_run, task);
  386         taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
  387         return (tqid);
  388 }
  389 
  390 static void
  391 taskq_run_ent(void *arg, int pending __unused)
  392 {
  393         taskq_ent_t *task = arg;
  394 
  395         task->tqent_func(task->tqent_arg);
  396 }
  397 
  398 void
  399 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
  400     taskq_ent_t *task)
  401 {
  402         int prio;
  403 
  404         /*
  405          * If TQ_FRONT is given, we want higher priority for this task, so it
  406          * can go at the front of the queue.
  407          */
  408         prio = !!(flags & TQ_FRONT);
  409         task->tqent_cancelled = B_FALSE;
  410         task->tqent_registered = B_FALSE;
  411         task->tqent_id = 0;
  412         task->tqent_func = func;
  413         task->tqent_arg = arg;
  414 
  415         TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task);
  416         taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
  417 }
  418 
  419 void
  420 taskq_wait(taskq_t *tq)
  421 {
  422         taskqueue_quiesce(tq->tq_queue);
  423 }
  424 
  425 void
  426 taskq_wait_id(taskq_t *tq, taskqid_t tid)
  427 {
  428         taskq_ent_t *ent;
  429 
  430         if (tid == 0)
  431                 return;
  432         if ((ent = taskq_lookup(tid)) == NULL)
  433                 return;
  434 
  435         taskqueue_drain(tq->tq_queue, &ent->tqent_task);
  436         taskq_free(ent);
  437 }
  438 
  439 void
  440 taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
  441 {
  442         taskqueue_drain_all(tq->tq_queue);
  443 }
  444 
  445 int
  446 taskq_empty_ent(taskq_ent_t *t)
  447 {
  448         return (t->tqent_task.ta_pending == 0);
  449 }

Cache object: 1869a30a34232f5a6008cfa2626e6e33


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.