FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_threadpool.c


    1 /*      $NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $   */
    2 
    3 /*-
    4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
    5  * All rights reserved.
    6  *
    7  * This code is derived from software contributed to The NetBSD Foundation
    8  * by Taylor R. Campbell and Jason R. Thorpe.
    9  *
   10  * Redistribution and use in source and binary forms, with or without
   11  * modification, are permitted provided that the following conditions
   12  * are met:
   13  * 1. Redistributions of source code must retain the above copyright
   14  *    notice, this list of conditions and the following disclaimer.
   15  * 2. Redistributions in binary form must reproduce the above copyright
   16  *    notice, this list of conditions and the following disclaimer in the
   17  *    documentation and/or other materials provided with the distribution.
   18  *
   19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
   20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
   21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
   22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
   23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   29  * POSSIBILITY OF SUCH DAMAGE.
   30  */
   31 
   32 /*
   33  * Thread pools.
   34  *
    35  * A thread pool is a collection of worker threads, each either idle
    36  * or running a job, together with a dispatcher thread that does not
    37  * run jobs itself but can be given jobs to assign to a worker thread.
    38  * Unlike kthread_create, scheduling a job in a thread pool does not
    39  * allocate memory and does not sleep, except perhaps on an adaptive
    40  * lock.  Jobs reuse threads, so they do not incur the expense of
    41  * creating and destroying kthreads unless there is little work to do.
   42  *
   43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
    44  * pools, one per CPU, each bound to that CPU.  For each priority level in
   45  * use, there is one shared unbound thread pool (i.e., pool of threads
   46  * not bound to any CPU) and one shared per-CPU thread pool.
   47  *
   48  * To use the unbound thread pool at priority pri, call
   49  * threadpool_get(&pool, pri).  When you're done, call
   50  * threadpool_put(pool, pri).
   51  *
   52  * To use the per-CPU thread pools at priority pri, call
   53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
   54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
   55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
   56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
    57  * pri).  (A short usage sketch follows this comment block.)
   58  *
   59  * +--MACHINE-----------------------------------------------------+
   60  * | +--CPU 0---------+ +--CPU 1---------+     +--CPU n---------+ |
   61  * | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
   62  * | | <idle 0a>      | | <running 1a>   | ... | <idle na>      | |
   63  * | | <running 0b>   | | <running 1b>   | ... | <idle nb>      | |
   64  * | | .              | | .              | ... | .              | |
   65  * | | .              | | .              | ... | .              | |
   66  * | | .              | | .              | ... | .              | |
   67  * | +----------------+ +----------------+     +----------------+ |
   68  * |            +--unbound-----------+                            |
   69  * |            | <dispatcher n+1>   |                            |
   70  * |            | <idle (n+1)a>      |                            |
   71  * |            | <running (n+1)b>   |                            |
   72  * |            +--------------------+                            |
   73  * +--------------------------------------------------------------+
   74  *
   75  * XXX Why one dispatcher per CPU?  I did that originally to avoid
   76  * touching remote CPUs' memory when scheduling a job, but that still
   77  * requires interprocessor synchronization.  Perhaps we could get by
   78  * with a single dispatcher thread, at the expense of another pointer
   79  * in struct threadpool_job to identify the CPU on which it must run in
   80  * order for the dispatcher to schedule it correctly.
   81  */
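
/*
 * Usage sketch (editorial addition, not part of the original file):
 * a minimal illustration of how a caller might obtain, use, and
 * release the pools described above.  The variable names and the
 * elided scheduling steps are hypothetical; the threadpool_* calls
 * are the ones declared in <sys/threadpool.h> and defined below.
 *
 *	struct threadpool *pool;
 *	struct threadpool_percpu *pool_percpu;
 *	int error;
 *
 *	Shared unbound pool at priority PRI_NONE:
 *
 *		error = threadpool_get(&pool, PRI_NONE);
 *		if (error)
 *			return error;
 *		...schedule jobs on pool...
 *		threadpool_put(pool, PRI_NONE);
 *
 *	Per-CPU pools at the same priority:
 *
 *		error = threadpool_percpu_get(&pool_percpu, PRI_NONE);
 *		if (error)
 *			return error;
 *		pool = threadpool_percpu_ref(pool_percpu);
 *		...schedule jobs on the current CPU's pool...
 *		threadpool_percpu_put(pool_percpu, PRI_NONE);
 */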
   82 
   83 #include <sys/cdefs.h>
   84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $");
   85 
   86 #include <sys/types.h>
   87 #include <sys/param.h>
   88 #include <sys/atomic.h>
   89 #include <sys/condvar.h>
   90 #include <sys/cpu.h>
   91 #include <sys/kernel.h>
   92 #include <sys/kmem.h>
   93 #include <sys/kthread.h>
   94 #include <sys/mutex.h>
   95 #include <sys/once.h>
   96 #include <sys/percpu.h>
   97 #include <sys/pool.h>
   98 #include <sys/proc.h>
   99 #include <sys/queue.h>
  100 #include <sys/sdt.h>
  101 #include <sys/sysctl.h>
  102 #include <sys/systm.h>
  103 #include <sys/threadpool.h>
  104 
  105 /* Probes */
  106 
  107 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
  108     "pri_t"/*pri*/);
  109 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
  110     "pri_t"/*pri*/);
  111 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
  112     "pri_t"/*pri*/);
  113 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
  114     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
  115 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
  116     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
  117 
  118 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
  119     "pri_t"/*pri*/);
  120 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
  121     "pri_t"/*pri*/);
  122 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
  123     "pri_t"/*pri*/);
  124 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
  125     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
  126 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
  127     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
  128 
  129 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
  130     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
  131 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
  132     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
  133 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
  134     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
  135 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
  136     "struct threadpool *"/*pool*/);
  137 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
  138     "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);
  139 
  140 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
  141     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
  142 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
  143     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
  144 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
  145     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
  146 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
  147     "struct threadpool *"/*pool*/,
  148     "struct threadpool_job *"/*job*/,
  149     "struct lwp *"/*thread*/);
  150 
  151 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
  152     "struct threadpool *"/*pool*/);
  153 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
  154     "struct threadpool *"/*pool*/);
  155 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
  156     "struct threadpool *"/*pool*/);
  157 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
  158     "struct threadpool *"/*pool*/,
  159     "struct threadpool_job *"/*job*/);
  160 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
  161     "struct threadpool *"/*pool*/,
  162     "struct threadpool_job *"/*job*/,
  163     "struct lwp *"/*thread*/);
  164 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
  165     "struct threadpool *"/*pool*/);
  166 
  167 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
  168     "struct threadpool *"/*pool*/);
  169 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
  170     "struct threadpool *"/*pool*/);
  171 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
  172     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
  173 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
  174     "struct threadpool *"/*pool*/);
  175 
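/*
 * Tracing sketch (editorial addition): where dtrace(1) is available,
 * the probes above can be observed through the sdt provider.  The
 * exact probe spelling is an assumption -- with the usual sdt naming
 * convention, "__" in a probe name is written as "-", so for example
 * schedule__job could be matched and counted with:
 *
 *	dtrace -n 'sdt:kernel:threadpool:schedule-job
 *	    { @[probename] = count(); }'
 */
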
  176 /* Data structures */
  177 
  178 TAILQ_HEAD(job_head, threadpool_job);
  179 TAILQ_HEAD(thread_head, threadpool_thread);
  180 
  181 struct threadpool_thread {
  182         struct lwp                      *tpt_lwp;
  183         char                            *tpt_lwp_savedname;
  184         struct threadpool               *tpt_pool;
  185         struct threadpool_job           *tpt_job;
  186         kcondvar_t                      tpt_cv;
  187         TAILQ_ENTRY(threadpool_thread)  tpt_entry;
  188 };
  189 
  190 struct threadpool {
  191         kmutex_t                        tp_lock;
  192         struct threadpool_thread        tp_dispatcher;
  193         struct job_head                 tp_jobs;
  194         struct thread_head              tp_idle_threads;
  195         uint64_t                        tp_refcnt;
  196         int                             tp_flags;
  197 #define THREADPOOL_DYING        0x01
  198         struct cpu_info                 *tp_cpu;
  199         pri_t                           tp_pri;
  200 };
  201 
  202 static void     threadpool_hold(struct threadpool *);
  203 static void     threadpool_rele(struct threadpool *);
  204 
  205 static int      threadpool_percpu_create(struct threadpool_percpu **, pri_t);
  206 static void     threadpool_percpu_destroy(struct threadpool_percpu *);
  207 static void     threadpool_percpu_init(void *, void *, struct cpu_info *);
  208 static void     threadpool_percpu_ok(void *, void *, struct cpu_info *);
  209 static void     threadpool_percpu_fini(void *, void *, struct cpu_info *);
  210 
  211 static threadpool_job_fn_t threadpool_job_dead;
  212 
  213 static void     threadpool_job_hold(struct threadpool_job *);
  214 static void     threadpool_job_rele(struct threadpool_job *);
  215 
  216 static void     threadpool_dispatcher_thread(void *) __dead;
  217 static void     threadpool_thread(void *) __dead;
  218 
  219 static pool_cache_t     threadpool_thread_pc __read_mostly;
  220 
  221 static kmutex_t         threadpools_lock __cacheline_aligned;
  222 
  223         /* Default to 30 second idle timeout for pool threads. */
  224 static int      threadpool_idle_time_ms = 30 * 1000;
  225 
  226 struct threadpool_unbound {
  227         struct threadpool               tpu_pool;
  228 
  229         /* protected by threadpools_lock */
  230         LIST_ENTRY(threadpool_unbound)  tpu_link;
  231         uint64_t                        tpu_refcnt;
  232 };
  233 
  234 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
  235 
  236 static struct threadpool_unbound *
  237 threadpool_lookup_unbound(pri_t pri)
  238 {
  239         struct threadpool_unbound *tpu;
  240 
  241         LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
  242                 if (tpu->tpu_pool.tp_pri == pri)
  243                         return tpu;
  244         }
  245         return NULL;
  246 }
  247 
  248 static void
  249 threadpool_insert_unbound(struct threadpool_unbound *tpu)
  250 {
  251         KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
  252         LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
  253 }
  254 
  255 static void
  256 threadpool_remove_unbound(struct threadpool_unbound *tpu)
  257 {
  258         KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
  259         LIST_REMOVE(tpu, tpu_link);
  260 }
  261 
  262 struct threadpool_percpu {
  263         percpu_t *                      tpp_percpu;
  264         pri_t                           tpp_pri;
  265 
  266         /* protected by threadpools_lock */
  267         LIST_ENTRY(threadpool_percpu)   tpp_link;
  268         uint64_t                        tpp_refcnt;
  269 };
  270 
  271 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
  272 
  273 static struct threadpool_percpu *
  274 threadpool_lookup_percpu(pri_t pri)
  275 {
  276         struct threadpool_percpu *tpp;
  277 
  278         LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
  279                 if (tpp->tpp_pri == pri)
  280                         return tpp;
  281         }
  282         return NULL;
  283 }
  284 
  285 static void
  286 threadpool_insert_percpu(struct threadpool_percpu *tpp)
  287 {
  288         KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
  289         LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
  290 }
  291 
  292 static void
  293 threadpool_remove_percpu(struct threadpool_percpu *tpp)
  294 {
  295         KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
  296         LIST_REMOVE(tpp, tpp_link);
  297 }
  298 
  299 static int
  300 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
  301 {
  302         struct sysctlnode node;
  303         int val, error;
  304 
  305         node = *rnode;
  306 
  307         val = threadpool_idle_time_ms;
  308         node.sysctl_data = &val;
  309         error = sysctl_lookup(SYSCTLFN_CALL(&node));
  310         if (error == 0 && newp != NULL) {
  311                 /* Disallow negative values and 0 (forever). */
  312                 if (val < 1)
  313                         error = EINVAL;
  314                 else
  315                         threadpool_idle_time_ms = val;
  316         }
  317 
  318         return error;
  319 }
  320 
  321 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
  322 
  323 SYSCTL_SETUP(sysctl_threadpool_setup,
  324     "sysctl kern.threadpool subtree setup")
  325 {
  326         const struct sysctlnode *rnode, *cnode;
  327         int error __diagused;
  328 
  329         error = sysctl_createv(clog, 0, NULL, &rnode,
  330             CTLFLAG_PERMANENT,
  331             CTLTYPE_NODE, "threadpool",
  332             SYSCTL_DESCR("threadpool subsystem options"),
  333             NULL, 0, NULL, 0,
  334             CTL_KERN, CTL_CREATE, CTL_EOL);
  335         KASSERT(error == 0);
  336 
  337         error = sysctl_createv(clog, 0, &rnode, &cnode,
  338             CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
  339             CTLTYPE_INT, "idle_ms",
  340             SYSCTL_DESCR("idle thread timeout in ms"),
  341             sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
  342             CTL_CREATE, CTL_EOL);
  343         KASSERT(error == 0);
  344 }
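
/*
 * Tuning sketch (editorial addition): the nodes created above appear
 * as kern.threadpool.idle_ms, so the idle-thread timeout can be
 * changed at run time, e.g. to keep idle pool threads around for a
 * minute instead of the default 30 seconds:
 *
 *	# sysctl -w kern.threadpool.idle_ms=60000
 *
 * Values less than 1 are rejected by sysctl_kern_threadpool_idle_ms().
 */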
  345 
  346 void
  347 threadpools_init(void)
  348 {
  349 
  350         threadpool_thread_pc =
  351             pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
  352                 "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
  353 
  354         LIST_INIT(&unbound_threadpools);
  355         LIST_INIT(&percpu_threadpools);
  356         mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
  357 }
  358 
  359 static void
  360 threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
  361 {
  362 
  363         buf[0] = '\0';
  364         if (ci)
  365                 snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
  366                     cpu_index(ci));
  367         if (pri != PRI_NONE)
  368                 snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
  369 }
  370 
  371 /* Thread pool creation */
  372 
  373 static bool
  374 threadpool_pri_is_valid(pri_t pri)
  375 {
  376         return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
  377 }
  378 
  379 static int
  380 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
  381     pri_t pri)
  382 {
  383         struct lwp *lwp;
  384         char suffix[16];
  385         int ktflags;
  386         int error;
  387 
  388         KASSERT(threadpool_pri_is_valid(pri));
  389 
  390         SDT_PROBE2(sdt, kernel, threadpool, create,  ci, pri);
  391 
  392         mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
  393         /* XXX dispatcher */
  394         TAILQ_INIT(&pool->tp_jobs);
  395         TAILQ_INIT(&pool->tp_idle_threads);
  396         pool->tp_refcnt = 1;            /* dispatcher's reference */
  397         pool->tp_flags = 0;
  398         pool->tp_cpu = ci;
  399         pool->tp_pri = pri;
  400 
  401         pool->tp_dispatcher.tpt_lwp = NULL;
  402         pool->tp_dispatcher.tpt_pool = pool;
  403         pool->tp_dispatcher.tpt_job = NULL;
  404         cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");
  405 
  406         ktflags = 0;
  407         ktflags |= KTHREAD_MPSAFE;
  408         if (pri < PRI_KERNEL)
  409                 ktflags |= KTHREAD_TS;
  410         threadnamesuffix(suffix, sizeof(suffix), ci, pri);
  411         error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
  412             &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
  413         if (error)
  414                 goto fail0;
  415 
  416         mutex_spin_enter(&pool->tp_lock);
  417         pool->tp_dispatcher.tpt_lwp = lwp;
  418         cv_broadcast(&pool->tp_dispatcher.tpt_cv);
  419         mutex_spin_exit(&pool->tp_lock);
  420 
  421         SDT_PROBE3(sdt, kernel, threadpool, create__success,  ci, pri, pool);
  422         return 0;
  423 
  424 fail0:  KASSERT(error);
  425         KASSERT(pool->tp_dispatcher.tpt_job == NULL);
  426         KASSERT(pool->tp_dispatcher.tpt_pool == pool);
  427         KASSERT(pool->tp_flags == 0);
  428         KASSERT(pool->tp_refcnt == 0);
  429         KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
  430         KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
  431         KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
  432         cv_destroy(&pool->tp_dispatcher.tpt_cv);
  433         mutex_destroy(&pool->tp_lock);
  434         SDT_PROBE3(sdt, kernel, threadpool, create__failure,  ci, pri, error);
  435         return error;
  436 }
  437 
  438 /* Thread pool destruction */
  439 
  440 static void
  441 threadpool_destroy(struct threadpool *pool)
  442 {
  443         struct threadpool_thread *thread;
  444 
  445         SDT_PROBE1(sdt, kernel, threadpool, destroy,  pool);
  446 
  447         /* Mark the pool dying and wait for threads to commit suicide.  */
  448         mutex_spin_enter(&pool->tp_lock);
  449         KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
  450         pool->tp_flags |= THREADPOOL_DYING;
  451         cv_broadcast(&pool->tp_dispatcher.tpt_cv);
  452         TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
  453                 cv_broadcast(&thread->tpt_cv);
  454         while (0 < pool->tp_refcnt) {
  455                 SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
  456                     pool, pool->tp_refcnt);
  457                 cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
  458         }
  459         mutex_spin_exit(&pool->tp_lock);
  460 
  461         KASSERT(pool->tp_dispatcher.tpt_job == NULL);
  462         KASSERT(pool->tp_dispatcher.tpt_pool == pool);
  463         KASSERT(pool->tp_flags == THREADPOOL_DYING);
  464         KASSERT(pool->tp_refcnt == 0);
  465         KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
  466         KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
  467         KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
  468         cv_destroy(&pool->tp_dispatcher.tpt_cv);
  469         mutex_destroy(&pool->tp_lock);
  470 }
  471 
  472 static void
  473 threadpool_hold(struct threadpool *pool)
  474 {
  475 
  476         KASSERT(mutex_owned(&pool->tp_lock));
  477         pool->tp_refcnt++;
  478         KASSERT(pool->tp_refcnt != 0);
  479 }
  480 
  481 static void
  482 threadpool_rele(struct threadpool *pool)
  483 {
  484 
  485         KASSERT(mutex_owned(&pool->tp_lock));
  486         KASSERT(0 < pool->tp_refcnt);
  487         if (--pool->tp_refcnt == 0)
  488                 cv_broadcast(&pool->tp_dispatcher.tpt_cv);
  489 }
  490 
  491 /* Unbound thread pools */
  492 
  493 int
  494 threadpool_get(struct threadpool **poolp, pri_t pri)
  495 {
  496         struct threadpool_unbound *tpu, *tmp = NULL;
  497         int error;
  498 
  499         ASSERT_SLEEPABLE();
  500 
  501         SDT_PROBE1(sdt, kernel, threadpool, get,  pri);
  502 
  503         if (! threadpool_pri_is_valid(pri))
  504                 return EINVAL;
  505 
  506         mutex_enter(&threadpools_lock);
  507         tpu = threadpool_lookup_unbound(pri);
  508         if (tpu == NULL) {
  509                 mutex_exit(&threadpools_lock);
  510                 SDT_PROBE1(sdt, kernel, threadpool, get__create,  pri);
  511                 tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
  512                 error = threadpool_create(&tmp->tpu_pool, NULL, pri);
  513                 if (error) {
  514                         kmem_free(tmp, sizeof(*tmp));
  515                         return error;
  516                 }
  517                 mutex_enter(&threadpools_lock);
  518                 tpu = threadpool_lookup_unbound(pri);
  519                 if (tpu == NULL) {
  520                         tpu = tmp;
  521                         tmp = NULL;
  522                         threadpool_insert_unbound(tpu);
  523                 } else {
  524                         SDT_PROBE1(sdt, kernel, threadpool, get__race,  pri);
  525                 }
  526         }
  527         KASSERT(tpu != NULL);
  528         tpu->tpu_refcnt++;
  529         KASSERT(tpu->tpu_refcnt != 0);
  530         mutex_exit(&threadpools_lock);
  531 
  532         if (tmp != NULL) {
  533                 threadpool_destroy(&tmp->tpu_pool);
  534                 kmem_free(tmp, sizeof(*tmp));
  535         }
  536         KASSERT(tpu != NULL);
  537         *poolp = &tpu->tpu_pool;
  538         return 0;
  539 }
  540 
  541 void
  542 threadpool_put(struct threadpool *pool, pri_t pri)
  543 {
  544         struct threadpool_unbound *tpu =
  545             container_of(pool, struct threadpool_unbound, tpu_pool);
  546 
  547         ASSERT_SLEEPABLE();
  548         KASSERT(threadpool_pri_is_valid(pri));
  549 
  550         SDT_PROBE2(sdt, kernel, threadpool, put,  pool, pri);
  551 
  552         mutex_enter(&threadpools_lock);
  553         KASSERT(tpu == threadpool_lookup_unbound(pri));
  554         KASSERT(0 < tpu->tpu_refcnt);
  555         if (--tpu->tpu_refcnt == 0) {
  556                 SDT_PROBE2(sdt, kernel, threadpool, put__destroy,  pool, pri);
  557                 threadpool_remove_unbound(tpu);
  558         } else {
  559                 tpu = NULL;
  560         }
  561         mutex_exit(&threadpools_lock);
  562 
  563         if (tpu) {
  564                 threadpool_destroy(&tpu->tpu_pool);
  565                 kmem_free(tpu, sizeof(*tpu));
  566         }
  567 }
  568 
  569 /* Per-CPU thread pools */
  570 
  571 int
  572 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
  573 {
  574         struct threadpool_percpu *pool_percpu, *tmp = NULL;
  575         int error;
  576 
  577         ASSERT_SLEEPABLE();
  578 
  579         SDT_PROBE1(sdt, kernel, threadpool, percpu__get,  pri);
  580 
  581         if (! threadpool_pri_is_valid(pri))
  582                 return EINVAL;
  583 
  584         mutex_enter(&threadpools_lock);
  585         pool_percpu = threadpool_lookup_percpu(pri);
  586         if (pool_percpu == NULL) {
  587                 mutex_exit(&threadpools_lock);
  588                 SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create,  pri);
  589                 error = threadpool_percpu_create(&tmp, pri);
  590                 if (error)
  591                         return error;
  592                 KASSERT(tmp != NULL);
  593                 mutex_enter(&threadpools_lock);
  594                 pool_percpu = threadpool_lookup_percpu(pri);
  595                 if (pool_percpu == NULL) {
  596                         pool_percpu = tmp;
  597                         tmp = NULL;
  598                         threadpool_insert_percpu(pool_percpu);
  599                 } else {
  600                         SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
  601                             pri);
  602                 }
  603         }
  604         KASSERT(pool_percpu != NULL);
  605         pool_percpu->tpp_refcnt++;
  606         KASSERT(pool_percpu->tpp_refcnt != 0);
  607         mutex_exit(&threadpools_lock);
  608 
  609         if (tmp != NULL)
  610                 threadpool_percpu_destroy(tmp);
  611         KASSERT(pool_percpu != NULL);
  612         *pool_percpup = pool_percpu;
  613         return 0;
  614 }
  615 
  616 void
  617 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
  618 {
  619 
  620         ASSERT_SLEEPABLE();
  621 
  622         KASSERT(threadpool_pri_is_valid(pri));
  623 
  624         SDT_PROBE2(sdt, kernel, threadpool, percpu__put,  pool_percpu, pri);
  625 
  626         mutex_enter(&threadpools_lock);
  627         KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
  628         KASSERT(0 < pool_percpu->tpp_refcnt);
  629         if (--pool_percpu->tpp_refcnt == 0) {
  630                 SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
  631                     pool_percpu, pri);
  632                 threadpool_remove_percpu(pool_percpu);
  633         } else {
  634                 pool_percpu = NULL;
  635         }
  636         mutex_exit(&threadpools_lock);
  637 
  638         if (pool_percpu)
  639                 threadpool_percpu_destroy(pool_percpu);
  640 }
  641 
  642 struct threadpool *
  643 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
  644 {
  645         struct threadpool **poolp, *pool;
  646 
  647         poolp = percpu_getref(pool_percpu->tpp_percpu);
  648         pool = *poolp;
  649         percpu_putref(pool_percpu->tpp_percpu);
  650 
  651         return pool;
  652 }
  653 
  654 struct threadpool *
  655 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
  656     struct cpu_info *ci)
  657 {
  658         struct threadpool **poolp, *pool;
  659 
  660         /*
  661          * As long as xcalls are blocked -- e.g., by kpreempt_disable
  662          * -- the percpu object will not be swapped and destroyed.  We
  663          * can't write to it, because the data may have already been
  664          * moved to a new buffer, but we can safely read from it.
  665          */
  666         kpreempt_disable();
  667         poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
  668         pool = *poolp;
  669         kpreempt_enable();
  670 
  671         return pool;
  672 }
  673 
  674 static int
  675 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
  676 {
  677         struct threadpool_percpu *pool_percpu;
  678         bool ok = true;
  679 
  680         pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
  681         pool_percpu->tpp_pri = pri;
  682         pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
  683             threadpool_percpu_init, threadpool_percpu_fini,
  684             (void *)(intptr_t)pri);
  685 
  686         /*
  687          * Verify that all of the CPUs were initialized.
  688          *
  689          * XXX What to do if we add CPU hotplug?
  690          */
  691         percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
  692         if (!ok)
  693                 goto fail;
  694 
  695         /* Success!  */
  696         *pool_percpup = (struct threadpool_percpu *)pool_percpu;
  697         return 0;
  698 
  699 fail:   percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
  700         kmem_free(pool_percpu, sizeof(*pool_percpu));
  701         return ENOMEM;
  702 }
  703 
  704 static void
  705 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
  706 {
  707 
  708         percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
  709         kmem_free(pool_percpu, sizeof(*pool_percpu));
  710 }
  711 
  712 static void
  713 threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
  714 {
  715         struct threadpool **const poolp = vpoolp;
  716         pri_t pri = (intptr_t)(void *)vpri;
  717         int error;
  718 
  719         *poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
  720         error = threadpool_create(*poolp, ci, pri);
  721         if (error) {
  722                 KASSERT(error == ENOMEM);
  723                 kmem_free(*poolp, sizeof(**poolp));
  724                 *poolp = NULL;
  725         }
  726 }
  727 
  728 static void
  729 threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
  730 {
  731         struct threadpool **const poolp = vpoolp;
  732         bool *okp = vokp;
  733 
  734         if (*poolp == NULL)
  735                 atomic_store_relaxed(okp, false);
  736 }
  737 
  738 static void
  739 threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
  740 {
  741         struct threadpool **const poolp = vpoolp;
  742 
  743         if (*poolp == NULL)     /* initialization failed */
  744                 return;
  745         threadpool_destroy(*poolp);
  746         kmem_free(*poolp, sizeof(**poolp));
  747 }
  748 
  749 /* Thread pool jobs */
  750 
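/*
 * Job lifecycle sketch (editorial addition): a hypothetical driver
 * with a softc "sc", its own lock sc_lock, and an embedded job
 * sc_job.  All of those names are made up for illustration; the
 * calling conventions follow the functions below -- the job lock
 * must be held to schedule or cancel a job, and the job function
 * must call threadpool_job_done() with the job lock held before it
 * returns.
 *
 *	static void
 *	example_job_fn(struct threadpool_job *job)
 *	{
 *		struct example_softc *sc =
 *		    container_of(job, struct example_softc, sc_job);
 *
 *		...do the actual work...
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 *	threadpool_job_init(&sc->sc_job, example_job_fn, &sc->sc_lock,
 *	    "examplejob");
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *
 *	...later, when the job is no longer needed...
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 *	threadpool_job_destroy(&sc->sc_job);
 */
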
  751 void __printflike(4,5)
  752 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
  753     kmutex_t *lock, const char *fmt, ...)
  754 {
  755         va_list ap;
  756 
  757         va_start(ap, fmt);
  758         (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
  759         va_end(ap);
  760 
  761         job->job_lock = lock;
  762         job->job_thread = NULL;
  763         job->job_refcnt = 0;
  764         cv_init(&job->job_cv, job->job_name);
  765         job->job_fn = fn;
  766 }
  767 
  768 static void
  769 threadpool_job_dead(struct threadpool_job *job)
  770 {
  771 
  772         panic("threadpool job %p ran after destruction", job);
  773 }
  774 
  775 void
  776 threadpool_job_destroy(struct threadpool_job *job)
  777 {
  778 
  779         ASSERT_SLEEPABLE();
  780 
  781         KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
  782 
  783         mutex_enter(job->job_lock);
  784         while (0 < atomic_load_relaxed(&job->job_refcnt))
  785                 cv_wait(&job->job_cv, job->job_lock);
  786         mutex_exit(job->job_lock);
  787 
  788         job->job_lock = NULL;
  789         KASSERT(job->job_thread == NULL);
  790         KASSERT(job->job_refcnt == 0);
  791         KASSERT(!cv_has_waiters(&job->job_cv));
  792         cv_destroy(&job->job_cv);
  793         job->job_fn = threadpool_job_dead;
  794         (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
  795 }
  796 
  797 static void
  798 threadpool_job_hold(struct threadpool_job *job)
  799 {
  800         unsigned int refcnt __diagused;
  801 
  802         refcnt = atomic_inc_uint_nv(&job->job_refcnt);
  803         KASSERT(refcnt != 0);
  804 }
  805 
  806 static void
  807 threadpool_job_rele(struct threadpool_job *job)
  808 {
  809         unsigned int refcnt;
  810 
  811         KASSERT(mutex_owned(job->job_lock));
  812 
  813         refcnt = atomic_dec_uint_nv(&job->job_refcnt);
  814         KASSERT(refcnt != UINT_MAX);
  815         if (refcnt == 0)
  816                 cv_broadcast(&job->job_cv);
  817 }
  818 
  819 void
  820 threadpool_job_done(struct threadpool_job *job)
  821 {
  822 
  823         KASSERT(mutex_owned(job->job_lock));
  824         KASSERT(job->job_thread != NULL);
  825         KASSERT(job->job_thread->tpt_lwp == curlwp);
  826 
  827         /*
  828          * We can safely read this field; it's only modified right before
  829          * we call the job work function, and we are only preserving it
  830          * to use here; no one cares if it contains junk afterward.
  831          */
  832         lwp_lock(curlwp);
  833         curlwp->l_name = job->job_thread->tpt_lwp_savedname;
  834         lwp_unlock(curlwp);
  835 
  836         /*
  837          * Inline the work of threadpool_job_rele(); the job is already
   838          * locked; the most likely scenario (XXXJRT only scenario?) is
  839          * that we're dropping the last reference (the one taken in
  840          * threadpool_schedule_job()), and we always do the cv_broadcast()
  841          * anyway.
  842          */
  843         KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
  844         unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
  845         KASSERT(refcnt != UINT_MAX);
  846         cv_broadcast(&job->job_cv);
  847         job->job_thread = NULL;
  848 }
  849 
  850 void
  851 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
  852 {
  853 
  854         KASSERT(mutex_owned(job->job_lock));
  855 
  856         SDT_PROBE2(sdt, kernel, threadpool, schedule__job,  pool, job);
  857 
  858         /*
  859          * If the job's already running, let it keep running.  The job
  860          * is guaranteed by the interlock not to end early -- if it had
  861          * ended early, threadpool_job_done would have set job_thread
  862          * to NULL under the interlock.
  863          */
  864         if (__predict_true(job->job_thread != NULL)) {
  865                 SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
  866                     pool, job);
  867                 return;
  868         }
  869 
  870         threadpool_job_hold(job);
  871 
  872         /* Otherwise, try to assign a thread to the job.  */
  873         mutex_spin_enter(&pool->tp_lock);
  874         if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
  875                 /* Nobody's idle.  Give it to the dispatcher.  */
  876                 SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
  877                     pool, job);
  878                 job->job_thread = &pool->tp_dispatcher;
  879                 TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
  880         } else {
  881                 /* Assign it to the first idle thread.  */
  882                 job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
  883                 SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
  884                     pool, job, job->job_thread->tpt_lwp);
  885                 TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
  886                     tpt_entry);
  887                 job->job_thread->tpt_job = job;
  888         }
  889 
  890         /* Notify whomever we gave it to, dispatcher or idle thread.  */
  891         KASSERT(job->job_thread != NULL);
  892         cv_broadcast(&job->job_thread->tpt_cv);
  893         mutex_spin_exit(&pool->tp_lock);
  894 }
  895 
  896 bool
  897 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
  898 {
  899 
  900         KASSERT(mutex_owned(job->job_lock));
  901 
  902         /*
  903          * XXXJRT This fails (albeit safely) when all of the following
  904          * are true:
  905          *
  906          *      => "pool" is something other than what the job was
  907          *         scheduled on.  This can legitimately occur if,
  908          *         for example, a job is percpu-scheduled on CPU0
  909          *         and then CPU1 attempts to cancel it without taking
  910          *         a remote pool reference.  (this might happen by
  911          *         "luck of the draw").
  912          *
  913          *      => "job" is not yet running, but is assigned to the
  914          *         dispatcher.
  915          *
  916          * When this happens, this code makes the determination that
  917          * the job is already running.  The failure mode is that the
  918          * caller is told the job is running, and thus has to wait.
  919          * The dispatcher will eventually get to it and the job will
  920          * proceed as if it had been already running.
  921          */
  922 
  923         if (job->job_thread == NULL) {
  924                 /* Nothing to do.  Guaranteed not running.  */
  925                 return true;
  926         } else if (job->job_thread == &pool->tp_dispatcher) {
  927                 /* Take it off the list to guarantee it won't run.  */
  928                 job->job_thread = NULL;
  929                 mutex_spin_enter(&pool->tp_lock);
  930                 TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
  931                 mutex_spin_exit(&pool->tp_lock);
  932                 threadpool_job_rele(job);
  933                 return true;
  934         } else {
  935                 /* Too late -- already running.  */
  936                 return false;
  937         }
  938 }
  939 
  940 void
  941 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
  942 {
  943 
  944         /*
  945          * We may sleep here, but we can't ASSERT_SLEEPABLE() because
  946          * the job lock (used to interlock the cv_wait()) may in fact
  947          * legitimately be a spin lock, so the assertion would fire
  948          * as a false-positive.
  949          */
  950 
  951         KASSERT(mutex_owned(job->job_lock));
  952 
  953         if (threadpool_cancel_job_async(pool, job))
  954                 return;
  955 
  956         /* Already running.  Wait for it to complete.  */
  957         while (job->job_thread != NULL)
  958                 cv_wait(&job->job_cv, job->job_lock);
  959 }
  960 
  961 /* Thread pool dispatcher thread */
  962 
  963 static void __dead
  964 threadpool_dispatcher_thread(void *arg)
  965 {
  966         struct threadpool_thread *const dispatcher = arg;
  967         struct threadpool *const pool = dispatcher->tpt_pool;
  968         struct lwp *lwp = NULL;
  969         int ktflags;
  970         char suffix[16];
  971         int error;
  972 
  973         KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
  974         KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
  975 
  976         /* Wait until we're initialized.  */
  977         mutex_spin_enter(&pool->tp_lock);
  978         while (dispatcher->tpt_lwp == NULL)
  979                 cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
  980 
  981         SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start,  pool);
  982 
  983         for (;;) {
  984                 /* Wait until there's a job.  */
  985                 while (TAILQ_EMPTY(&pool->tp_jobs)) {
  986                         if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
  987                                 SDT_PROBE1(sdt, kernel, threadpool,
  988                                     dispatcher__dying,  pool);
  989                                 break;
  990                         }
  991                         cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
  992                 }
  993                 if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
  994                         break;
  995 
   996                 /* If there are no idle threads, we'll have to try to start one.  */
  997                 if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
  998                         SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
  999                             pool);
 1000                         threadpool_hold(pool);
 1001                         mutex_spin_exit(&pool->tp_lock);
 1002 
 1003                         struct threadpool_thread *const thread =
 1004                             pool_cache_get(threadpool_thread_pc, PR_WAITOK);
 1005                         thread->tpt_lwp = NULL;
 1006                         thread->tpt_pool = pool;
 1007                         thread->tpt_job = NULL;
 1008                         cv_init(&thread->tpt_cv, "pooljob");
 1009 
 1010                         ktflags = 0;
 1011                         ktflags |= KTHREAD_MPSAFE;
 1012                         if (pool->tp_pri < PRI_KERNEL)
 1013                                 ktflags |= KTHREAD_TS;
 1014                         threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
 1015                             pool->tp_pri);
 1016                         error = kthread_create(pool->tp_pri, ktflags,
 1017                             pool->tp_cpu, &threadpool_thread, thread, &lwp,
 1018                             "poolthread%s", suffix);
 1019 
 1020                         mutex_spin_enter(&pool->tp_lock);
 1021                         if (error) {
 1022                                 pool_cache_put(threadpool_thread_pc, thread);
 1023                                 threadpool_rele(pool);
 1024                                 /* XXX What to do to wait for memory?  */
 1025                                 (void)kpause("thrdplcr", false, hz,
 1026                                     &pool->tp_lock);
 1027                                 continue;
 1028                         }
 1029                         /*
 1030                          * New kthread now owns the reference to the pool
 1031                          * taken above.
 1032                          */
 1033                         KASSERT(lwp != NULL);
 1034                         TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
 1035                             tpt_entry);
 1036                         thread->tpt_lwp = lwp;
 1037                         lwp = NULL;
 1038                         cv_broadcast(&thread->tpt_cv);
 1039                         continue;
 1040                 }
 1041 
 1042                 /* There are idle threads, so try giving one a job.  */
 1043                 struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
 1044 
 1045                 /*
 1046                  * Take an extra reference on the job temporarily so that
 1047                  * it won't disappear on us while we have both locks dropped.
 1048                  */
 1049                 threadpool_job_hold(job);
 1050                 mutex_spin_exit(&pool->tp_lock);
 1051 
 1052                 mutex_enter(job->job_lock);
 1053                 /* If the job was cancelled, we'll no longer be its thread.  */
 1054                 if (__predict_true(job->job_thread == dispatcher)) {
 1055                         mutex_spin_enter(&pool->tp_lock);
 1056                         TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
 1057                         if (__predict_false(
 1058                                     TAILQ_EMPTY(&pool->tp_idle_threads))) {
 1059                                 /*
 1060                                  * Someone else snagged the thread
 1061                                  * first.  We'll have to try again.
 1062                                  */
 1063                                 SDT_PROBE2(sdt, kernel, threadpool,
 1064                                     dispatcher__race,  pool, job);
 1065                                 TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
 1066                                     job_entry);
 1067                         } else {
 1068                                 /*
 1069                                  * Assign the job to the thread and
 1070                                  * wake the thread so it starts work.
 1071                                  */
 1072                                 struct threadpool_thread *const thread =
 1073                                     TAILQ_FIRST(&pool->tp_idle_threads);
 1074 
  1075                                 SDT_PROBE3(sdt, kernel, threadpool,
  1076                                     dispatcher__assign,  pool, job, thread->tpt_lwp);
 1077                                 KASSERT(thread->tpt_job == NULL);
 1078                                 TAILQ_REMOVE(&pool->tp_idle_threads, thread,
 1079                                     tpt_entry);
 1080                                 thread->tpt_job = job;
 1081                                 job->job_thread = thread;
 1082                                 cv_broadcast(&thread->tpt_cv);
 1083                         }
 1084                         mutex_spin_exit(&pool->tp_lock);
 1085                 }
 1086                 threadpool_job_rele(job);
 1087                 mutex_exit(job->job_lock);
 1088 
 1089                 mutex_spin_enter(&pool->tp_lock);
 1090         }
 1091         threadpool_rele(pool);
 1092         mutex_spin_exit(&pool->tp_lock);
 1093 
 1094         SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit,  pool);
 1095 
 1096         kthread_exit(0);
 1097 }
 1098 
 1099 /* Thread pool thread */
 1100 
 1101 static void __dead
 1102 threadpool_thread(void *arg)
 1103 {
 1104         struct threadpool_thread *const thread = arg;
 1105         struct threadpool *const pool = thread->tpt_pool;
 1106 
 1107         KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
 1108         KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
 1109 
 1110         /* Wait until we're initialized and on the queue.  */
 1111         mutex_spin_enter(&pool->tp_lock);
 1112         while (thread->tpt_lwp == NULL)
 1113                 cv_wait(&thread->tpt_cv, &pool->tp_lock);
 1114 
 1115         SDT_PROBE1(sdt, kernel, threadpool, thread__start,  pool);
 1116 
 1117         KASSERT(thread->tpt_lwp == curlwp);
 1118         for (;;) {
 1119                 /* Wait until we are assigned a job.  */
 1120                 while (thread->tpt_job == NULL) {
 1121                         if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
 1122                                 SDT_PROBE1(sdt, kernel, threadpool,
 1123                                     thread__dying,  pool);
 1124                                 break;
 1125                         }
 1126                         if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
 1127                                 mstohz(threadpool_idle_time_ms)))
 1128                                 break;
 1129                 }
 1130                 if (__predict_false(thread->tpt_job == NULL)) {
 1131                         TAILQ_REMOVE(&pool->tp_idle_threads, thread,
 1132                             tpt_entry);
 1133                         break;
 1134                 }
 1135 
 1136                 struct threadpool_job *const job = thread->tpt_job;
 1137                 KASSERT(job != NULL);
 1138 
 1139                 /* Set our lwp name to reflect what job we're doing.  */
 1140                 lwp_lock(curlwp);
 1141                 char *const lwp_name __diagused = curlwp->l_name;
 1142                 thread->tpt_lwp_savedname = curlwp->l_name;
 1143                 curlwp->l_name = job->job_name;
 1144                 lwp_unlock(curlwp);
 1145 
 1146                 mutex_spin_exit(&pool->tp_lock);
 1147 
 1148                 SDT_PROBE2(sdt, kernel, threadpool, thread__job,  pool, job);
 1149 
 1150                 /* Run the job.  */
 1151                 (*job->job_fn)(job);
 1152 
 1153                 /* lwp name restored in threadpool_job_done(). */
 1154                 KASSERTMSG((curlwp->l_name == lwp_name),
 1155                     "someone forgot to call threadpool_job_done()!");
 1156 
 1157                 /*
  1158                  * We can compare pointers, but we can no longer dereference
 1159                  * job after this because threadpool_job_done() drops the
 1160                  * last reference on the job while the job is locked.
 1161                  */
 1162 
 1163                 mutex_spin_enter(&pool->tp_lock);
 1164                 KASSERT(thread->tpt_job == job);
 1165                 thread->tpt_job = NULL;
 1166                 TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
 1167         }
 1168         threadpool_rele(pool);
 1169         mutex_spin_exit(&pool->tp_lock);
 1170 
 1171         SDT_PROBE1(sdt, kernel, threadpool, thread__exit,  pool);
 1172 
 1173         KASSERT(!cv_has_waiters(&thread->tpt_cv));
 1174         cv_destroy(&thread->tpt_cv);
 1175         pool_cache_put(threadpool_thread_pc, thread);
 1176         kthread_exit(0);
 1177 }
