The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/contrib/openzfs/lib/libzpool/taskq.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * CDDL HEADER START
    3  *
    4  * The contents of this file are subject to the terms of the
    5  * Common Development and Distribution License (the "License").
    6  * You may not use this file except in compliance with the License.
    7  *
    8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
    9  * or https://opensource.org/licenses/CDDL-1.0.
   10  * See the License for the specific language governing permissions
   11  * and limitations under the License.
   12  *
   13  * When distributing Covered Code, include this CDDL HEADER in each
   14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
   15  * If applicable, add the following below this CDDL HEADER, with the
   16  * fields enclosed by brackets "[]" replaced with your own identifying
   17  * information: Portions Copyright [yyyy] [name of copyright owner]
   18  *
   19  * CDDL HEADER END
   20  */
   21 /*
   22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
   23  * Use is subject to license terms.
   24  */
   25 /*
   26  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
   27  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
   28  * Copyright (c) 2014 by Delphix. All rights reserved.
   29  */
   30 
   31 #include <sys/zfs_context.h>
   32 
   33 int taskq_now;
   34 taskq_t *system_taskq;
   35 taskq_t *system_delay_taskq;
   36 
   37 static pthread_key_t taskq_tsd;
   38 
   39 #define TASKQ_ACTIVE    0x00010000
   40 
   41 static taskq_ent_t *
   42 task_alloc(taskq_t *tq, int tqflags)
   43 {
   44         taskq_ent_t *t;
   45         int rv;
   46 
   47 again:  if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
   48                 ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
   49                 tq->tq_freelist = t->tqent_next;
   50         } else {
   51                 if (tq->tq_nalloc >= tq->tq_maxalloc) {
   52                         if (!(tqflags & KM_SLEEP))
   53                                 return (NULL);
   54 
   55                         /*
   56                          * We don't want to exceed tq_maxalloc, but we can't
   57                          * wait for other tasks to complete (and thus free up
   58                          * task structures) without risking deadlock with
   59                          * the caller.  So, we just delay for one second
   60                          * to throttle the allocation rate. If we have tasks
   61                          * complete before one second timeout expires then
   62                          * taskq_ent_free will signal us and we will
   63                          * immediately retry the allocation.
   64                          */
   65                         tq->tq_maxalloc_wait++;
   66                         rv = cv_timedwait(&tq->tq_maxalloc_cv,
   67                             &tq->tq_lock, ddi_get_lbolt() + hz);
   68                         tq->tq_maxalloc_wait--;
   69                         if (rv > 0)
   70                                 goto again;             /* signaled */
   71                 }
   72                 mutex_exit(&tq->tq_lock);
   73 
   74                 t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
   75 
   76                 mutex_enter(&tq->tq_lock);
   77                 if (t != NULL) {
   78                         /* Make sure we start without any flags */
   79                         t->tqent_flags = 0;
   80                         tq->tq_nalloc++;
   81                 }
   82         }
   83         return (t);
   84 }
   85 
   86 static void
   87 task_free(taskq_t *tq, taskq_ent_t *t)
   88 {
   89         if (tq->tq_nalloc <= tq->tq_minalloc) {
   90                 t->tqent_next = tq->tq_freelist;
   91                 tq->tq_freelist = t;
   92         } else {
   93                 tq->tq_nalloc--;
   94                 mutex_exit(&tq->tq_lock);
   95                 kmem_free(t, sizeof (taskq_ent_t));
   96                 mutex_enter(&tq->tq_lock);
   97         }
   98 
   99         if (tq->tq_maxalloc_wait)
  100                 cv_signal(&tq->tq_maxalloc_cv);
  101 }
  102 
  103 taskqid_t
  104 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
  105 {
  106         taskq_ent_t *t;
  107 
  108         if (taskq_now) {
  109                 func(arg);
  110                 return (1);
  111         }
  112 
  113         mutex_enter(&tq->tq_lock);
  114         ASSERT(tq->tq_flags & TASKQ_ACTIVE);
  115         if ((t = task_alloc(tq, tqflags)) == NULL) {
  116                 mutex_exit(&tq->tq_lock);
  117                 return (0);
  118         }
  119         if (tqflags & TQ_FRONT) {
  120                 t->tqent_next = tq->tq_task.tqent_next;
  121                 t->tqent_prev = &tq->tq_task;
  122         } else {
  123                 t->tqent_next = &tq->tq_task;
  124                 t->tqent_prev = tq->tq_task.tqent_prev;
  125         }
  126         t->tqent_next->tqent_prev = t;
  127         t->tqent_prev->tqent_next = t;
  128         t->tqent_func = func;
  129         t->tqent_arg = arg;
  130         t->tqent_flags = 0;
  131         cv_signal(&tq->tq_dispatch_cv);
  132         mutex_exit(&tq->tq_lock);
  133         return (1);
  134 }
  135 
  136 taskqid_t
  137 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
  138     clock_t expire_time)
  139 {
  140         (void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
  141         return (0);
  142 }
  143 
  144 int
  145 taskq_empty_ent(taskq_ent_t *t)
  146 {
  147         return (t->tqent_next == NULL);
  148 }
  149 
  150 void
  151 taskq_init_ent(taskq_ent_t *t)
  152 {
  153         t->tqent_next = NULL;
  154         t->tqent_prev = NULL;
  155         t->tqent_func = NULL;
  156         t->tqent_arg = NULL;
  157         t->tqent_flags = 0;
  158 }
  159 
  160 void
  161 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
  162     taskq_ent_t *t)
  163 {
  164         ASSERT(func != NULL);
  165 
  166         /*
  167          * Mark it as a prealloc'd task.  This is important
  168          * to ensure that we don't free it later.
  169          */
  170         t->tqent_flags |= TQENT_FLAG_PREALLOC;
  171         /*
  172          * Enqueue the task to the underlying queue.
  173          */
  174         mutex_enter(&tq->tq_lock);
  175 
  176         if (flags & TQ_FRONT) {
  177                 t->tqent_next = tq->tq_task.tqent_next;
  178                 t->tqent_prev = &tq->tq_task;
  179         } else {
  180                 t->tqent_next = &tq->tq_task;
  181                 t->tqent_prev = tq->tq_task.tqent_prev;
  182         }
  183         t->tqent_next->tqent_prev = t;
  184         t->tqent_prev->tqent_next = t;
  185         t->tqent_func = func;
  186         t->tqent_arg = arg;
  187         cv_signal(&tq->tq_dispatch_cv);
  188         mutex_exit(&tq->tq_lock);
  189 }
  190 
  191 void
  192 taskq_wait(taskq_t *tq)
  193 {
  194         mutex_enter(&tq->tq_lock);
  195         while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
  196                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
  197         mutex_exit(&tq->tq_lock);
  198 }
  199 
  200 void
  201 taskq_wait_id(taskq_t *tq, taskqid_t id)
  202 {
  203         (void) id;
  204         taskq_wait(tq);
  205 }
  206 
  207 void
  208 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
  209 {
  210         (void) id;
  211         taskq_wait(tq);
  212 }
  213 
  214 static __attribute__((noreturn)) void
  215 taskq_thread(void *arg)
  216 {
  217         taskq_t *tq = arg;
  218         taskq_ent_t *t;
  219         boolean_t prealloc;
  220 
  221         VERIFY0(pthread_setspecific(taskq_tsd, tq));
  222 
  223         mutex_enter(&tq->tq_lock);
  224         while (tq->tq_flags & TASKQ_ACTIVE) {
  225                 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
  226                         if (--tq->tq_active == 0)
  227                                 cv_broadcast(&tq->tq_wait_cv);
  228                         cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
  229                         tq->tq_active++;
  230                         continue;
  231                 }
  232                 t->tqent_prev->tqent_next = t->tqent_next;
  233                 t->tqent_next->tqent_prev = t->tqent_prev;
  234                 t->tqent_next = NULL;
  235                 t->tqent_prev = NULL;
  236                 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
  237                 mutex_exit(&tq->tq_lock);
  238 
  239                 rw_enter(&tq->tq_threadlock, RW_READER);
  240                 t->tqent_func(t->tqent_arg);
  241                 rw_exit(&tq->tq_threadlock);
  242 
  243                 mutex_enter(&tq->tq_lock);
  244                 if (!prealloc)
  245                         task_free(tq, t);
  246         }
  247         tq->tq_nthreads--;
  248         cv_broadcast(&tq->tq_wait_cv);
  249         mutex_exit(&tq->tq_lock);
  250         thread_exit();
  251 }
  252 
  253 taskq_t *
  254 taskq_create(const char *name, int nthreads, pri_t pri,
  255     int minalloc, int maxalloc, uint_t flags)
  256 {
  257         (void) pri;
  258         taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
  259         int t;
  260 
  261         if (flags & TASKQ_THREADS_CPU_PCT) {
  262                 int pct;
  263                 ASSERT3S(nthreads, >=, 0);
  264                 ASSERT3S(nthreads, <=, 100);
  265                 pct = MIN(nthreads, 100);
  266                 pct = MAX(pct, 0);
  267 
  268                 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
  269                 nthreads = MAX(nthreads, 1);    /* need at least 1 thread */
  270         } else {
  271                 ASSERT3S(nthreads, >=, 1);
  272         }
  273 
  274         rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
  275         mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
  276         cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
  277         cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
  278         cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
  279         (void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
  280         tq->tq_flags = flags | TASKQ_ACTIVE;
  281         tq->tq_active = nthreads;
  282         tq->tq_nthreads = nthreads;
  283         tq->tq_minalloc = minalloc;
  284         tq->tq_maxalloc = maxalloc;
  285         tq->tq_task.tqent_next = &tq->tq_task;
  286         tq->tq_task.tqent_prev = &tq->tq_task;
  287         tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
  288             KM_SLEEP);
  289 
  290         if (flags & TASKQ_PREPOPULATE) {
  291                 mutex_enter(&tq->tq_lock);
  292                 while (minalloc-- > 0)
  293                         task_free(tq, task_alloc(tq, KM_SLEEP));
  294                 mutex_exit(&tq->tq_lock);
  295         }
  296 
  297         for (t = 0; t < nthreads; t++)
  298                 VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
  299                     taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);
  300 
  301         return (tq);
  302 }
  303 
  304 void
  305 taskq_destroy(taskq_t *tq)
  306 {
  307         int nthreads = tq->tq_nthreads;
  308 
  309         taskq_wait(tq);
  310 
  311         mutex_enter(&tq->tq_lock);
  312 
  313         tq->tq_flags &= ~TASKQ_ACTIVE;
  314         cv_broadcast(&tq->tq_dispatch_cv);
  315 
  316         while (tq->tq_nthreads != 0)
  317                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
  318 
  319         tq->tq_minalloc = 0;
  320         while (tq->tq_nalloc != 0) {
  321                 ASSERT(tq->tq_freelist != NULL);
  322                 taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
  323                 task_free(tq, tq->tq_freelist);
  324                 tq->tq_freelist = tqent_nexttq;
  325         }
  326 
  327         mutex_exit(&tq->tq_lock);
  328 
  329         kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));
  330 
  331         rw_destroy(&tq->tq_threadlock);
  332         mutex_destroy(&tq->tq_lock);
  333         cv_destroy(&tq->tq_dispatch_cv);
  334         cv_destroy(&tq->tq_wait_cv);
  335         cv_destroy(&tq->tq_maxalloc_cv);
  336 
  337         kmem_free(tq, sizeof (taskq_t));
  338 }
  339 
  340 int
  341 taskq_member(taskq_t *tq, kthread_t *t)
  342 {
  343         int i;
  344 
  345         if (taskq_now)
  346                 return (1);
  347 
  348         for (i = 0; i < tq->tq_nthreads; i++)
  349                 if (tq->tq_threadlist[i] == t)
  350                         return (1);
  351 
  352         return (0);
  353 }
  354 
  355 taskq_t *
  356 taskq_of_curthread(void)
  357 {
  358         return (pthread_getspecific(taskq_tsd));
  359 }
  360 
  361 int
  362 taskq_cancel_id(taskq_t *tq, taskqid_t id)
  363 {
  364         (void) tq, (void) id;
  365         return (ENOENT);
  366 }
  367 
  368 void
  369 system_taskq_init(void)
  370 {
  371         VERIFY0(pthread_key_create(&taskq_tsd, NULL));
  372         system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
  373             TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
  374         system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
  375             512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
  376 }
  377 
  378 void
  379 system_taskq_fini(void)
  380 {
  381         taskq_destroy(system_taskq);
  382         system_taskq = NULL; /* defensive */
  383         taskq_destroy(system_delay_taskq);
  384         system_delay_taskq = NULL;
  385         VERIFY0(pthread_key_delete(taskq_tsd));
  386 }

Cache object: c90112e28364b7f327ee655041b4148f


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.