
FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_workqueue.c


    1 /*      $NetBSD: subr_workqueue.c,v 1.26.4.1 2009/04/04 16:58:25 snj Exp $      */
    2 
    3 /*-
    4  * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.26.4.1 2009/04/04 16:58:25 snj Exp $");
   31 
   32 #include <sys/param.h>
   33 #include <sys/cpu.h>
   34 #include <sys/systm.h>
   35 #include <sys/kthread.h>
   36 #include <sys/kmem.h>
   37 #include <sys/proc.h>
   38 #include <sys/workqueue.h>
   39 #include <sys/mutex.h>
   40 #include <sys/condvar.h>
   41 #include <sys/queue.h>
   42 
   43 #include <uvm/uvm_extern.h>
   44 
   45 typedef struct work_impl {
   46         SIMPLEQ_ENTRY(work_impl) wk_entry;
   47 } work_impl_t;
   48 
   49 SIMPLEQ_HEAD(workqhead, work_impl);
   50 
   51 struct workqueue_queue {
   52         kmutex_t q_mutex;
   53         kcondvar_t q_cv;
   54         struct workqhead q_queue;
   55         struct lwp *q_worker;
   56 };
   57 
   58 struct workqueue {
   59         void (*wq_func)(struct work *, void *);
   60         void *wq_arg;
   61         int wq_flags;
   62 
   63         const char *wq_name;
   64         pri_t wq_prio;
   65         void *wq_ptr;
   66 };
   67 
   68 #define WQ_SIZE         (roundup2(sizeof(struct workqueue), coherency_unit))
   69 #define WQ_QUEUE_SIZE   (roundup2(sizeof(struct workqueue_queue), coherency_unit))
   70 
   71 #define POISON  0xaabbccdd
   72 
   73 static size_t
   74 workqueue_size(int flags)
   75 {
   76 
   77         return WQ_SIZE
   78             + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
   79             + coherency_unit;
   80 }
   81 
   82 static struct workqueue_queue *
   83 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
   84 {
   85         u_int idx = 0;
   86 
   87         if (wq->wq_flags & WQ_PERCPU) {
   88                 idx = ci ? cpu_index(ci) : cpu_index(curcpu());
   89         }
   90 
   91         return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
   92 }
   93 
   94 static void
   95 workqueue_runlist(struct workqueue *wq, struct workqhead *list)
   96 {
   97         work_impl_t *wk;
   98         work_impl_t *next;
   99 
  100         /*
  101          * note that "list" is not a complete SIMPLEQ.
  102          */
  103 
  104         for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
  105                 next = SIMPLEQ_NEXT(wk, wk_entry);
  106                 (*wq->wq_func)((void *)wk, wq->wq_arg);
  107         }
  108 }
  109 
  110 static void
  111 workqueue_worker(void *cookie)
  112 {
  113         struct workqueue *wq = cookie;
  114         struct workqueue_queue *q;
  115 
  116         /* find the workqueue of this kthread */
  117         q = workqueue_queue_lookup(wq, curlwp->l_cpu);
  118 
  119         for (;;) {
  120                 struct workqhead tmp;
  121 
  122                 /*
  123                  * we violate abstraction of SIMPLEQ.
  124                  */
  125 
  126 #if defined(DIAGNOSTIC)
  127                 tmp.sqh_last = (void *)POISON;
  128 #endif /* defined(DIAGNOSTIC) */
  129 
  130                 mutex_enter(&q->q_mutex);
  131                 while (SIMPLEQ_EMPTY(&q->q_queue))
  132                         cv_wait(&q->q_cv, &q->q_mutex);
  133                 tmp.sqh_first = q->q_queue.sqh_first; /* XXX */
  134                 SIMPLEQ_INIT(&q->q_queue);
  135                 mutex_exit(&q->q_mutex);
  136 
  137                 workqueue_runlist(wq, &tmp);
  138         }
  139 }
  140 
  141 static void
  142 workqueue_init(struct workqueue *wq, const char *name,
  143     void (*callback_func)(struct work *, void *), void *callback_arg,
  144     pri_t prio, int ipl)
  145 {
  146 
  147         wq->wq_prio = prio;
  148         wq->wq_name = name;
  149         wq->wq_func = callback_func;
  150         wq->wq_arg = callback_arg;
  151 }
  152 
  153 static int
  154 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
  155     int ipl, struct cpu_info *ci)
  156 {
  157         int error, ktf;
  158 
  159         KASSERT(q->q_worker == NULL);
  160 
  161         mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
  162         cv_init(&q->q_cv, wq->wq_name);
  163         SIMPLEQ_INIT(&q->q_queue);
  164         ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
  165         if (ci) {
  166                 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
  167                     wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
  168         } else {
  169                 error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
  170                     wq, &q->q_worker, "%s", wq->wq_name);
  171         }
  172         if (error != 0) {
  173                 mutex_destroy(&q->q_mutex);
  174                 cv_destroy(&q->q_cv);
  175                 KASSERT(q->q_worker == NULL);
  176         }
  177         return error;
  178 }
  179 
  180 struct workqueue_exitargs {
  181         work_impl_t wqe_wk;
  182         struct workqueue_queue *wqe_q;
  183 };
  184 
  185 static void
  186 workqueue_exit(struct work *wk, void *arg)
  187 {
  188         struct workqueue_exitargs *wqe = (void *)wk;
  189         struct workqueue_queue *q = wqe->wqe_q;
  190 
  191         /*
  192          * only competition at this point is workqueue_finiqueue.
  193          */
  194 
  195         KASSERT(q->q_worker == curlwp);
  196         KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
  197         mutex_enter(&q->q_mutex);
  198         q->q_worker = NULL;
  199         cv_signal(&q->q_cv);
  200         mutex_exit(&q->q_mutex);
  201         kthread_exit(0);
  202 }
  203 
  204 static void
  205 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
  206 {
  207         struct workqueue_exitargs wqe;
  208         lwp_t *l;
  209 
  210         KASSERT(wq->wq_func == workqueue_exit);
  211 
  212         wqe.wqe_q = q;
  213         KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
  214         KASSERT(q->q_worker != NULL);
  215         l = curlwp;
  216         uvm_lwp_hold(l);        
  217         mutex_enter(&q->q_mutex);
  218         SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry);
  219         cv_signal(&q->q_cv);
  220         while (q->q_worker != NULL) {
  221                 cv_wait(&q->q_cv, &q->q_mutex);
  222         }
  223         mutex_exit(&q->q_mutex);
  224         uvm_lwp_rele(l);        
  225         mutex_destroy(&q->q_mutex);
  226         cv_destroy(&q->q_cv);
  227 }
  228 
  229 /* --- */
  230 
  231 int
  232 workqueue_create(struct workqueue **wqp, const char *name,
  233     void (*callback_func)(struct work *, void *), void *callback_arg,
  234     pri_t prio, int ipl, int flags)
  235 {
  236         struct workqueue *wq;
  237         struct workqueue_queue *q;
  238         void *ptr;
  239         int error = 0;
  240 
  241         CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
  242 
  243         ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
  244         wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
  245         wq->wq_ptr = ptr;
  246         wq->wq_flags = flags;
  247 
  248         workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
  249 
  250         if (flags & WQ_PERCPU) {
  251                 struct cpu_info *ci;
  252                 CPU_INFO_ITERATOR cii;
  253 
  254                 /* create the work-queue for each CPU */
  255                 for (CPU_INFO_FOREACH(cii, ci)) {
  256                         q = workqueue_queue_lookup(wq, ci);
  257                         error = workqueue_initqueue(wq, q, ipl, ci);
  258                         if (error) {
  259                                 break;
  260                         }
  261                 }
  262         } else {
  263                 /* initialize a work-queue */
  264                 q = workqueue_queue_lookup(wq, NULL);
  265                 error = workqueue_initqueue(wq, q, ipl, NULL);
  266         }
  267 
  268         if (error != 0) {
  269                 workqueue_destroy(wq);
  270         } else {
  271                 *wqp = wq;
  272         }
  273 
  274         return error;
  275 }
  276 
  277 void
  278 workqueue_destroy(struct workqueue *wq)
  279 {
  280         struct workqueue_queue *q;
  281         struct cpu_info *ci;
  282         CPU_INFO_ITERATOR cii;
  283 
  284         wq->wq_func = workqueue_exit;
  285         for (CPU_INFO_FOREACH(cii, ci)) {
  286                 q = workqueue_queue_lookup(wq, ci);
  287                 if (q->q_worker != NULL) {
  288                         workqueue_finiqueue(wq, q);
  289                 }
  290         }
  291         kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
  292 }
  293 
  294 void
  295 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
  296 {
  297         struct workqueue_queue *q;
  298         work_impl_t *wk = (void *)wk0;
  299 
  300         KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
  301         q = workqueue_queue_lookup(wq, ci);
  302 
  303         mutex_enter(&q->q_mutex);
  304         SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry);
  305         cv_signal(&q->q_cv);
  306         mutex_exit(&q->q_mutex);
  307 }
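
(Usage sketch, not part of the original file.)  The listing above is the
complete public surface of this workqueue facility: workqueue_create()
allocates the queue state and spawns the worker kthread(s),
workqueue_enqueue() appends a caller-embedded struct work and wakes a
worker, and workqueue_destroy() injects a workqueue_exit() item to tear
the worker threads down.  The fragment below shows one plausible caller,
under stated assumptions: the example_* names are invented for
illustration, and the PRI_NONE / IPL_NONE / WQ_MPSAFE choices are just
one reasonable single-queue configuration, not something taken from this
file.

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/workqueue.h>

/*
 * Hypothetical caller: each request embeds a struct work as its first
 * member, so the callback can convert the struct work pointer back with
 * a plain cast (the same convention workqueue_exit() uses above).
 */
struct example_task {
        struct work     et_work;        /* must stay the first member */
        int             et_value;
};

static struct workqueue *example_wq;

static void
example_work_cb(struct work *wk, void *arg)
{
        struct example_task *et = (void *)wk;

        /* runs in the worker kthread, not in the enqueuer's context */
        printf("example: processed value %d\n", et->et_value);
        kmem_free(et, sizeof(*et));
}

static int
example_init(void)
{

        /* one worker thread, MP-safe callback, no per-CPU queues */
        return workqueue_create(&example_wq, "examplewq",
            example_work_cb, NULL, PRI_NONE, IPL_NONE, WQ_MPSAFE);
}

static void
example_submit(int value)
{
        struct example_task *et;

        et = kmem_alloc(sizeof(*et), KM_SLEEP);
        et->et_value = value;
        /* ci == NULL: the single (non-per-CPU) queue is selected */
        workqueue_enqueue(example_wq, &et->et_work, NULL);
}

static void
example_fini(void)
{

        /* blocks until the worker has run workqueue_exit() and gone */
        workqueue_destroy(example_wq);
}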





This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.