The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD$");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/interrupt.h>
   34 #include <sys/kernel.h>
   35 #include <sys/kthread.h>
   36 #include <sys/limits.h>
   37 #include <sys/lock.h>
   38 #include <sys/malloc.h>
   39 #include <sys/mutex.h>
   40 #include <sys/proc.h>
   41 #include <sys/sched.h>
   42 #include <sys/taskqueue.h>
   43 #include <sys/unistd.h>
   44 #include <machine/stdarg.h>
   45 
   46 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   47 static void     *taskqueue_giant_ih;
   48 static void     *taskqueue_ih;
   49 
   50 struct taskqueue_busy {
   51         struct task     *tb_running;
   52         TAILQ_ENTRY(taskqueue_busy) tb_link;
   53 };
   54 
   55 struct taskqueue {
   56         STAILQ_HEAD(, task)     tq_queue;
   57         taskqueue_enqueue_fn    tq_enqueue;
   58         void                    *tq_context;
   59         TAILQ_HEAD(, taskqueue_busy) tq_active;
   60         struct mtx              tq_mutex;
   61         struct thread           **tq_threads;
   62         int                     tq_tcount;
   63         int                     tq_spin;
   64         int                     tq_flags;
   65 };
   66 
   67 #define TQ_FLAGS_ACTIVE         (1 << 0)
   68 #define TQ_FLAGS_BLOCKED        (1 << 1)
   69 #define TQ_FLAGS_PENDING        (1 << 2)
   70 
   71 static void     taskqueue_run_locked(struct taskqueue *);
   72 
   73 static __inline void
   74 TQ_LOCK(struct taskqueue *tq)
   75 {
   76         if (tq->tq_spin)
   77                 mtx_lock_spin(&tq->tq_mutex);
   78         else
   79                 mtx_lock(&tq->tq_mutex);
   80 }
   81 
   82 static __inline void
   83 TQ_UNLOCK(struct taskqueue *tq)
   84 {
   85         if (tq->tq_spin)
   86                 mtx_unlock_spin(&tq->tq_mutex);
   87         else
   88                 mtx_unlock(&tq->tq_mutex);
   89 }
   90 
   91 static __inline int
   92 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
   93     int t)
   94 {
   95         if (tq->tq_spin)
   96                 return (msleep_spin(p, m, wm, t));
   97         return (msleep(p, m, pri, wm, t));
   98 }
   99 
  100 static struct taskqueue *
  101 _taskqueue_create(const char *name __unused, int mflags,
  102                  taskqueue_enqueue_fn enqueue, void *context,
  103                  int mtxflags, const char *mtxname)
  104 {
  105         struct taskqueue *queue;
  106 
  107         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  108         if (!queue)
  109                 return NULL;
  110 
  111         STAILQ_INIT(&queue->tq_queue);
  112         TAILQ_INIT(&queue->tq_active);
  113         queue->tq_enqueue = enqueue;
  114         queue->tq_context = context;
  115         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  116         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  117         mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
  118 
  119         return queue;
  120 }
  121 
  122 struct taskqueue *
  123 taskqueue_create(const char *name, int mflags,
  124                  taskqueue_enqueue_fn enqueue, void *context)
  125 {
  126         return _taskqueue_create(name, mflags, enqueue, context,
  127                         MTX_DEF, "taskqueue");
  128 }
  129 
  130 /*
  131  * Signal a taskqueue thread to terminate.
  132  */
  133 static void
  134 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  135 {
  136 
  137         while (tq->tq_tcount > 0) {
  138                 wakeup(tq);
  139                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  140         }
  141 }
  142 
  143 void
  144 taskqueue_free(struct taskqueue *queue)
  145 {
  146 
  147         TQ_LOCK(queue);
  148         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  149         taskqueue_run_locked(queue);
  150         taskqueue_terminate(queue->tq_threads, queue);
  151         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  152         mtx_destroy(&queue->tq_mutex);
  153         free(queue->tq_threads, M_TASKQUEUE);
  154         free(queue, M_TASKQUEUE);
  155 }
  156 
  157 int
  158 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  159 {
  160         struct task *ins;
  161         struct task *prev;
  162 
  163         TQ_LOCK(queue);
  164 
  165         /*
  166          * Count multiple enqueues.
  167          */
  168         if (task->ta_pending) {
  169                 if (task->ta_pending < USHRT_MAX)
  170                         task->ta_pending++;
  171                 TQ_UNLOCK(queue);
  172                 return 0;
  173         }
  174 
  175         /*
  176          * Optimise the case when all tasks have the same priority.
  177          */
  178         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  179         if (!prev || prev->ta_priority >= task->ta_priority) {
  180                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  181         } else {
  182                 prev = NULL;
  183                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  184                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  185                         if (ins->ta_priority < task->ta_priority)
  186                                 break;
  187 
  188                 if (prev)
  189                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  190                 else
  191                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  192         }
  193 
  194         task->ta_pending = 1;
  195         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  196                 queue->tq_enqueue(queue->tq_context);
  197         else
  198                 queue->tq_flags |= TQ_FLAGS_PENDING;
  199 
  200         TQ_UNLOCK(queue);
  201 
  202         return 0;
  203 }
  204 
  205 static void
  206 taskqueue_drain_running(struct taskqueue *queue)
  207 {
  208 
  209         while (!TAILQ_EMPTY(&queue->tq_active))
  210                 TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
  211                     PWAIT, "-", 0);
  212 }
  213 
  214 void
  215 taskqueue_block(struct taskqueue *queue)
  216 {
  217 
  218         TQ_LOCK(queue);
  219         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  220         TQ_UNLOCK(queue);
  221 }
  222 
  223 void
  224 taskqueue_unblock(struct taskqueue *queue)
  225 {
  226 
  227         TQ_LOCK(queue);
  228         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  229         if (queue->tq_flags & TQ_FLAGS_PENDING) {
  230                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
  231                 queue->tq_enqueue(queue->tq_context);
  232         }
  233         TQ_UNLOCK(queue);
  234 }
  235 
  236 static void
  237 taskqueue_run_locked(struct taskqueue *queue)
  238 {
  239         struct taskqueue_busy tb;
  240         struct task *task;
  241         int pending;
  242 
  243         mtx_assert(&queue->tq_mutex, MA_OWNED);
  244         tb.tb_running = NULL;
  245         TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  246 
  247         while (STAILQ_FIRST(&queue->tq_queue)) {
  248                 /*
  249                  * Carefully remove the first task from the queue and
  250                  * zero its pending count.
  251                  */
  252                 task = STAILQ_FIRST(&queue->tq_queue);
  253                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  254                 pending = task->ta_pending;
  255                 task->ta_pending = 0;
  256                 tb.tb_running = task;
  257                 TQ_UNLOCK(queue);
  258 
  259                 task->ta_func(task->ta_context, pending);
  260 
  261                 TQ_LOCK(queue);
  262                 tb.tb_running = NULL;
  263                 wakeup(task);
  264         }
  265         TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  266         if (TAILQ_EMPTY(&queue->tq_active))
  267                 wakeup(&queue->tq_active);
  268 }
  269 
  270 void
  271 taskqueue_run(struct taskqueue *queue)
  272 {
  273 
  274         TQ_LOCK(queue);
  275         taskqueue_run_locked(queue);
  276         TQ_UNLOCK(queue);
  277 }
  278 
  279 static int
  280 task_is_running(struct taskqueue *queue, struct task *task)
  281 {
  282         struct taskqueue_busy *tb;
  283 
  284         mtx_assert(&queue->tq_mutex, MA_OWNED);
  285         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  286                 if (tb->tb_running == task)
  287                         return (1);
  288         }
  289         return (0);
  290 }
  291 
  292 void
  293 taskqueue_drain(struct taskqueue *queue, struct task *task)
  294 {
  295         if (queue->tq_spin) {           /* XXX */
  296                 mtx_lock_spin(&queue->tq_mutex);
  297                 while (task->ta_pending != 0 || task_is_running(queue, task))
  298                         msleep_spin(task, &queue->tq_mutex, "-", 0);
  299                 mtx_unlock_spin(&queue->tq_mutex);
  300         } else {
  301                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  302 
  303                 mtx_lock(&queue->tq_mutex);
  304                 while (task->ta_pending != 0 || task_is_running(queue, task))
  305                         msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
  306                 mtx_unlock(&queue->tq_mutex);
  307         }
  308 }
  309 
  310 void
  311 taskqueue_drain_all(struct taskqueue *queue)
  312 {
  313         struct task *task;
  314 
  315         if (!queue->tq_spin)
  316                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  317 
  318         TQ_LOCK(queue);
  319         task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  320         if (task != NULL)
  321                 while (task->ta_pending != 0)
  322                         TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  323         taskqueue_drain_running(queue);
  324         KASSERT(STAILQ_EMPTY(&queue->tq_queue),
  325             ("taskqueue queue is not empty after draining"));
  326         TQ_UNLOCK(queue);
  327 }
  328 
  329 static void
  330 taskqueue_swi_enqueue(void *context)
  331 {
  332         swi_sched(taskqueue_ih, 0);
  333 }
  334 
  335 static void
  336 taskqueue_swi_run(void *dummy)
  337 {
  338         taskqueue_run(taskqueue_swi);
  339 }
  340 
  341 static void
  342 taskqueue_swi_giant_enqueue(void *context)
  343 {
  344         swi_sched(taskqueue_giant_ih, 0);
  345 }
  346 
  347 static void
  348 taskqueue_swi_giant_run(void *dummy)
  349 {
  350         taskqueue_run(taskqueue_swi_giant);
  351 }
  352 
  353 int
  354 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  355                         const char *name, ...)
  356 {
  357         va_list ap;
  358         struct thread *td;
  359         struct taskqueue *tq;
  360         int i, error;
  361         char ktname[MAXCOMLEN + 1];
  362 
  363         if (count <= 0)
  364                 return (EINVAL);
  365 
  366         tq = *tqp;
  367 
  368         va_start(ap, name);
  369         vsnprintf(ktname, sizeof(ktname), name, ap);
  370         va_end(ap);
  371 
  372         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  373             M_NOWAIT | M_ZERO);
  374         if (tq->tq_threads == NULL) {
  375                 printf("%s: no memory for %s threads\n", __func__, ktname);
  376                 return (ENOMEM);
  377         }
  378 
  379         for (i = 0; i < count; i++) {
  380                 if (count == 1)
  381                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  382                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  383                 else
  384                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  385                             &tq->tq_threads[i], RFSTOPPED, 0,
  386                             "%s_%d", ktname, i);
  387                 if (error) {
  388                         /* should be ok to continue, taskqueue_free will dtrt */
  389                         printf("%s: kthread_add(%s): error %d", __func__,
  390                             ktname, error);
  391                         tq->tq_threads[i] = NULL;               /* paranoid */
  392                 } else
  393                         tq->tq_tcount++;
  394         }
  395         for (i = 0; i < count; i++) {
  396                 if (tq->tq_threads[i] == NULL)
  397                         continue;
  398                 td = tq->tq_threads[i];
  399                 thread_lock(td);
  400                 sched_prio(td, pri);
  401                 sched_add(td, SRQ_BORING);
  402                 thread_unlock(td);
  403         }
  404 
  405         return (0);
  406 }
  407 
  408 void
  409 taskqueue_thread_loop(void *arg)
  410 {
  411         struct taskqueue **tqp, *tq;
  412 
  413         tqp = arg;
  414         tq = *tqp;
  415         TQ_LOCK(tq);
  416         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  417                 taskqueue_run_locked(tq);
  418                 /*
  419                  * Because taskqueue_run() can drop tq_mutex, we need to
  420                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  421                  * meantime, which means we missed a wakeup.
  422                  */
  423                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  424                         break;
  425                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  426         }
  427 
  428         /* rendezvous with thread that asked us to terminate */
  429         tq->tq_tcount--;
  430         wakeup_one(tq->tq_threads);
  431         TQ_UNLOCK(tq);
  432         kthread_exit();
  433 }
  434 
  435 void
  436 taskqueue_thread_enqueue(void *context)
  437 {
  438         struct taskqueue **tqp, *tq;
  439 
  440         tqp = context;
  441         tq = *tqp;
  442 
  443         mtx_assert(&tq->tq_mutex, MA_OWNED);
  444         wakeup_one(tq);
  445 }
  446 
  447 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  448                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  449                      INTR_MPSAFE, &taskqueue_ih)); 
  450 
  451 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  452                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  453                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 
  454 
  455 TASKQUEUE_DEFINE_THREAD(thread);
  456 
  457 struct taskqueue *
  458 taskqueue_create_fast(const char *name, int mflags,
  459                  taskqueue_enqueue_fn enqueue, void *context)
  460 {
  461         return _taskqueue_create(name, mflags, enqueue, context,
  462                         MTX_SPIN, "fast_taskqueue");
  463 }
  464 
  465 /* NB: for backwards compatibility */
  466 int
  467 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
  468 {
  469         return taskqueue_enqueue(queue, task);
  470 }
  471 
  472 static void     *taskqueue_fast_ih;
  473 
  474 static void
  475 taskqueue_fast_enqueue(void *context)
  476 {
  477         swi_sched(taskqueue_fast_ih, 0);
  478 }
  479 
  480 static void
  481 taskqueue_fast_run(void *dummy)
  482 {
  483         taskqueue_run(taskqueue_fast);
  484 }
  485 
  486 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  487         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  488         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  489 
  490 int
  491 taskqueue_member(struct taskqueue *queue, struct thread *td)
  492 {
  493         int i, j, ret = 0;
  494 
  495         TQ_LOCK(queue);
  496         for (i = 0, j = 0; ; i++) {
  497                 if (queue->tq_threads[i] == NULL)
  498                         continue;
  499                 if (queue->tq_threads[i] == td) {
  500                         ret = 1;
  501                         break;
  502                 }
  503                 if (++j >= queue->tq_tcount)
  504                         break;
  505         }
  506         TQ_UNLOCK(queue);
  507         return (ret);
  508 }

Cache object: 3aa5eb3c5d66154f956d773a19231dc0


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.