The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/contrib/openzfs/lib/libtpool/thread_pool.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * CDDL HEADER START
    3  *
    4  * The contents of this file are subject to the terms of the
    5  * Common Development and Distribution License (the "License").
    6  * You may not use this file except in compliance with the License.
    7  *
    8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
    9  * or https://opensource.org/licenses/CDDL-1.0.
   10  * See the License for the specific language governing permissions
   11  * and limitations under the License.
   12  *
   13  * When distributing Covered Code, include this CDDL HEADER in each
   14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
   15  * If applicable, add the following below this CDDL HEADER, with the
   16  * fields enclosed by brackets "[]" replaced with your own identifying
   17  * information: Portions Copyright [yyyy] [name of copyright owner]
   18  *
   19  * CDDL HEADER END
   20  */
   21 
   22 /*
   23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
   24  * Use is subject to license terms.
   25  */
   26 
   27 #include <stdlib.h>
   28 #include <signal.h>
   29 #include <errno.h>
   30 #include <assert.h>
   31 #include <limits.h>
   32 #include "thread_pool_impl.h"
   33 
   34 static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
   35 static tpool_t *thread_pools = NULL;
   36 
   37 static void
   38 delete_pool(tpool_t *tpool)
   39 {
   40         tpool_job_t *job;
   41 
   42         ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
   43 
   44         /*
   45          * Unlink the pool from the global list of all pools.
   46          */
   47         (void) pthread_mutex_lock(&thread_pool_lock);
   48         if (thread_pools == tpool)
   49                 thread_pools = tpool->tp_forw;
   50         if (thread_pools == tpool)
   51                 thread_pools = NULL;
   52         else {
   53                 tpool->tp_back->tp_forw = tpool->tp_forw;
   54                 tpool->tp_forw->tp_back = tpool->tp_back;
   55         }
   56         pthread_mutex_unlock(&thread_pool_lock);
   57 
   58         /*
   59          * There should be no pending jobs, but just in case...
   60          */
   61         for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
   62                 tpool->tp_head = job->tpj_next;
   63                 free(job);
   64         }
   65         (void) pthread_attr_destroy(&tpool->tp_attr);
   66         free(tpool);
   67 }
   68 
   69 /*
   70  * Worker thread is terminating.
   71  */
   72 static void
   73 worker_cleanup(void *arg)
   74 {
   75         tpool_t *tpool = (tpool_t *)arg;
   76 
   77         if (--tpool->tp_current == 0 &&
   78             (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
   79                 if (tpool->tp_flags & TP_ABANDON) {
   80                         pthread_mutex_unlock(&tpool->tp_mutex);
   81                         delete_pool(tpool);
   82                         return;
   83                 }
   84                 if (tpool->tp_flags & TP_DESTROY)
   85                         (void) pthread_cond_broadcast(&tpool->tp_busycv);
   86         }
   87         pthread_mutex_unlock(&tpool->tp_mutex);
   88 }
   89 
   90 static void
   91 notify_waiters(tpool_t *tpool)
   92 {
   93         if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
   94                 tpool->tp_flags &= ~TP_WAIT;
   95                 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
   96         }
   97 }
   98 
   99 /*
  100  * Called by a worker thread on return from a tpool_dispatch()d job.
  101  */
  102 static void
  103 job_cleanup(void *arg)
  104 {
  105         tpool_t *tpool = (tpool_t *)arg;
  106 
  107         pthread_t my_tid = pthread_self();
  108         tpool_active_t *activep;
  109         tpool_active_t **activepp;
  110 
  111         pthread_mutex_lock(&tpool->tp_mutex);
  112         for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
  113                 activep = *activepp;
  114                 if (activep->tpa_tid == my_tid) {
  115                         *activepp = activep->tpa_next;
  116                         break;
  117                 }
  118         }
  119         if (tpool->tp_flags & TP_WAIT)
  120                 notify_waiters(tpool);
  121 }
  122 
  123 static void *
  124 tpool_worker(void *arg)
  125 {
  126         tpool_t *tpool = (tpool_t *)arg;
  127         int elapsed;
  128         tpool_job_t *job;
  129         void (*func)(void *);
  130         tpool_active_t active;
  131 
  132         pthread_mutex_lock(&tpool->tp_mutex);
  133         pthread_cleanup_push(worker_cleanup, tpool);
  134 
  135         /*
  136          * This is the worker's main loop.
  137          * It will only be left if a timeout or an error has occurred.
  138          */
  139         active.tpa_tid = pthread_self();
  140         for (;;) {
  141                 elapsed = 0;
  142                 tpool->tp_idle++;
  143                 if (tpool->tp_flags & TP_WAIT)
  144                         notify_waiters(tpool);
  145                 while ((tpool->tp_head == NULL ||
  146                     (tpool->tp_flags & TP_SUSPEND)) &&
  147                     !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
  148                         if (tpool->tp_current <= tpool->tp_minimum ||
  149                             tpool->tp_linger == 0) {
  150                                 (void) pthread_cond_wait(&tpool->tp_workcv,
  151                                     &tpool->tp_mutex);
  152                         } else {
  153                                 struct timespec ts;
  154 
  155                                 clock_gettime(CLOCK_REALTIME, &ts);
  156                                 ts.tv_sec += tpool->tp_linger;
  157 
  158                                 if (pthread_cond_timedwait(&tpool->tp_workcv,
  159                                     &tpool->tp_mutex, &ts) != 0) {
  160                                         elapsed = 1;
  161                                         break;
  162                                 }
  163                         }
  164                 }
  165                 tpool->tp_idle--;
  166                 if (tpool->tp_flags & TP_DESTROY)
  167                         break;
  168                 if (tpool->tp_flags & TP_ABANDON) {
  169                         /* can't abandon a suspended pool */
  170                         if (tpool->tp_flags & TP_SUSPEND) {
  171                                 tpool->tp_flags &= ~TP_SUSPEND;
  172                                 (void) pthread_cond_broadcast(
  173                                     &tpool->tp_workcv);
  174                         }
  175                         if (tpool->tp_head == NULL)
  176                                 break;
  177                 }
  178                 if ((job = tpool->tp_head) != NULL &&
  179                     !(tpool->tp_flags & TP_SUSPEND)) {
  180                         elapsed = 0;
  181                         func = job->tpj_func;
  182                         arg = job->tpj_arg;
  183                         tpool->tp_head = job->tpj_next;
  184                         if (job == tpool->tp_tail)
  185                                 tpool->tp_tail = NULL;
  186                         tpool->tp_njobs--;
  187                         active.tpa_next = tpool->tp_active;
  188                         tpool->tp_active = &active;
  189                         pthread_mutex_unlock(&tpool->tp_mutex);
  190                         pthread_cleanup_push(job_cleanup, tpool);
  191                         free(job);
  192 
  193                         sigset_t maskset;
  194                         (void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);
  195 
  196                         /*
  197                          * Call the specified function.
  198                          */
  199                         func(arg);
  200                         /*
  201                          * We don't know what this thread has been doing,
  202                          * so we reset its signal mask and cancellation
  203                          * state back to the values prior to calling func().
  204                          */
  205                         (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
  206                         (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
  207                             NULL);
  208                         (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
  209                             NULL);
  210                         pthread_cleanup_pop(1);
  211                 }
  212                 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
  213                         /*
  214                          * We timed out and there is no work to be done
  215                          * and the number of workers exceeds the minimum.
  216                          * Exit now to reduce the size of the pool.
  217                          */
  218                         break;
  219                 }
  220         }
  221         pthread_cleanup_pop(1);
  222         return (arg);
  223 }
  224 
  225 /*
  226  * Create a worker thread, with default signals blocked.
  227  */
  228 static int
  229 create_worker(tpool_t *tpool)
  230 {
  231         pthread_t thread;
  232         sigset_t oset;
  233         int error;
  234 
  235         (void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
  236         error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
  237         (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
  238         return (error);
  239 }
  240 
  241 
  242 /*
  243  * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
  244  * is NULL initialize the cloned attr using default values.
  245  */
  246 static int
  247 pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
  248 {
  249         int error;
  250 
  251         error = pthread_attr_init(attr);
  252         if (error || (old_attr == NULL))
  253                 return (error);
  254 
  255 #ifdef __GLIBC__
  256         cpu_set_t cpuset;
  257         size_t cpusetsize = sizeof (cpuset);
  258         error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
  259         if (error == 0)
  260                 error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
  261         if (error)
  262                 goto error;
  263 #endif /* __GLIBC__ */
  264 
  265         int detachstate;
  266         error = pthread_attr_getdetachstate(old_attr, &detachstate);
  267         if (error == 0)
  268                 error = pthread_attr_setdetachstate(attr, detachstate);
  269         if (error)
  270                 goto error;
  271 
  272         size_t guardsize;
  273         error = pthread_attr_getguardsize(old_attr, &guardsize);
  274         if (error == 0)
  275                 error = pthread_attr_setguardsize(attr, guardsize);
  276         if (error)
  277                 goto error;
  278 
  279         int inheritsched;
  280         error = pthread_attr_getinheritsched(old_attr, &inheritsched);
  281         if (error == 0)
  282                 error = pthread_attr_setinheritsched(attr, inheritsched);
  283         if (error)
  284                 goto error;
  285 
  286         struct sched_param param;
  287         error = pthread_attr_getschedparam(old_attr, &param);
  288         if (error == 0)
  289                 error = pthread_attr_setschedparam(attr, &param);
  290         if (error)
  291                 goto error;
  292 
  293         int policy;
  294         error = pthread_attr_getschedpolicy(old_attr, &policy);
  295         if (error == 0)
  296                 error = pthread_attr_setschedpolicy(attr, policy);
  297         if (error)
  298                 goto error;
  299 
  300         int scope;
  301         error = pthread_attr_getscope(old_attr, &scope);
  302         if (error == 0)
  303                 error = pthread_attr_setscope(attr, scope);
  304         if (error)
  305                 goto error;
  306 
  307         void *stackaddr;
  308         size_t stacksize;
  309         error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
  310         if (error == 0)
  311                 error = pthread_attr_setstack(attr, stackaddr, stacksize);
  312         if (error)
  313                 goto error;
  314 
  315         return (0);
  316 error:
  317         pthread_attr_destroy(attr);
  318         return (error);
  319 }
  320 
  321 tpool_t *
  322 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
  323     pthread_attr_t *attr)
  324 {
  325         tpool_t *tpool;
  326         void *stackaddr;
  327         size_t stacksize;
  328         size_t minstack;
  329         int error;
  330 
  331         if (min_threads > max_threads || max_threads < 1) {
  332                 errno = EINVAL;
  333                 return (NULL);
  334         }
  335         if (attr != NULL) {
  336                 if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
  337                         errno = EINVAL;
  338                         return (NULL);
  339                 }
  340                 /*
  341                  * Allow only one thread in the pool with a specified stack.
  342                  * Require threads to have at least the minimum stack size.
  343                  */
  344                 minstack = PTHREAD_STACK_MIN;
  345                 if (stackaddr != NULL) {
  346                         if (stacksize < minstack || max_threads != 1) {
  347                                 errno = EINVAL;
  348                                 return (NULL);
  349                         }
  350                 } else if (stacksize != 0 && stacksize < minstack) {
  351                         errno = EINVAL;
  352                         return (NULL);
  353                 }
  354         }
  355 
  356         tpool = calloc(1, sizeof (*tpool));
  357         if (tpool == NULL) {
  358                 errno = ENOMEM;
  359                 return (NULL);
  360         }
  361         (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
  362         (void) pthread_cond_init(&tpool->tp_busycv, NULL);
  363         (void) pthread_cond_init(&tpool->tp_workcv, NULL);
  364         (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
  365         tpool->tp_minimum = min_threads;
  366         tpool->tp_maximum = max_threads;
  367         tpool->tp_linger = linger;
  368 
  369         /*
  370          * We cannot just copy the attribute pointer.
  371          * We need to initialize a new pthread_attr_t structure
  372          * with the values from the user-supplied pthread_attr_t.
  373          * If the attribute pointer is NULL, we need to initialize
  374          * the new pthread_attr_t structure with default values.
  375          */
  376         error = pthread_attr_clone(&tpool->tp_attr, attr);
  377         if (error) {
  378                 free(tpool);
  379                 errno = error;
  380                 return (NULL);
  381         }
  382 
  383         /* make all pool threads be detached daemon threads */
  384         (void) pthread_attr_setdetachstate(&tpool->tp_attr,
  385             PTHREAD_CREATE_DETACHED);
  386 
  387         /* insert into the global list of all thread pools */
  388         pthread_mutex_lock(&thread_pool_lock);
  389         if (thread_pools == NULL) {
  390                 tpool->tp_forw = tpool;
  391                 tpool->tp_back = tpool;
  392                 thread_pools = tpool;
  393         } else {
  394                 thread_pools->tp_back->tp_forw = tpool;
  395                 tpool->tp_forw = thread_pools;
  396                 tpool->tp_back = thread_pools->tp_back;
  397                 thread_pools->tp_back = tpool;
  398         }
  399         pthread_mutex_unlock(&thread_pool_lock);
  400 
  401         return (tpool);
  402 }
  403 
  404 /*
  405  * Dispatch a work request to the thread pool.
  406  * If there are idle workers, awaken one.
  407  * Else, if the maximum number of workers has
  408  * not been reached, spawn a new worker thread.
  409  * Else just return with the job added to the queue.
  410  */
  411 int
  412 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
  413 {
  414         tpool_job_t *job;
  415 
  416         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  417 
  418         if ((job = calloc(1, sizeof (*job))) == NULL)
  419                 return (-1);
  420         job->tpj_next = NULL;
  421         job->tpj_func = func;
  422         job->tpj_arg = arg;
  423 
  424         pthread_mutex_lock(&tpool->tp_mutex);
  425 
  426         if (tpool->tp_head == NULL)
  427                 tpool->tp_head = job;
  428         else
  429                 tpool->tp_tail->tpj_next = job;
  430         tpool->tp_tail = job;
  431         tpool->tp_njobs++;
  432 
  433         if (!(tpool->tp_flags & TP_SUSPEND)) {
  434                 if (tpool->tp_idle > 0)
  435                         (void) pthread_cond_signal(&tpool->tp_workcv);
  436                 else if (tpool->tp_current < tpool->tp_maximum &&
  437                     create_worker(tpool) == 0)
  438                         tpool->tp_current++;
  439         }
  440 
  441         pthread_mutex_unlock(&tpool->tp_mutex);
  442         return (0);
  443 }
  444 
  445 static void
  446 tpool_cleanup(void *arg)
  447 {
  448         tpool_t *tpool = (tpool_t *)arg;
  449 
  450         pthread_mutex_unlock(&tpool->tp_mutex);
  451 }
  452 
  453 /*
  454  * Assumes: by the time tpool_destroy() is called no one will use this
  455  * thread pool in any way and no one will try to dispatch entries to it.
  456  * Calling tpool_destroy() from a job in the pool will cause deadlock.
  457  */
  458 void
  459 tpool_destroy(tpool_t *tpool)
  460 {
  461         tpool_active_t *activep;
  462 
  463         ASSERT(!tpool_member(tpool));
  464         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  465 
  466         pthread_mutex_lock(&tpool->tp_mutex);
  467         pthread_cleanup_push(tpool_cleanup, tpool);
  468 
  469         /* mark the pool as being destroyed; wakeup idle workers */
  470         tpool->tp_flags |= TP_DESTROY;
  471         tpool->tp_flags &= ~TP_SUSPEND;
  472         (void) pthread_cond_broadcast(&tpool->tp_workcv);
  473 
  474         /* cancel all active workers */
  475         for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
  476                 (void) pthread_cancel(activep->tpa_tid);
  477 
  478         /* wait for all active workers to finish */
  479         while (tpool->tp_active != NULL) {
  480                 tpool->tp_flags |= TP_WAIT;
  481                 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
  482         }
  483 
  484         /* the last worker to terminate will wake us up */
  485         while (tpool->tp_current != 0)
  486                 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
  487 
  488         pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
  489         delete_pool(tpool);
  490 }
  491 
  492 /*
  493  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
  494  * The last worker to terminate will delete the pool.
  495  */
  496 void
  497 tpool_abandon(tpool_t *tpool)
  498 {
  499         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  500 
  501         pthread_mutex_lock(&tpool->tp_mutex);
  502         if (tpool->tp_current == 0) {
  503                 /* no workers, just delete the pool */
  504                 pthread_mutex_unlock(&tpool->tp_mutex);
  505                 delete_pool(tpool);
  506         } else {
  507                 /* wake up all workers, last one will delete the pool */
  508                 tpool->tp_flags |= TP_ABANDON;
  509                 tpool->tp_flags &= ~TP_SUSPEND;
  510                 (void) pthread_cond_broadcast(&tpool->tp_workcv);
  511                 pthread_mutex_unlock(&tpool->tp_mutex);
  512         }
  513 }
  514 
  515 /*
  516  * Wait for all jobs to complete.
  517  * Calling tpool_wait() from a job in the pool will cause deadlock.
  518  */
  519 void
  520 tpool_wait(tpool_t *tpool)
  521 {
  522         ASSERT(!tpool_member(tpool));
  523         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  524 
  525         pthread_mutex_lock(&tpool->tp_mutex);
  526         pthread_cleanup_push(tpool_cleanup, tpool);
  527         while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
  528                 tpool->tp_flags |= TP_WAIT;
  529                 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
  530                 ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  531         }
  532         pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
  533 }
  534 
  535 void
  536 tpool_suspend(tpool_t *tpool)
  537 {
  538         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  539 
  540         pthread_mutex_lock(&tpool->tp_mutex);
  541         tpool->tp_flags |= TP_SUSPEND;
  542         pthread_mutex_unlock(&tpool->tp_mutex);
  543 }
  544 
  545 int
  546 tpool_suspended(tpool_t *tpool)
  547 {
  548         int suspended;
  549 
  550         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  551 
  552         pthread_mutex_lock(&tpool->tp_mutex);
  553         suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
  554         pthread_mutex_unlock(&tpool->tp_mutex);
  555 
  556         return (suspended);
  557 }
  558 
  559 void
  560 tpool_resume(tpool_t *tpool)
  561 {
  562         int excess;
  563 
  564         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  565 
  566         pthread_mutex_lock(&tpool->tp_mutex);
  567         if (!(tpool->tp_flags & TP_SUSPEND)) {
  568                 pthread_mutex_unlock(&tpool->tp_mutex);
  569                 return;
  570         }
  571         tpool->tp_flags &= ~TP_SUSPEND;
  572         (void) pthread_cond_broadcast(&tpool->tp_workcv);
  573         excess = tpool->tp_njobs - tpool->tp_idle;
  574         while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
  575                 if (create_worker(tpool) != 0)
  576                         break;          /* pthread_create() failed */
  577                 tpool->tp_current++;
  578         }
  579         pthread_mutex_unlock(&tpool->tp_mutex);
  580 }
  581 
  582 int
  583 tpool_member(tpool_t *tpool)
  584 {
  585         pthread_t my_tid = pthread_self();
  586         tpool_active_t *activep;
  587 
  588         ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
  589 
  590         pthread_mutex_lock(&tpool->tp_mutex);
  591         for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
  592                 if (activep->tpa_tid == my_tid) {
  593                         pthread_mutex_unlock(&tpool->tp_mutex);
  594                         return (1);
  595                 }
  596         }
  597         pthread_mutex_unlock(&tpool->tp_mutex);
  598         return (0);
  599 }

Cache object: 004e0634afcb28c16f14f228474d0978


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.