The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD: releng/9.1/sys/kern/subr_taskqueue.c 225570 2011-09-15 08:42:06Z adrian $");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/interrupt.h>
   34 #include <sys/kernel.h>
   35 #include <sys/kthread.h>
   36 #include <sys/limits.h>
   37 #include <sys/lock.h>
   38 #include <sys/malloc.h>
   39 #include <sys/mutex.h>
   40 #include <sys/proc.h>
   41 #include <sys/sched.h>
   42 #include <sys/taskqueue.h>
   43 #include <sys/unistd.h>
   44 #include <machine/stdarg.h>
   45 
   46 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   47 static void     *taskqueue_giant_ih;
   48 static void     *taskqueue_ih;
   49 
   50 struct taskqueue_busy {
   51         struct task     *tb_running;
   52         TAILQ_ENTRY(taskqueue_busy) tb_link;
   53 };
   54 
   55 struct taskqueue {
   56         STAILQ_HEAD(, task)     tq_queue;
   57         taskqueue_enqueue_fn    tq_enqueue;
   58         void                    *tq_context;
   59         TAILQ_HEAD(, taskqueue_busy) tq_active;
   60         struct mtx              tq_mutex;
   61         struct thread           **tq_threads;
   62         int                     tq_tcount;
   63         int                     tq_spin;
   64         int                     tq_flags;
   65         int                     tq_callouts;
   66 };
   67 
   68 #define TQ_FLAGS_ACTIVE         (1 << 0)
   69 #define TQ_FLAGS_BLOCKED        (1 << 1)
   70 #define TQ_FLAGS_PENDING        (1 << 2)
   71 
   72 #define DT_CALLOUT_ARMED        (1 << 0)
   73 
   74 #define TQ_LOCK(tq)                                                     \
   75         do {                                                            \
   76                 if ((tq)->tq_spin)                                      \
   77                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   78                 else                                                    \
   79                         mtx_lock(&(tq)->tq_mutex);                      \
   80         } while (0)
   81 
   82 #define TQ_UNLOCK(tq)                                                   \
   83         do {                                                            \
   84                 if ((tq)->tq_spin)                                      \
   85                         mtx_unlock_spin(&(tq)->tq_mutex);               \
   86                 else                                                    \
   87                         mtx_unlock(&(tq)->tq_mutex);                    \
   88         } while (0)
   89 
   90 void
   91 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
   92     int priority, task_fn_t func, void *context)
   93 {
   94 
   95         TASK_INIT(&timeout_task->t, priority, func, context);
   96         callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
   97         timeout_task->q = queue;
   98         timeout_task->f = 0;
   99 }
  100 
  101 static __inline int
  102 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  103     int t)
  104 {
  105         if (tq->tq_spin)
  106                 return (msleep_spin(p, m, wm, t));
  107         return (msleep(p, m, pri, wm, t));
  108 }
  109 
  110 static struct taskqueue *
  111 _taskqueue_create(const char *name __unused, int mflags,
  112                  taskqueue_enqueue_fn enqueue, void *context,
  113                  int mtxflags, const char *mtxname)
  114 {
  115         struct taskqueue *queue;
  116 
  117         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  118         if (!queue)
  119                 return NULL;
  120 
  121         STAILQ_INIT(&queue->tq_queue);
  122         TAILQ_INIT(&queue->tq_active);
  123         queue->tq_enqueue = enqueue;
  124         queue->tq_context = context;
  125         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  126         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  127         mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
  128 
  129         return queue;
  130 }
  131 
  132 struct taskqueue *
  133 taskqueue_create(const char *name, int mflags,
  134                  taskqueue_enqueue_fn enqueue, void *context)
  135 {
  136         return _taskqueue_create(name, mflags, enqueue, context,
  137                         MTX_DEF, "taskqueue");
  138 }
  139 
  140 /*
  141  * Signal a taskqueue thread to terminate.
  142  */
  143 static void
  144 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  145 {
  146 
  147         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  148                 wakeup(tq);
  149                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  150         }
  151 }
  152 
  153 void
  154 taskqueue_free(struct taskqueue *queue)
  155 {
  156 
  157         TQ_LOCK(queue);
  158         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  159         taskqueue_terminate(queue->tq_threads, queue);
  160         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  161         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  162         mtx_destroy(&queue->tq_mutex);
  163         free(queue->tq_threads, M_TASKQUEUE);
  164         free(queue, M_TASKQUEUE);
  165 }
  166 
  167 static int
  168 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  169 {
  170         struct task *ins;
  171         struct task *prev;
  172 
  173         /*
  174          * Count multiple enqueues.
  175          */
  176         if (task->ta_pending) {
  177                 if (task->ta_pending < USHRT_MAX)
  178                         task->ta_pending++;
  179                 return (0);
  180         }
  181 
  182         /*
  183          * Optimise the case when all tasks have the same priority.
  184          */
  185         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  186         if (!prev || prev->ta_priority >= task->ta_priority) {
  187                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  188         } else {
  189                 prev = NULL;
  190                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  191                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  192                         if (ins->ta_priority < task->ta_priority)
  193                                 break;
  194 
  195                 if (prev)
  196                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  197                 else
  198                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  199         }
  200 
  201         task->ta_pending = 1;
  202         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  203                 queue->tq_enqueue(queue->tq_context);
  204         else
  205                 queue->tq_flags |= TQ_FLAGS_PENDING;
  206 
  207         return (0);
  208 }
  209 int
  210 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  211 {
  212         int res;
  213 
  214         TQ_LOCK(queue);
  215         res = taskqueue_enqueue_locked(queue, task);
  216         TQ_UNLOCK(queue);
  217 
  218         return (res);
  219 }
  220 
  221 static void
  222 taskqueue_timeout_func(void *arg)
  223 {
  224         struct taskqueue *queue;
  225         struct timeout_task *timeout_task;
  226 
  227         timeout_task = arg;
  228         queue = timeout_task->q;
  229         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  230         timeout_task->f &= ~DT_CALLOUT_ARMED;
  231         queue->tq_callouts--;
  232         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  233 }
  234 
  235 int
  236 taskqueue_enqueue_timeout(struct taskqueue *queue,
  237     struct timeout_task *timeout_task, int ticks)
  238 {
  239         int res;
  240 
  241         TQ_LOCK(queue);
  242         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  243             ("Migrated queue"));
  244         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  245         timeout_task->q = queue;
  246         res = timeout_task->t.ta_pending;
  247         if (ticks == 0) {
  248                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  249         } else {
  250                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  251                         res++;
  252                 } else {
  253                         queue->tq_callouts++;
  254                         timeout_task->f |= DT_CALLOUT_ARMED;
  255                 }
  256                 callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
  257                     timeout_task);
  258         }
  259         TQ_UNLOCK(queue);
  260         return (res);
  261 }
  262 
  263 void
  264 taskqueue_block(struct taskqueue *queue)
  265 {
  266 
  267         TQ_LOCK(queue);
  268         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  269         TQ_UNLOCK(queue);
  270 }
  271 
  272 void
  273 taskqueue_unblock(struct taskqueue *queue)
  274 {
  275 
  276         TQ_LOCK(queue);
  277         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  278         if (queue->tq_flags & TQ_FLAGS_PENDING) {
  279                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
  280                 queue->tq_enqueue(queue->tq_context);
  281         }
  282         TQ_UNLOCK(queue);
  283 }
  284 
  285 static void
  286 taskqueue_run_locked(struct taskqueue *queue)
  287 {
  288         struct taskqueue_busy tb;
  289         struct task *task;
  290         int pending;
  291 
  292         mtx_assert(&queue->tq_mutex, MA_OWNED);
  293         tb.tb_running = NULL;
  294         TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  295 
  296         while (STAILQ_FIRST(&queue->tq_queue)) {
  297                 /*
  298                  * Carefully remove the first task from the queue and
  299                  * zero its pending count.
  300                  */
  301                 task = STAILQ_FIRST(&queue->tq_queue);
  302                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  303                 pending = task->ta_pending;
  304                 task->ta_pending = 0;
  305                 tb.tb_running = task;
  306                 TQ_UNLOCK(queue);
  307 
  308                 task->ta_func(task->ta_context, pending);
  309 
  310                 TQ_LOCK(queue);
  311                 tb.tb_running = NULL;
  312                 wakeup(task);
  313         }
  314         TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  315 }
  316 
  317 void
  318 taskqueue_run(struct taskqueue *queue)
  319 {
  320 
  321         TQ_LOCK(queue);
  322         taskqueue_run_locked(queue);
  323         TQ_UNLOCK(queue);
  324 }
  325 
  326 static int
  327 task_is_running(struct taskqueue *queue, struct task *task)
  328 {
  329         struct taskqueue_busy *tb;
  330 
  331         mtx_assert(&queue->tq_mutex, MA_OWNED);
  332         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  333                 if (tb->tb_running == task)
  334                         return (1);
  335         }
  336         return (0);
  337 }
  338 
  339 static int
  340 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  341     u_int *pendp)
  342 {
  343 
  344         if (task->ta_pending > 0)
  345                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  346         if (pendp != NULL)
  347                 *pendp = task->ta_pending;
  348         task->ta_pending = 0;
  349         return (task_is_running(queue, task) ? EBUSY : 0);
  350 }
  351 
  352 int
  353 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  354 {
  355         u_int pending;
  356         int error;
  357 
  358         TQ_LOCK(queue);
  359         pending = task->ta_pending;
  360         error = taskqueue_cancel_locked(queue, task, pendp);
  361         TQ_UNLOCK(queue);
  362 
  363         return (error);
  364 }
  365 
  366 int
  367 taskqueue_cancel_timeout(struct taskqueue *queue,
  368     struct timeout_task *timeout_task, u_int *pendp)
  369 {
  370         u_int pending, pending1;
  371         int error;
  372 
  373         TQ_LOCK(queue);
  374         pending = !!callout_stop(&timeout_task->c);
  375         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  376         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  377                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  378                 queue->tq_callouts--;
  379         }
  380         TQ_UNLOCK(queue);
  381 
  382         if (pendp != NULL)
  383                 *pendp = pending + pending1;
  384         return (error);
  385 }
  386 
  387 void
  388 taskqueue_drain(struct taskqueue *queue, struct task *task)
  389 {
  390 
  391         if (!queue->tq_spin)
  392                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  393 
  394         TQ_LOCK(queue);
  395         while (task->ta_pending != 0 || task_is_running(queue, task))
  396                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  397         TQ_UNLOCK(queue);
  398 }
  399 
  400 void
  401 taskqueue_drain_timeout(struct taskqueue *queue,
  402     struct timeout_task *timeout_task)
  403 {
  404 
  405         callout_drain(&timeout_task->c);
  406         taskqueue_drain(queue, &timeout_task->t);
  407 }
  408 
  409 static void
  410 taskqueue_swi_enqueue(void *context)
  411 {
  412         swi_sched(taskqueue_ih, 0);
  413 }
  414 
  415 static void
  416 taskqueue_swi_run(void *dummy)
  417 {
  418         taskqueue_run(taskqueue_swi);
  419 }
  420 
  421 static void
  422 taskqueue_swi_giant_enqueue(void *context)
  423 {
  424         swi_sched(taskqueue_giant_ih, 0);
  425 }
  426 
  427 static void
  428 taskqueue_swi_giant_run(void *dummy)
  429 {
  430         taskqueue_run(taskqueue_swi_giant);
  431 }
  432 
  433 int
  434 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  435                         const char *name, ...)
  436 {
  437         va_list ap;
  438         struct thread *td;
  439         struct taskqueue *tq;
  440         int i, error;
  441         char ktname[MAXCOMLEN + 1];
  442 
  443         if (count <= 0)
  444                 return (EINVAL);
  445 
  446         tq = *tqp;
  447 
  448         va_start(ap, name);
  449         vsnprintf(ktname, sizeof(ktname), name, ap);
  450         va_end(ap);
  451 
  452         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  453             M_NOWAIT | M_ZERO);
  454         if (tq->tq_threads == NULL) {
  455                 printf("%s: no memory for %s threads\n", __func__, ktname);
  456                 return (ENOMEM);
  457         }
  458 
  459         for (i = 0; i < count; i++) {
  460                 if (count == 1)
  461                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  462                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  463                 else
  464                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  465                             &tq->tq_threads[i], RFSTOPPED, 0,
  466                             "%s_%d", ktname, i);
  467                 if (error) {
  468                         /* should be ok to continue, taskqueue_free will dtrt */
  469                         printf("%s: kthread_add(%s): error %d", __func__,
  470                             ktname, error);
  471                         tq->tq_threads[i] = NULL;               /* paranoid */
  472                 } else
  473                         tq->tq_tcount++;
  474         }
  475         for (i = 0; i < count; i++) {
  476                 if (tq->tq_threads[i] == NULL)
  477                         continue;
  478                 td = tq->tq_threads[i];
  479                 thread_lock(td);
  480                 sched_prio(td, pri);
  481                 sched_add(td, SRQ_BORING);
  482                 thread_unlock(td);
  483         }
  484 
  485         return (0);
  486 }
  487 
  488 void
  489 taskqueue_thread_loop(void *arg)
  490 {
  491         struct taskqueue **tqp, *tq;
  492 
  493         tqp = arg;
  494         tq = *tqp;
  495         TQ_LOCK(tq);
  496         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  497                 taskqueue_run_locked(tq);
  498                 /*
  499                  * Because taskqueue_run() can drop tq_mutex, we need to
  500                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  501                  * meantime, which means we missed a wakeup.
  502                  */
  503                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  504                         break;
  505                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  506         }
  507         taskqueue_run_locked(tq);
  508 
  509         /* rendezvous with thread that asked us to terminate */
  510         tq->tq_tcount--;
  511         wakeup_one(tq->tq_threads);
  512         TQ_UNLOCK(tq);
  513         kthread_exit();
  514 }
  515 
  516 void
  517 taskqueue_thread_enqueue(void *context)
  518 {
  519         struct taskqueue **tqp, *tq;
  520 
  521         tqp = context;
  522         tq = *tqp;
  523 
  524         mtx_assert(&tq->tq_mutex, MA_OWNED);
  525         wakeup_one(tq);
  526 }
  527 
  528 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  529                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  530                      INTR_MPSAFE, &taskqueue_ih)); 
  531 
  532 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  533                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  534                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 
  535 
  536 TASKQUEUE_DEFINE_THREAD(thread);
  537 
  538 struct taskqueue *
  539 taskqueue_create_fast(const char *name, int mflags,
  540                  taskqueue_enqueue_fn enqueue, void *context)
  541 {
  542         return _taskqueue_create(name, mflags, enqueue, context,
  543                         MTX_SPIN, "fast_taskqueue");
  544 }
  545 
  546 /* NB: for backwards compatibility */
  547 int
  548 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
  549 {
  550         return taskqueue_enqueue(queue, task);
  551 }
  552 
  553 static void     *taskqueue_fast_ih;
  554 
  555 static void
  556 taskqueue_fast_enqueue(void *context)
  557 {
  558         swi_sched(taskqueue_fast_ih, 0);
  559 }
  560 
  561 static void
  562 taskqueue_fast_run(void *dummy)
  563 {
  564         taskqueue_run(taskqueue_fast);
  565 }
  566 
  567 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  568         swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
  569         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  570 
  571 int
  572 taskqueue_member(struct taskqueue *queue, struct thread *td)
  573 {
  574         int i, j, ret = 0;
  575 
  576         TQ_LOCK(queue);
  577         for (i = 0, j = 0; ; i++) {
  578                 if (queue->tq_threads[i] == NULL)
  579                         continue;
  580                 if (queue->tq_threads[i] == td) {
  581                         ret = 1;
  582                         break;
  583                 }
  584                 if (++j >= queue->tq_tcount)
  585                         break;
  586         }
  587         TQ_UNLOCK(queue);
  588         return (ret);
  589 }

Cache object: a2568d13f77fa8eafd6748c6a1534835


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.