The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_workqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $    */
    2 
    3 /*-
    4  * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $");
   31 
   32 #include <sys/param.h>
   33 #include <sys/cpu.h>
   34 #include <sys/systm.h>
   35 #include <sys/kthread.h>
   36 #include <sys/kmem.h>
   37 #include <sys/proc.h>
   38 #include <sys/workqueue.h>
   39 #include <sys/mutex.h>
   40 #include <sys/condvar.h>
   41 #include <sys/sdt.h>
   42 #include <sys/queue.h>
   43 
   44 typedef struct work_impl {
   45         SIMPLEQ_ENTRY(work_impl) wk_entry;
   46 } work_impl_t;
   47 
   48 SIMPLEQ_HEAD(workqhead, work_impl);
   49 
   50 struct workqueue_queue {
   51         kmutex_t q_mutex;
   52         kcondvar_t q_cv;
   53         struct workqhead q_queue_pending;
   54         struct workqhead q_queue_running;
   55         lwp_t *q_worker;
   56 };
   57 
   58 struct workqueue {
   59         void (*wq_func)(struct work *, void *);
   60         void *wq_arg;
   61         int wq_flags;
   62 
   63         char wq_name[MAXCOMLEN];
   64         pri_t wq_prio;
   65         void *wq_ptr;
   66 };
   67 
   68 #define WQ_SIZE         (roundup2(sizeof(struct workqueue), coherency_unit))
   69 #define WQ_QUEUE_SIZE   (roundup2(sizeof(struct workqueue_queue), coherency_unit))
   70 
   71 #define POISON  0xaabbccdd
   72 
   73 SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
   74     "struct workqueue *"/*wq*/,
   75     "const char *"/*name*/,
   76     "void (*)(struct work *, void *)"/*func*/,
   77     "void *"/*arg*/,
   78     "pri_t"/*prio*/,
   79     "int"/*ipl*/,
   80     "int"/*flags*/);
   81 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
   82     "struct workqueue *"/*wq*/);
   83 
   84 SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
   85     "struct workqueue *"/*wq*/,
   86     "struct work *"/*wk*/,
   87     "struct cpu_info *"/*ci*/);
   88 SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
   89     "struct workqueue *"/*wq*/,
   90     "struct work *"/*wk*/,
   91     "void (*)(struct work *, void *)"/*func*/,
   92     "void *"/*arg*/);
   93 SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
   94     "struct workqueue *"/*wq*/,
   95     "struct work *"/*wk*/,
   96     "void (*)(struct work *, void *)"/*func*/,
   97     "void *"/*arg*/);
   98 SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
   99     "struct workqueue *"/*wq*/,
  100     "struct work *"/*wk*/);
  101 SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
  102     "struct workqueue *"/*wq*/,
  103     "struct work *"/*wk*/);
  104 
  105 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
  106     "struct workqueue *"/*wq*/);
  107 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
  108     "struct workqueue *"/*wq*/);
  109 
  110 static size_t
  111 workqueue_size(int flags)
  112 {
  113 
  114         return WQ_SIZE
  115             + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
  116             + coherency_unit;
  117 }
  118 
  119 static struct workqueue_queue *
  120 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
  121 {
  122         u_int idx = 0;
  123 
  124         if (wq->wq_flags & WQ_PERCPU) {
  125                 idx = ci ? cpu_index(ci) : cpu_index(curcpu());
  126         }
  127 
  128         return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
  129 }
  130 
  131 static void
  132 workqueue_runlist(struct workqueue *wq, struct workqhead *list)
  133 {
  134         work_impl_t *wk;
  135         work_impl_t *next;
  136 
  137         /*
  138          * note that "list" is not a complete SIMPLEQ.
  139          */
  140 
  141         for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
  142                 next = SIMPLEQ_NEXT(wk, wk_entry);
  143                 SDT_PROBE4(sdt, kernel, workqueue, entry,
  144                     wq, wk, wq->wq_func, wq->wq_arg);
  145                 (*wq->wq_func)((void *)wk, wq->wq_arg);
  146                 SDT_PROBE4(sdt, kernel, workqueue, return,
  147                     wq, wk, wq->wq_func, wq->wq_arg);
  148         }
  149 }
  150 
  151 static void
  152 workqueue_worker(void *cookie)
  153 {
  154         struct workqueue *wq = cookie;
  155         struct workqueue_queue *q;
  156         int s;
  157 
  158         /* find the workqueue of this kthread */
  159         q = workqueue_queue_lookup(wq, curlwp->l_cpu);
  160 
  161         if (wq->wq_flags & WQ_FPU)
  162                 s = kthread_fpu_enter();
  163         for (;;) {
  164                 /*
  165                  * we violate abstraction of SIMPLEQ.
  166                  */
  167 
  168                 mutex_enter(&q->q_mutex);
  169                 while (SIMPLEQ_EMPTY(&q->q_queue_pending))
  170                         cv_wait(&q->q_cv, &q->q_mutex);
  171                 KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
  172                 q->q_queue_running.sqh_first =
  173                     q->q_queue_pending.sqh_first; /* XXX */
  174                 SIMPLEQ_INIT(&q->q_queue_pending);
  175                 mutex_exit(&q->q_mutex);
  176 
  177                 workqueue_runlist(wq, &q->q_queue_running);
  178 
  179                 mutex_enter(&q->q_mutex);
  180                 KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
  181                 SIMPLEQ_INIT(&q->q_queue_running);
  182                 /* Wake up workqueue_wait */
  183                 cv_broadcast(&q->q_cv);
  184                 mutex_exit(&q->q_mutex);
  185         }
  186         if (wq->wq_flags & WQ_FPU)
  187                 kthread_fpu_exit(s);
  188 }
  189 
  190 static void
  191 workqueue_init(struct workqueue *wq, const char *name,
  192     void (*callback_func)(struct work *, void *), void *callback_arg,
  193     pri_t prio, int ipl)
  194 {
  195 
  196         KASSERT(sizeof(wq->wq_name) > strlen(name));
  197         strncpy(wq->wq_name, name, sizeof(wq->wq_name));
  198 
  199         wq->wq_prio = prio;
  200         wq->wq_func = callback_func;
  201         wq->wq_arg = callback_arg;
  202 }
  203 
  204 static int
  205 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
  206     int ipl, struct cpu_info *ci)
  207 {
  208         int error, ktf;
  209 
  210         KASSERT(q->q_worker == NULL);
  211 
  212         mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
  213         cv_init(&q->q_cv, wq->wq_name);
  214         SIMPLEQ_INIT(&q->q_queue_pending);
  215         SIMPLEQ_INIT(&q->q_queue_running);
  216         ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
  217         if (wq->wq_prio < PRI_KERNEL)
  218                 ktf |= KTHREAD_TS;
  219         if (ci) {
  220                 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
  221                     wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
  222         } else {
  223                 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
  224                     wq, &q->q_worker, "%s", wq->wq_name);
  225         }
  226         if (error != 0) {
  227                 mutex_destroy(&q->q_mutex);
  228                 cv_destroy(&q->q_cv);
  229                 KASSERT(q->q_worker == NULL);
  230         }
  231         return error;
  232 }
  233 
  234 struct workqueue_exitargs {
  235         work_impl_t wqe_wk;
  236         struct workqueue_queue *wqe_q;
  237 };
  238 
  239 static void
  240 workqueue_exit(struct work *wk, void *arg)
  241 {
  242         struct workqueue_exitargs *wqe = (void *)wk;
  243         struct workqueue_queue *q = wqe->wqe_q;
  244 
  245         /*
  246          * only competition at this point is workqueue_finiqueue.
  247          */
  248 
  249         KASSERT(q->q_worker == curlwp);
  250         KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
  251         mutex_enter(&q->q_mutex);
  252         q->q_worker = NULL;
  253         cv_broadcast(&q->q_cv);
  254         mutex_exit(&q->q_mutex);
  255         kthread_exit(0);
  256 }
  257 
  258 static void
  259 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
  260 {
  261         struct workqueue_exitargs wqe;
  262 
  263         KASSERT(wq->wq_func == workqueue_exit);
  264 
  265         wqe.wqe_q = q;
  266         KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
  267         KASSERT(q->q_worker != NULL);
  268         mutex_enter(&q->q_mutex);
  269         SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
  270         cv_broadcast(&q->q_cv);
  271         while (q->q_worker != NULL) {
  272                 cv_wait(&q->q_cv, &q->q_mutex);
  273         }
  274         mutex_exit(&q->q_mutex);
  275         mutex_destroy(&q->q_mutex);
  276         cv_destroy(&q->q_cv);
  277 }
  278 
  279 /* --- */
  280 
  281 int
  282 workqueue_create(struct workqueue **wqp, const char *name,
  283     void (*callback_func)(struct work *, void *), void *callback_arg,
  284     pri_t prio, int ipl, int flags)
  285 {
  286         struct workqueue *wq;
  287         struct workqueue_queue *q;
  288         void *ptr;
  289         int error = 0;
  290 
  291         CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
  292 
  293         ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
  294         wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
  295         wq->wq_ptr = ptr;
  296         wq->wq_flags = flags;
  297 
  298         workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
  299 
  300         if (flags & WQ_PERCPU) {
  301                 struct cpu_info *ci;
  302                 CPU_INFO_ITERATOR cii;
  303 
  304                 /* create the work-queue for each CPU */
  305                 for (CPU_INFO_FOREACH(cii, ci)) {
  306                         q = workqueue_queue_lookup(wq, ci);
  307                         error = workqueue_initqueue(wq, q, ipl, ci);
  308                         if (error) {
  309                                 break;
  310                         }
  311                 }
  312         } else {
  313                 /* initialize a work-queue */
  314                 q = workqueue_queue_lookup(wq, NULL);
  315                 error = workqueue_initqueue(wq, q, ipl, NULL);
  316         }
  317 
  318         if (error != 0) {
  319                 workqueue_destroy(wq);
  320         } else {
  321                 *wqp = wq;
  322         }
  323 
  324         return error;
  325 }
  326 
  327 static bool
  328 workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
  329 {
  330         work_impl_t *wk;
  331         bool found = false;
  332 
  333         mutex_enter(&q->q_mutex);
  334         if (q->q_worker == curlwp)
  335                 goto out;
  336     again:
  337         SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
  338                 if (wk == wk_target)
  339                         goto found;
  340         }
  341         SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
  342                 if (wk == wk_target)
  343                         goto found;
  344         }
  345     found:
  346         if (wk != NULL) {
  347                 found = true;
  348                 cv_wait(&q->q_cv, &q->q_mutex);
  349                 goto again;
  350         }
  351     out:
  352         mutex_exit(&q->q_mutex);
  353 
  354         return found;
  355 }
  356 
  357 /*
  358  * Wait for a specified work to finish.  The caller must ensure that no new
  359  * work will be enqueued before calling workqueue_wait.  Note that if the
  360  * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue
  361  * other than the waiting queue.
  362  */
  363 void
  364 workqueue_wait(struct workqueue *wq, struct work *wk)
  365 {
  366         struct workqueue_queue *q;
  367         bool found;
  368 
  369         ASSERT_SLEEPABLE();
  370 
  371         SDT_PROBE2(sdt, kernel, workqueue, wait__start,  wq, wk);
  372         if (ISSET(wq->wq_flags, WQ_PERCPU)) {
  373                 struct cpu_info *ci;
  374                 CPU_INFO_ITERATOR cii;
  375                 for (CPU_INFO_FOREACH(cii, ci)) {
  376                         q = workqueue_queue_lookup(wq, ci);
  377                         found = workqueue_q_wait(q, (work_impl_t *)wk);
  378                         if (found)
  379                                 break;
  380                 }
  381         } else {
  382                 q = workqueue_queue_lookup(wq, NULL);
  383                 (void) workqueue_q_wait(q, (work_impl_t *)wk);
  384         }
  385         SDT_PROBE2(sdt, kernel, workqueue, wait__done,  wq, wk);
  386 }
  387 
  388 void
  389 workqueue_destroy(struct workqueue *wq)
  390 {
  391         struct workqueue_queue *q;
  392         struct cpu_info *ci;
  393         CPU_INFO_ITERATOR cii;
  394 
  395         ASSERT_SLEEPABLE();
  396 
  397         SDT_PROBE1(sdt, kernel, workqueue, exit__start,  wq);
  398         wq->wq_func = workqueue_exit;
  399         for (CPU_INFO_FOREACH(cii, ci)) {
  400                 q = workqueue_queue_lookup(wq, ci);
  401                 if (q->q_worker != NULL) {
  402                         workqueue_finiqueue(wq, q);
  403                 }
  404         }
  405         SDT_PROBE1(sdt, kernel, workqueue, exit__done,  wq);
  406         kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
  407 }
  408 
  409 #ifdef DEBUG
  410 static void
  411 workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
  412 {
  413         work_impl_t *_wk;
  414 
  415         SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
  416                 if (_wk == wk)
  417                         panic("%s: tried to enqueue a queued work", __func__);
  418         }
  419 }
  420 #endif
  421 
  422 void
  423 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
  424 {
  425         struct workqueue_queue *q;
  426         work_impl_t *wk = (void *)wk0;
  427 
  428         SDT_PROBE3(sdt, kernel, workqueue, enqueue,  wq, wk0, ci);
  429 
  430         KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
  431         q = workqueue_queue_lookup(wq, ci);
  432 
  433         mutex_enter(&q->q_mutex);
  434 #ifdef DEBUG
  435         workqueue_check_duplication(q, wk);
  436 #endif
  437         SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
  438         cv_broadcast(&q->q_cv);
  439         mutex_exit(&q->q_mutex);
  440 }

Cache object: 64fd94bf0f453886a8225c13d12088eb


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.