The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD: releng/9.2/sys/kern/subr_taskqueue.c 243850 2012-12-04 00:54:49Z kib $");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/interrupt.h>
   34 #include <sys/kernel.h>
   35 #include <sys/kthread.h>
   36 #include <sys/limits.h>
   37 #include <sys/lock.h>
   38 #include <sys/malloc.h>
   39 #include <sys/mutex.h>
   40 #include <sys/proc.h>
   41 #include <sys/sched.h>
   42 #include <sys/taskqueue.h>
   43 #include <sys/unistd.h>
   44 #include <machine/stdarg.h>
   45 
   46 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   47 static void     *taskqueue_giant_ih;
   48 static void     *taskqueue_ih;
   49 
   50 struct taskqueue_busy {
   51         struct task     *tb_running;
   52         TAILQ_ENTRY(taskqueue_busy) tb_link;
   53 };
   54 
   55 struct taskqueue {
   56         STAILQ_HEAD(, task)     tq_queue;
   57         taskqueue_enqueue_fn    tq_enqueue;
   58         void                    *tq_context;
   59         TAILQ_HEAD(, taskqueue_busy) tq_active;
   60         struct mtx              tq_mutex;
   61         struct thread           **tq_threads;
   62         int                     tq_tcount;
   63         int                     tq_spin;
   64         int                     tq_flags;
   65         int                     tq_callouts;
   66 };
   67 
   68 #define TQ_FLAGS_ACTIVE         (1 << 0)
   69 #define TQ_FLAGS_BLOCKED        (1 << 1)
   70 #define TQ_FLAGS_PENDING        (1 << 2)
   71 
   72 #define DT_CALLOUT_ARMED        (1 << 0)
   73 
   74 #define TQ_LOCK(tq)                                                     \
   75         do {                                                            \
   76                 if ((tq)->tq_spin)                                      \
   77                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   78                 else                                                    \
   79                         mtx_lock(&(tq)->tq_mutex);                      \
   80         } while (0)
   81 
   82 #define TQ_UNLOCK(tq)                                                   \
   83         do {                                                            \
   84                 if ((tq)->tq_spin)                                      \
   85                         mtx_unlock_spin(&(tq)->tq_mutex);               \
   86                 else                                                    \
   87                         mtx_unlock(&(tq)->tq_mutex);                    \
   88         } while (0)
   89 
   90 void
   91 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
   92     int priority, task_fn_t func, void *context)
   93 {
   94 
   95         TASK_INIT(&timeout_task->t, priority, func, context);
   96         callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
   97         timeout_task->q = queue;
   98         timeout_task->f = 0;
   99 }
  100 
  101 static __inline int
  102 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  103     int t)
  104 {
  105         if (tq->tq_spin)
  106                 return (msleep_spin(p, m, wm, t));
  107         return (msleep(p, m, pri, wm, t));
  108 }
  109 
  110 static struct taskqueue *
  111 _taskqueue_create(const char *name __unused, int mflags,
  112                  taskqueue_enqueue_fn enqueue, void *context,
  113                  int mtxflags, const char *mtxname)
  114 {
  115         struct taskqueue *queue;
  116 
  117         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  118         if (!queue)
  119                 return NULL;
  120 
  121         STAILQ_INIT(&queue->tq_queue);
  122         TAILQ_INIT(&queue->tq_active);
  123         queue->tq_enqueue = enqueue;
  124         queue->tq_context = context;
  125         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  126         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  127         mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
  128 
  129         return queue;
  130 }
  131 
  132 struct taskqueue *
  133 taskqueue_create(const char *name, int mflags,
  134                  taskqueue_enqueue_fn enqueue, void *context)
  135 {
  136         return _taskqueue_create(name, mflags, enqueue, context,
  137                         MTX_DEF, "taskqueue");
  138 }
  139 
  140 /*
  141  * Signal a taskqueue thread to terminate.
  142  */
  143 static void
  144 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  145 {
  146 
  147         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  148                 wakeup(tq);
  149                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  150         }
  151 }
  152 
  153 void
  154 taskqueue_free(struct taskqueue *queue)
  155 {
  156 
  157         TQ_LOCK(queue);
  158         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  159         taskqueue_terminate(queue->tq_threads, queue);
  160         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  161         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  162         mtx_destroy(&queue->tq_mutex);
  163         free(queue->tq_threads, M_TASKQUEUE);
  164         free(queue, M_TASKQUEUE);
  165 }
  166 
  167 static int
  168 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  169 {
  170         struct task *ins;
  171         struct task *prev;
  172 
  173         /*
  174          * Count multiple enqueues.
  175          */
  176         if (task->ta_pending) {
  177                 if (task->ta_pending < USHRT_MAX)
  178                         task->ta_pending++;
  179                 return (0);
  180         }
  181 
  182         /*
  183          * Optimise the case when all tasks have the same priority.
  184          */
  185         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  186         if (!prev || prev->ta_priority >= task->ta_priority) {
  187                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  188         } else {
  189                 prev = NULL;
  190                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  191                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  192                         if (ins->ta_priority < task->ta_priority)
  193                                 break;
  194 
  195                 if (prev)
  196                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  197                 else
  198                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  199         }
  200 
  201         task->ta_pending = 1;
  202         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  203                 queue->tq_enqueue(queue->tq_context);
  204         else
  205                 queue->tq_flags |= TQ_FLAGS_PENDING;
  206 
  207         return (0);
  208 }
  209 int
  210 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  211 {
  212         int res;
  213 
  214         TQ_LOCK(queue);
  215         res = taskqueue_enqueue_locked(queue, task);
  216         TQ_UNLOCK(queue);
  217 
  218         return (res);
  219 }
  220 
  221 static void
  222 taskqueue_timeout_func(void *arg)
  223 {
  224         struct taskqueue *queue;
  225         struct timeout_task *timeout_task;
  226 
  227         timeout_task = arg;
  228         queue = timeout_task->q;
  229         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  230         timeout_task->f &= ~DT_CALLOUT_ARMED;
  231         queue->tq_callouts--;
  232         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  233 }
  234 
  235 int
  236 taskqueue_enqueue_timeout(struct taskqueue *queue,
  237     struct timeout_task *timeout_task, int ticks)
  238 {
  239         int res;
  240 
  241         TQ_LOCK(queue);
  242         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  243             ("Migrated queue"));
  244         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  245         timeout_task->q = queue;
  246         res = timeout_task->t.ta_pending;
  247         if (ticks == 0) {
  248                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  249         } else {
  250                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  251                         res++;
  252                 } else {
  253                         queue->tq_callouts++;
  254                         timeout_task->f |= DT_CALLOUT_ARMED;
  255                         if (ticks < 0)
  256                                 ticks = -ticks; /* Ignore overflow. */
  257                 }
  258                 if (ticks > 0) {
  259                         callout_reset(&timeout_task->c, ticks,
  260                             taskqueue_timeout_func, timeout_task);
  261                 }
  262         }
  263         TQ_UNLOCK(queue);
  264         return (res);
  265 }
  266 
  267 void
  268 taskqueue_block(struct taskqueue *queue)
  269 {
  270 
  271         TQ_LOCK(queue);
  272         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  273         TQ_UNLOCK(queue);
  274 }
  275 
  276 void
  277 taskqueue_unblock(struct taskqueue *queue)
  278 {
  279 
  280         TQ_LOCK(queue);
  281         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  282         if (queue->tq_flags & TQ_FLAGS_PENDING) {
  283                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
  284                 queue->tq_enqueue(queue->tq_context);
  285         }
  286         TQ_UNLOCK(queue);
  287 }
  288 
  289 static void
  290 taskqueue_run_locked(struct taskqueue *queue)
  291 {
  292         struct taskqueue_busy tb;
  293         struct task *task;
  294         int pending;
  295 
  296         mtx_assert(&queue->tq_mutex, MA_OWNED);
  297         tb.tb_running = NULL;
  298         TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  299 
  300         while (STAILQ_FIRST(&queue->tq_queue)) {
  301                 /*
  302                  * Carefully remove the first task from the queue and
  303                  * zero its pending count.
  304                  */
  305                 task = STAILQ_FIRST(&queue->tq_queue);
  306                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  307                 pending = task->ta_pending;
  308                 task->ta_pending = 0;
  309                 tb.tb_running = task;
  310                 TQ_UNLOCK(queue);
  311 
  312                 task->ta_func(task->ta_context, pending);
  313 
  314                 TQ_LOCK(queue);
  315                 tb.tb_running = NULL;
  316                 wakeup(task);
  317         }
  318         TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  319 }
  320 
  321 void
  322 taskqueue_run(struct taskqueue *queue)
  323 {
  324 
  325         TQ_LOCK(queue);
  326         taskqueue_run_locked(queue);
  327         TQ_UNLOCK(queue);
  328 }
  329 
  330 static int
  331 task_is_running(struct taskqueue *queue, struct task *task)
  332 {
  333         struct taskqueue_busy *tb;
  334 
  335         mtx_assert(&queue->tq_mutex, MA_OWNED);
  336         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  337                 if (tb->tb_running == task)
  338                         return (1);
  339         }
  340         return (0);
  341 }
  342 
  343 static int
  344 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  345     u_int *pendp)
  346 {
  347 
  348         if (task->ta_pending > 0)
  349                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  350         if (pendp != NULL)
  351                 *pendp = task->ta_pending;
  352         task->ta_pending = 0;
  353         return (task_is_running(queue, task) ? EBUSY : 0);
  354 }
  355 
  356 int
  357 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  358 {
  359         u_int pending;
  360         int error;
  361 
  362         TQ_LOCK(queue);
  363         pending = task->ta_pending;
  364         error = taskqueue_cancel_locked(queue, task, pendp);
  365         TQ_UNLOCK(queue);
  366 
  367         return (error);
  368 }
  369 
  370 int
  371 taskqueue_cancel_timeout(struct taskqueue *queue,
  372     struct timeout_task *timeout_task, u_int *pendp)
  373 {
  374         u_int pending, pending1;
  375         int error;
  376 
  377         TQ_LOCK(queue);
  378         pending = !!callout_stop(&timeout_task->c);
  379         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  380         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  381                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  382                 queue->tq_callouts--;
  383         }
  384         TQ_UNLOCK(queue);
  385 
  386         if (pendp != NULL)
  387                 *pendp = pending + pending1;
  388         return (error);
  389 }
  390 
  391 void
  392 taskqueue_drain(struct taskqueue *queue, struct task *task)
  393 {
  394 
  395         if (!queue->tq_spin)
  396                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  397 
  398         TQ_LOCK(queue);
  399         while (task->ta_pending != 0 || task_is_running(queue, task))
  400                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  401         TQ_UNLOCK(queue);
  402 }
  403 
  404 void
  405 taskqueue_drain_timeout(struct taskqueue *queue,
  406     struct timeout_task *timeout_task)
  407 {
  408 
  409         callout_drain(&timeout_task->c);
  410         taskqueue_drain(queue, &timeout_task->t);
  411 }
  412 
  413 static void
  414 taskqueue_swi_enqueue(void *context)
  415 {
  416         swi_sched(taskqueue_ih, 0);
  417 }
  418 
  419 static void
  420 taskqueue_swi_run(void *dummy)
  421 {
  422         taskqueue_run(taskqueue_swi);
  423 }
  424 
  425 static void
  426 taskqueue_swi_giant_enqueue(void *context)
  427 {
  428         swi_sched(taskqueue_giant_ih, 0);
  429 }
  430 
  431 static void
  432 taskqueue_swi_giant_run(void *dummy)
  433 {
  434         taskqueue_run(taskqueue_swi_giant);
  435 }
  436 
  437 int
  438 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  439                         const char *name, ...)
  440 {
  441         va_list ap;
  442         struct thread *td;
  443         struct taskqueue *tq;
  444         int i, error;
  445         char ktname[MAXCOMLEN + 1];
  446 
  447         if (count <= 0)
  448                 return (EINVAL);
  449 
  450         tq = *tqp;
  451 
  452         va_start(ap, name);
  453         vsnprintf(ktname, sizeof(ktname), name, ap);
  454         va_end(ap);
  455 
  456         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  457             M_NOWAIT | M_ZERO);
  458         if (tq->tq_threads == NULL) {
  459                 printf("%s: no memory for %s threads\n", __func__, ktname);
  460                 return (ENOMEM);
  461         }
  462 
  463         for (i = 0; i < count; i++) {
  464                 if (count == 1)
  465                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  466                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  467                 else
  468                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  469                             &tq->tq_threads[i], RFSTOPPED, 0,
  470                             "%s_%d", ktname, i);
  471                 if (error) {
  472                         /* should be ok to continue, taskqueue_free will dtrt */
  473                         printf("%s: kthread_add(%s): error %d", __func__,
  474                             ktname, error);
  475                         tq->tq_threads[i] = NULL;               /* paranoid */
  476                 } else
  477                         tq->tq_tcount++;
  478         }
  479         for (i = 0; i < count; i++) {
  480                 if (tq->tq_threads[i] == NULL)
  481                         continue;
  482                 td = tq->tq_threads[i];
  483                 thread_lock(td);
  484                 sched_prio(td, pri);
  485                 sched_add(td, SRQ_BORING);
  486                 thread_unlock(td);
  487         }
  488 
  489         return (0);
  490 }
  491 
  492 void
  493 taskqueue_thread_loop(void *arg)
  494 {
  495         struct taskqueue **tqp, *tq;
  496 
  497         tqp = arg;
  498         tq = *tqp;
  499         TQ_LOCK(tq);
  500         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  501                 taskqueue_run_locked(tq);
  502                 /*
  503                  * Because taskqueue_run() can drop tq_mutex, we need to
  504                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  505                  * meantime, which means we missed a wakeup.
  506                  */
  507                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  508                         break;
  509                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  510         }
  511         taskqueue_run_locked(tq);
  512 
  513         /* rendezvous with thread that asked us to terminate */
  514         tq->tq_tcount--;
  515         wakeup_one(tq->tq_threads);
  516         TQ_UNLOCK(tq);
  517         kthread_exit();
  518 }
  519 
  520 void
  521 taskqueue_thread_enqueue(void *context)
  522 {
  523         struct taskqueue **tqp, *tq;
  524 
  525         tqp = context;
  526         tq = *tqp;
  527 
  528         mtx_assert(&tq->tq_mutex, MA_OWNED);
  529         wakeup_one(tq);
  530 }
  531 
  532 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  533                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  534                      INTR_MPSAFE, &taskqueue_ih)); 
  535 
  536 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  537                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  538                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 
  539 
  540 TASKQUEUE_DEFINE_THREAD(thread);
  541 
  542 struct taskqueue *
  543 taskqueue_create_fast(const char *name, int mflags,
  544                  taskqueue_enqueue_fn enqueue, void *context)
  545 {
  546         return _taskqueue_create(name, mflags, enqueue, context,
  547                         MTX_SPIN, "fast_taskqueue");
  548 }
  549 
  550 /* NB: for backwards compatibility */
  551 int
  552 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
  553 {
  554         return taskqueue_enqueue(queue, task);
  555 }
  556 
  557 static void     *taskqueue_fast_ih;
  558 
  559 static void
  560 taskqueue_fast_enqueue(void *context)
  561 {
  562         swi_sched(taskqueue_fast_ih, 0);
  563 }
  564 
  565 static void
  566 taskqueue_fast_run(void *dummy)
  567 {
  568         taskqueue_run(taskqueue_fast);
  569 }
  570 
  571 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  572         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  573         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  574 
  575 int
  576 taskqueue_member(struct taskqueue *queue, struct thread *td)
  577 {
  578         int i, j, ret = 0;
  579 
  580         TQ_LOCK(queue);
  581         for (i = 0, j = 0; ; i++) {
  582                 if (queue->tq_threads[i] == NULL)
  583                         continue;
  584                 if (queue->tq_threads[i] == td) {
  585                         ret = 1;
  586                         break;
  587                 }
  588                 if (++j >= queue->tq_tcount)
  589                         break;
  590         }
  591         TQ_UNLOCK(queue);
  592         return (ret);
  593 }

Cache object: 95f7a8a5cf31091b5f6e197d0d46f47c


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.