The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD: releng/8.4/sys/kern/subr_taskqueue.c 241113 2012-10-01 19:43:37Z jhb $");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/interrupt.h>
   34 #include <sys/kernel.h>
   35 #include <sys/kthread.h>
   36 #include <sys/limits.h>
   37 #include <sys/lock.h>
   38 #include <sys/malloc.h>
   39 #include <sys/mutex.h>
   40 #include <sys/proc.h>
   41 #include <sys/sched.h>
   42 #include <sys/taskqueue.h>
   43 #include <sys/unistd.h>
   44 #include <machine/stdarg.h>
   45 
   46 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   47 static void     *taskqueue_giant_ih;
   48 static void     *taskqueue_ih;
   49 
   50 struct taskqueue_busy {
   51         struct task     *tb_running;
   52         TAILQ_ENTRY(taskqueue_busy) tb_link;
   53 };
   54 
   55 struct taskqueue {
   56         STAILQ_HEAD(, task)     tq_queue;
   57         taskqueue_enqueue_fn    tq_enqueue;
   58         void                    *tq_context;
   59         TAILQ_HEAD(, taskqueue_busy) tq_active;
   60         struct mtx              tq_mutex;
   61         struct thread           **tq_threads;
   62         int                     tq_tcount;
   63         int                     tq_spin;
   64         int                     tq_flags;
   65 };
   66 
   67 #define TQ_FLAGS_ACTIVE         (1 << 0)
   68 #define TQ_FLAGS_BLOCKED        (1 << 1)
   69 #define TQ_FLAGS_PENDING        (1 << 2)
   70 
   71 static void     taskqueue_run_locked(struct taskqueue *);
   72 
   73 static __inline void
   74 TQ_LOCK(struct taskqueue *tq)
   75 {
   76         if (tq->tq_spin)
   77                 mtx_lock_spin(&tq->tq_mutex);
   78         else
   79                 mtx_lock(&tq->tq_mutex);
   80 }
   81 
   82 static __inline void
   83 TQ_UNLOCK(struct taskqueue *tq)
   84 {
   85         if (tq->tq_spin)
   86                 mtx_unlock_spin(&tq->tq_mutex);
   87         else
   88                 mtx_unlock(&tq->tq_mutex);
   89 }
   90 
   91 static __inline int
   92 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
   93     int t)
   94 {
   95         if (tq->tq_spin)
   96                 return (msleep_spin(p, m, wm, t));
   97         return (msleep(p, m, pri, wm, t));
   98 }
   99 
  100 static struct taskqueue *
  101 _taskqueue_create(const char *name __unused, int mflags,
  102                  taskqueue_enqueue_fn enqueue, void *context,
  103                  int mtxflags, const char *mtxname)
  104 {
  105         struct taskqueue *queue;
  106 
  107         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  108         if (!queue)
  109                 return NULL;
  110 
  111         STAILQ_INIT(&queue->tq_queue);
  112         TAILQ_INIT(&queue->tq_active);
  113         queue->tq_enqueue = enqueue;
  114         queue->tq_context = context;
  115         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  116         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  117         mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
  118 
  119         return queue;
  120 }
  121 
  122 struct taskqueue *
  123 taskqueue_create(const char *name, int mflags,
  124                  taskqueue_enqueue_fn enqueue, void *context)
  125 {
  126         return _taskqueue_create(name, mflags, enqueue, context,
  127                         MTX_DEF, "taskqueue");
  128 }
  129 
  130 /*
  131  * Signal a taskqueue thread to terminate.
  132  */
  133 static void
  134 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  135 {
  136 
  137         while (tq->tq_tcount > 0) {
  138                 wakeup(tq);
  139                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  140         }
  141 }
  142 
  143 void
  144 taskqueue_free(struct taskqueue *queue)
  145 {
  146 
  147         TQ_LOCK(queue);
  148         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  149         taskqueue_run_locked(queue);
  150         taskqueue_terminate(queue->tq_threads, queue);
  151         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  152         mtx_destroy(&queue->tq_mutex);
  153         free(queue->tq_threads, M_TASKQUEUE);
  154         free(queue, M_TASKQUEUE);
  155 }
  156 
  157 int
  158 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  159 {
  160         struct task *ins;
  161         struct task *prev;
  162 
  163         TQ_LOCK(queue);
  164 
  165         /*
  166          * Count multiple enqueues.
  167          */
  168         if (task->ta_pending) {
  169                 if (task->ta_pending < USHRT_MAX)
  170                         task->ta_pending++;
  171                 TQ_UNLOCK(queue);
  172                 return 0;
  173         }
  174 
  175         /*
  176          * Optimise the case when all tasks have the same priority.
  177          */
  178         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  179         if (!prev || prev->ta_priority >= task->ta_priority) {
  180                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  181         } else {
  182                 prev = NULL;
  183                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  184                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  185                         if (ins->ta_priority < task->ta_priority)
  186                                 break;
  187 
  188                 if (prev)
  189                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  190                 else
  191                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  192         }
  193 
  194         task->ta_pending = 1;
  195         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  196                 queue->tq_enqueue(queue->tq_context);
  197         else
  198                 queue->tq_flags |= TQ_FLAGS_PENDING;
  199 
  200         TQ_UNLOCK(queue);
  201 
  202         return 0;
  203 }
  204 
  205 void
  206 taskqueue_block(struct taskqueue *queue)
  207 {
  208 
  209         TQ_LOCK(queue);
  210         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  211         TQ_UNLOCK(queue);
  212 }
  213 
  214 void
  215 taskqueue_unblock(struct taskqueue *queue)
  216 {
  217 
  218         TQ_LOCK(queue);
  219         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  220         if (queue->tq_flags & TQ_FLAGS_PENDING) {
  221                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
  222                 queue->tq_enqueue(queue->tq_context);
  223         }
  224         TQ_UNLOCK(queue);
  225 }
  226 
  227 static void
  228 taskqueue_run_locked(struct taskqueue *queue)
  229 {
  230         struct taskqueue_busy tb;
  231         struct task *task;
  232         int pending;
  233 
  234         mtx_assert(&queue->tq_mutex, MA_OWNED);
  235         tb.tb_running = NULL;
  236         TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  237 
  238         while (STAILQ_FIRST(&queue->tq_queue)) {
  239                 /*
  240                  * Carefully remove the first task from the queue and
  241                  * zero its pending count.
  242                  */
  243                 task = STAILQ_FIRST(&queue->tq_queue);
  244                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  245                 pending = task->ta_pending;
  246                 task->ta_pending = 0;
  247                 tb.tb_running = task;
  248                 TQ_UNLOCK(queue);
  249 
  250                 task->ta_func(task->ta_context, pending);
  251 
  252                 TQ_LOCK(queue);
  253                 tb.tb_running = NULL;
  254                 wakeup(task);
  255         }
  256         TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  257 }
  258 
  259 void
  260 taskqueue_run(struct taskqueue *queue)
  261 {
  262 
  263         TQ_LOCK(queue);
  264         taskqueue_run_locked(queue);
  265         TQ_UNLOCK(queue);
  266 }
  267 
  268 static int
  269 task_is_running(struct taskqueue *queue, struct task *task)
  270 {
  271         struct taskqueue_busy *tb;
  272 
  273         mtx_assert(&queue->tq_mutex, MA_OWNED);
  274         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  275                 if (tb->tb_running == task)
  276                         return (1);
  277         }
  278         return (0);
  279 }
  280 
  281 void
  282 taskqueue_drain(struct taskqueue *queue, struct task *task)
  283 {
  284         if (queue->tq_spin) {           /* XXX */
  285                 mtx_lock_spin(&queue->tq_mutex);
  286                 while (task->ta_pending != 0 || task_is_running(queue, task))
  287                         msleep_spin(task, &queue->tq_mutex, "-", 0);
  288                 mtx_unlock_spin(&queue->tq_mutex);
  289         } else {
  290                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  291 
  292                 mtx_lock(&queue->tq_mutex);
  293                 while (task->ta_pending != 0 || task_is_running(queue, task))
  294                         msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
  295                 mtx_unlock(&queue->tq_mutex);
  296         }
  297 }
  298 
  299 static void
  300 taskqueue_swi_enqueue(void *context)
  301 {
  302         swi_sched(taskqueue_ih, 0);
  303 }
  304 
  305 static void
  306 taskqueue_swi_run(void *dummy)
  307 {
  308         taskqueue_run(taskqueue_swi);
  309 }
  310 
  311 static void
  312 taskqueue_swi_giant_enqueue(void *context)
  313 {
  314         swi_sched(taskqueue_giant_ih, 0);
  315 }
  316 
  317 static void
  318 taskqueue_swi_giant_run(void *dummy)
  319 {
  320         taskqueue_run(taskqueue_swi_giant);
  321 }
  322 
  323 int
  324 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  325                         const char *name, ...)
  326 {
  327         va_list ap;
  328         struct thread *td;
  329         struct taskqueue *tq;
  330         int i, error;
  331         char ktname[MAXCOMLEN + 1];
  332 
  333         if (count <= 0)
  334                 return (EINVAL);
  335 
  336         tq = *tqp;
  337 
  338         va_start(ap, name);
  339         vsnprintf(ktname, sizeof(ktname), name, ap);
  340         va_end(ap);
  341 
  342         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  343             M_NOWAIT | M_ZERO);
  344         if (tq->tq_threads == NULL) {
  345                 printf("%s: no memory for %s threads\n", __func__, ktname);
  346                 return (ENOMEM);
  347         }
  348 
  349         for (i = 0; i < count; i++) {
  350                 if (count == 1)
  351                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  352                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  353                 else
  354                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  355                             &tq->tq_threads[i], RFSTOPPED, 0,
  356                             "%s_%d", ktname, i);
  357                 if (error) {
  358                         /* should be ok to continue, taskqueue_free will dtrt */
  359                         printf("%s: kthread_add(%s): error %d", __func__,
  360                             ktname, error);
  361                         tq->tq_threads[i] = NULL;               /* paranoid */
  362                 } else
  363                         tq->tq_tcount++;
  364         }
  365         for (i = 0; i < count; i++) {
  366                 if (tq->tq_threads[i] == NULL)
  367                         continue;
  368                 td = tq->tq_threads[i];
  369                 thread_lock(td);
  370                 sched_prio(td, pri);
  371                 sched_add(td, SRQ_BORING);
  372                 thread_unlock(td);
  373         }
  374 
  375         return (0);
  376 }
  377 
  378 void
  379 taskqueue_thread_loop(void *arg)
  380 {
  381         struct taskqueue **tqp, *tq;
  382 
  383         tqp = arg;
  384         tq = *tqp;
  385         TQ_LOCK(tq);
  386         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  387                 taskqueue_run_locked(tq);
  388                 /*
  389                  * Because taskqueue_run() can drop tq_mutex, we need to
  390                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  391                  * meantime, which means we missed a wakeup.
  392                  */
  393                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  394                         break;
  395                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  396         }
  397 
  398         /* rendezvous with thread that asked us to terminate */
  399         tq->tq_tcount--;
  400         wakeup_one(tq->tq_threads);
  401         TQ_UNLOCK(tq);
  402         kthread_exit();
  403 }
  404 
  405 void
  406 taskqueue_thread_enqueue(void *context)
  407 {
  408         struct taskqueue **tqp, *tq;
  409 
  410         tqp = context;
  411         tq = *tqp;
  412 
  413         mtx_assert(&tq->tq_mutex, MA_OWNED);
  414         wakeup_one(tq);
  415 }
  416 
  417 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  418                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  419                      INTR_MPSAFE, &taskqueue_ih)); 
  420 
  421 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  422                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  423                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 
  424 
  425 TASKQUEUE_DEFINE_THREAD(thread);
  426 
  427 struct taskqueue *
  428 taskqueue_create_fast(const char *name, int mflags,
  429                  taskqueue_enqueue_fn enqueue, void *context)
  430 {
  431         return _taskqueue_create(name, mflags, enqueue, context,
  432                         MTX_SPIN, "fast_taskqueue");
  433 }
  434 
  435 /* NB: for backwards compatibility */
  436 int
  437 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
  438 {
  439         return taskqueue_enqueue(queue, task);
  440 }
  441 
  442 static void     *taskqueue_fast_ih;
  443 
  444 static void
  445 taskqueue_fast_enqueue(void *context)
  446 {
  447         swi_sched(taskqueue_fast_ih, 0);
  448 }
  449 
  450 static void
  451 taskqueue_fast_run(void *dummy)
  452 {
  453         taskqueue_run(taskqueue_fast);
  454 }
  455 
  456 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  457         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  458         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  459 
  460 int
  461 taskqueue_member(struct taskqueue *queue, struct thread *td)
  462 {
  463         int i, j, ret = 0;
  464 
  465         TQ_LOCK(queue);
  466         for (i = 0, j = 0; ; i++) {
  467                 if (queue->tq_threads[i] == NULL)
  468                         continue;
  469                 if (queue->tq_threads[i] == td) {
  470                         ret = 1;
  471                         break;
  472                 }
  473                 if (++j >= queue->tq_tcount)
  474                         break;
  475         }
  476         TQ_UNLOCK(queue);
  477         return (ret);
  478 }

Cache object: 42e6cd3e8b0bb24bfe74e8ab6b741fdb


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.