The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD: releng/11.2/sys/kern/subr_taskqueue.c 328392 2018-01-25 07:27:03Z pkelsey $");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/cpuset.h>
   34 #include <sys/interrupt.h>
   35 #include <sys/kernel.h>
   36 #include <sys/kthread.h>
   37 #include <sys/libkern.h>
   38 #include <sys/limits.h>
   39 #include <sys/lock.h>
   40 #include <sys/malloc.h>
   41 #include <sys/mutex.h>
   42 #include <sys/proc.h>
   43 #include <sys/sched.h>
   44 #include <sys/smp.h>
   45 #include <sys/taskqueue.h>
   46 #include <sys/unistd.h>
   47 #include <machine/stdarg.h>
   48 
/* Malloc type used for every allocation made in this file. */
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
/* Software-interrupt cookies: written by swi_add(), passed to swi_sched(). */
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
/*
 * Enqueue hooks defined further down; declared here because
 * _taskqueue_create() compares its "enqueue" argument against them.
 */
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);
   55 
/*
 * Entry on a queue's tq_active list.  taskqueue_run_locked() links one
 * in while it executes tasks; taskqueue_drain_tq_active() links one in
 * as a marker while it waits.
 */
struct taskqueue_busy {
	/* Task currently being run, NULL, or TB_DRAIN_WAITER for a marker. */
	struct task	*tb_running;
	/* Linkage on taskqueue::tq_active. */
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};
   60 
/*
 * Sentinel stored in tb_running by taskqueue_drain_tq_active() so that an
 * active-list entry can be recognised as a drain waiter, not a real task.
 */
struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
   62 
/*
 * A task queue.  Fields are manipulated under tq_mutex (see TQ_LOCK()).
 */
struct taskqueue {
	/* Pending tasks; taskqueue_enqueue_locked() keeps higher priority first. */
	STAILQ_HEAD(, task)	tq_queue;
	/* Hook invoked after a task is queued, with tq_context as argument. */
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	/* Allocated copy of the queue name; also passed to mtx_init(). */
	char			*tq_name;
	/* Runners currently executing a task, plus any drain markers. */
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	/* Worker thread array; its address doubles as the termination sleep channel. */
	struct thread		**tq_threads;
	/* Number of worker threads that were started and have not yet exited. */
	int			tq_tcount;
	/* Non-zero when tq_mutex was created with MTX_SPIN. */
	int			tq_spin;
	/* TQ_FLAGS_* bits. */
	int			tq_flags;
	/* Armed timeout callouts; also raised temporarily by taskqueue_drain_tq_active(). */
	int			tq_callouts;
	/* Optional per-type callbacks and their context arguments. */
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};
   78 
/* taskqueue::tq_flags bits. */
/* Set at creation, cleared by taskqueue_free(); worker threads loop while set. */
#define	TQ_FLAGS_ACTIVE		(1 << 0)
/* While set, enqueueing a task does not call the tq_enqueue hook. */
#define	TQ_FLAGS_BLOCKED	(1 << 1)
/* The tq_enqueue hook is called after the queue lock has been dropped. */
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

/* timeout_task::f bits. */
/* The callout is scheduled and accounted for in tq_callouts. */
#define	DT_CALLOUT_ARMED	(1 << 0)
/* taskqueue_drain_timeout() is running; timed enqueues return -1. */
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)
   85 
/*
 * Acquire the queue mutex, using the spin or the default mutex primitive
 * according to how the queue was created (tq_spin).
 */
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
/* Assert that the calling thread owns the queue mutex. */
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

/* Release the queue mutex; counterpart of TQ_LOCK(). */
#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
/* Assert that the calling thread does not own the queue mutex. */
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  103 
  104 void
  105 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  106     int priority, task_fn_t func, void *context)
  107 {
  108 
  109         TASK_INIT(&timeout_task->t, priority, func, context);
  110         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  111             CALLOUT_RETURNUNLOCKED);
  112         timeout_task->q = queue;
  113         timeout_task->f = 0;
  114 }
  115 
  116 static __inline int
  117 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  118     int t)
  119 {
  120         if (tq->tq_spin)
  121                 return (msleep_spin(p, m, wm, t));
  122         return (msleep(p, m, pri, wm, t));
  123 }
  124 
  125 static struct taskqueue *
  126 _taskqueue_create(const char *name, int mflags,
  127                  taskqueue_enqueue_fn enqueue, void *context,
  128                  int mtxflags, const char *mtxname __unused)
  129 {
  130         struct taskqueue *queue;
  131         char *tq_name;
  132 
  133         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  134         if (tq_name == NULL)
  135                 return (NULL);
  136 
  137         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  138         if (queue == NULL) {
  139                 free(tq_name, M_TASKQUEUE);
  140                 return (NULL);
  141         }
  142 
  143         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  144 
  145         STAILQ_INIT(&queue->tq_queue);
  146         TAILQ_INIT(&queue->tq_active);
  147         queue->tq_enqueue = enqueue;
  148         queue->tq_context = context;
  149         queue->tq_name = tq_name;
  150         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  151         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  152         if (enqueue == taskqueue_fast_enqueue ||
  153             enqueue == taskqueue_swi_enqueue ||
  154             enqueue == taskqueue_swi_giant_enqueue ||
  155             enqueue == taskqueue_thread_enqueue)
  156                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  157         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  158 
  159         return (queue);
  160 }
  161 
  162 struct taskqueue *
  163 taskqueue_create(const char *name, int mflags,
  164                  taskqueue_enqueue_fn enqueue, void *context)
  165 {
  166 
  167         return _taskqueue_create(name, mflags, enqueue, context,
  168                         MTX_DEF, name);
  169 }
  170 
  171 void
  172 taskqueue_set_callback(struct taskqueue *queue,
  173     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
  174     void *context)
  175 {
  176 
  177         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
  178             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
  179             ("Callback type %d not valid, must be %d-%d", cb_type,
  180             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
  181         KASSERT((queue->tq_callbacks[cb_type] == NULL),
  182             ("Re-initialization of taskqueue callback?"));
  183 
  184         queue->tq_callbacks[cb_type] = callback;
  185         queue->tq_cb_contexts[cb_type] = context;
  186 }
  187 
  188 /*
  189  * Signal a taskqueue thread to terminate.
  190  */
  191 static void
  192 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  193 {
  194 
  195         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  196                 wakeup(tq);
  197                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  198         }
  199 }
  200 
/*
 * Shut down and release a task queue.
 *
 * Takes the queue lock, clears TQ_FLAGS_ACTIVE so that worker threads
 * leave their loop, waits for them in taskqueue_terminate(), and then
 * destroys the mutex and frees the thread array, the name and the queue
 * itself.  The lock is never explicitly released: the mutex is destroyed
 * while still held.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}
  215 
  216 static int
  217 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  218 {
  219         struct task *ins;
  220         struct task *prev;
  221 
  222         KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
  223         /*
  224          * Count multiple enqueues.
  225          */
  226         if (task->ta_pending) {
  227                 if (task->ta_pending < USHRT_MAX)
  228                         task->ta_pending++;
  229                 TQ_UNLOCK(queue);
  230                 return (0);
  231         }
  232 
  233         /*
  234          * Optimise the case when all tasks have the same priority.
  235          */
  236         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  237         if (!prev || prev->ta_priority >= task->ta_priority) {
  238                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  239         } else {
  240                 prev = NULL;
  241                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  242                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  243                         if (ins->ta_priority < task->ta_priority)
  244                                 break;
  245 
  246                 if (prev)
  247                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  248                 else
  249                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  250         }
  251 
  252         task->ta_pending = 1;
  253         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
  254                 TQ_UNLOCK(queue);
  255         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  256                 queue->tq_enqueue(queue->tq_context);
  257         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
  258                 TQ_UNLOCK(queue);
  259 
  260         /* Return with lock released. */
  261         return (0);
  262 }
  263 
  264 int
  265 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  266 {
  267         int res;
  268 
  269         TQ_LOCK(queue);
  270         res = taskqueue_enqueue_locked(queue, task);
  271         /* The lock is released inside. */
  272 
  273         return (res);
  274 }
  275 
  276 static void
  277 taskqueue_timeout_func(void *arg)
  278 {
  279         struct taskqueue *queue;
  280         struct timeout_task *timeout_task;
  281 
  282         timeout_task = arg;
  283         queue = timeout_task->q;
  284         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  285         timeout_task->f &= ~DT_CALLOUT_ARMED;
  286         queue->tq_callouts--;
  287         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  288         /* The lock is released inside. */
  289 }
  290 
  291 int
  292 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
  293     struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
  294 {
  295         int res;
  296 
  297         TQ_LOCK(queue);
  298         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  299             ("Migrated queue"));
  300         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  301         timeout_task->q = queue;
  302         res = timeout_task->t.ta_pending;
  303         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  304                 /* Do nothing */
  305                 TQ_UNLOCK(queue);
  306                 res = -1;
  307         } else if (sbt == 0) {
  308                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  309                 /* The lock is released inside. */
  310         } else {
  311                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  312                         res++;
  313                 } else {
  314                         queue->tq_callouts++;
  315                         timeout_task->f |= DT_CALLOUT_ARMED;
  316                         if (sbt < 0)
  317                                 sbt = -sbt; /* Ignore overflow. */
  318                 }
  319                 if (sbt > 0) {
  320                         callout_reset_sbt(&timeout_task->c, sbt, pr,
  321                             taskqueue_timeout_func, timeout_task, flags);
  322                 }
  323                 TQ_UNLOCK(queue);
  324         }
  325         return (res);
  326 }
  327 
  328 int
  329 taskqueue_enqueue_timeout(struct taskqueue *queue,
  330     struct timeout_task *ttask, int ticks)
  331 {
  332 
  333         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
  334             0, 0));
  335 }
  336 
/*
 * Task function that does nothing; used for the barrier task queued by
 * taskqueue_drain_tq_queue().
 */
static void
taskqueue_task_nop_fn(void *context, int pending)
{
}
  341 
  342 /*
  343  * Block until all currently queued tasks in this taskqueue
  344  * have begun execution.  Tasks queued during execution of
  345  * this function are ignored.
  346  */
  347 static void
  348 taskqueue_drain_tq_queue(struct taskqueue *queue)
  349 {
  350         struct task t_barrier;
  351 
  352         if (STAILQ_EMPTY(&queue->tq_queue))
  353                 return;
  354 
  355         /*
  356          * Enqueue our barrier after all current tasks, but with
  357          * the highest priority so that newly queued tasks cannot
  358          * pass it.  Because of the high priority, we can not use
  359          * taskqueue_enqueue_locked directly (which drops the lock
  360          * anyway) so just insert it at tail while we have the
  361          * queue lock.
  362          */
  363         TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
  364         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  365         t_barrier.ta_pending = 1;
  366 
  367         /*
  368          * Once the barrier has executed, all previously queued tasks
  369          * have completed or are currently executing.
  370          */
  371         while (t_barrier.ta_pending != 0)
  372                 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
  373 }
  374 
  375 /*
  376  * Block until all currently executing tasks for this taskqueue
  377  * complete.  Tasks that begin execution during the execution
  378  * of this function are ignored.
  379  */
  380 static void
  381 taskqueue_drain_tq_active(struct taskqueue *queue)
  382 {
  383         struct taskqueue_busy tb_marker, *tb_first;
  384 
  385         if (TAILQ_EMPTY(&queue->tq_active))
  386                 return;
  387 
  388         /* Block taskq_terminate().*/
  389         queue->tq_callouts++;
  390 
  391         /*
  392          * Wait for all currently executing taskqueue threads
  393          * to go idle.
  394          */
  395         tb_marker.tb_running = TB_DRAIN_WAITER;
  396         TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
  397         while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
  398                 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
  399         TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
  400 
  401         /*
  402          * Wakeup any other drain waiter that happened to queue up
  403          * without any intervening active thread.
  404          */
  405         tb_first = TAILQ_FIRST(&queue->tq_active);
  406         if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
  407                 wakeup(tb_first);
  408 
  409         /* Release taskqueue_terminate(). */
  410         queue->tq_callouts--;
  411         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  412                 wakeup_one(queue->tq_threads);
  413 }
  414 
/*
 * Mark the queue blocked.  While TQ_FLAGS_BLOCKED is set, enqueueing a
 * task still links it on the queue but does not call the enqueue hook.
 */
void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}
  423 
  424 void
  425 taskqueue_unblock(struct taskqueue *queue)
  426 {
  427 
  428         TQ_LOCK(queue);
  429         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  430         if (!STAILQ_EMPTY(&queue->tq_queue))
  431                 queue->tq_enqueue(queue->tq_context);
  432         TQ_UNLOCK(queue);
  433 }
  434 
  435 static void
  436 taskqueue_run_locked(struct taskqueue *queue)
  437 {
  438         struct taskqueue_busy tb;
  439         struct taskqueue_busy *tb_first;
  440         struct task *task;
  441         int pending;
  442 
  443         KASSERT(queue != NULL, ("tq is NULL"));
  444         TQ_ASSERT_LOCKED(queue);
  445         tb.tb_running = NULL;
  446 
  447         while (STAILQ_FIRST(&queue->tq_queue)) {
  448                 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  449 
  450                 /*
  451                  * Carefully remove the first task from the queue and
  452                  * zero its pending count.
  453                  */
  454                 task = STAILQ_FIRST(&queue->tq_queue);
  455                 KASSERT(task != NULL, ("task is NULL"));
  456                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  457                 pending = task->ta_pending;
  458                 task->ta_pending = 0;
  459                 tb.tb_running = task;
  460                 TQ_UNLOCK(queue);
  461 
  462                 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
  463                 task->ta_func(task->ta_context, pending);
  464 
  465                 TQ_LOCK(queue);
  466                 tb.tb_running = NULL;
  467                 wakeup(task);
  468 
  469                 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  470                 tb_first = TAILQ_FIRST(&queue->tq_active);
  471                 if (tb_first != NULL &&
  472                     tb_first->tb_running == TB_DRAIN_WAITER)
  473                         wakeup(tb_first);
  474         }
  475 }
  476 
/*
 * Run all tasks currently on the queue in the calling context.
 * Locked wrapper around taskqueue_run_locked().
 */
void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}
  485 
  486 static int
  487 task_is_running(struct taskqueue *queue, struct task *task)
  488 {
  489         struct taskqueue_busy *tb;
  490 
  491         TQ_ASSERT_LOCKED(queue);
  492         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  493                 if (tb->tb_running == task)
  494                         return (1);
  495         }
  496         return (0);
  497 }
  498 
  499 /*
  500  * Only use this function in single threaded contexts. It returns
  501  * non-zero if the given task is either pending or running. Else the
  502  * task is idle and can be queued again or freed.
  503  */
  504 int
  505 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  506 {
  507         int retval;
  508 
  509         TQ_LOCK(queue);
  510         retval = task->ta_pending > 0 || task_is_running(queue, task);
  511         TQ_UNLOCK(queue);
  512 
  513         return (retval);
  514 }
  515 
  516 static int
  517 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  518     u_int *pendp)
  519 {
  520 
  521         if (task->ta_pending > 0)
  522                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  523         if (pendp != NULL)
  524                 *pendp = task->ta_pending;
  525         task->ta_pending = 0;
  526         return (task_is_running(queue, task) ? EBUSY : 0);
  527 }
  528 
/*
 * Cancel a pending task.  Locked wrapper around
 * taskqueue_cancel_locked(); see there for pendp and the return value.
 */
int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}
  540 
  541 int
  542 taskqueue_cancel_timeout(struct taskqueue *queue,
  543     struct timeout_task *timeout_task, u_int *pendp)
  544 {
  545         u_int pending, pending1;
  546         int error;
  547 
  548         TQ_LOCK(queue);
  549         pending = !!(callout_stop(&timeout_task->c) > 0);
  550         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  551         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  552                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  553                 queue->tq_callouts--;
  554         }
  555         TQ_UNLOCK(queue);
  556 
  557         if (pendp != NULL)
  558                 *pendp = pending + pending1;
  559         return (error);
  560 }
  561 
  562 void
  563 taskqueue_drain(struct taskqueue *queue, struct task *task)
  564 {
  565 
  566         if (!queue->tq_spin)
  567                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  568 
  569         TQ_LOCK(queue);
  570         while (task->ta_pending != 0 || task_is_running(queue, task))
  571                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  572         TQ_UNLOCK(queue);
  573 }
  574 
/*
 * Wait for everything that is queued or running at the time of the call:
 * first until all queued tasks have started, then until all executing
 * tasks have finished.  Tasks queued afterwards are not waited for.
 */
void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	taskqueue_drain_tq_queue(queue);
	taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}
  587 
/*
 * Wait for a timeout task to become idle: drain its callout, then drain
 * the embedded task.  DT_DRAIN_IN_PROGRESS is set for the duration, so
 * taskqueue_enqueue_timeout_sbt() refuses to re-arm it meanwhile.
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	/* Both drains are called with the queue lock released. */
	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}
  612 
/*
 * Enqueue hook of the "swi" queue: schedule the software interrupt
 * registered under taskqueue_ih.  "context" is unused.
 */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}
  618 
/* Software-interrupt handler: run the tasks queued on taskqueue_swi. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}
  624 
/*
 * Enqueue hook of the "swi_giant" queue: schedule the software interrupt
 * registered under taskqueue_giant_ih.  "context" is unused.
 */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}
  630 
/* Software-interrupt handler: run the tasks queued on taskqueue_swi_giant. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
  636 
  637 static int
  638 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  639     cpuset_t *mask, const char *name, va_list ap)
  640 {
  641         char ktname[MAXCOMLEN + 1];
  642         struct thread *td;
  643         struct taskqueue *tq;
  644         int i, error;
  645 
  646         if (count <= 0)
  647                 return (EINVAL);
  648 
  649         vsnprintf(ktname, sizeof(ktname), name, ap);
  650         tq = *tqp;
  651 
  652         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  653             M_NOWAIT | M_ZERO);
  654         if (tq->tq_threads == NULL) {
  655                 printf("%s: no memory for %s threads\n", __func__, ktname);
  656                 return (ENOMEM);
  657         }
  658 
  659         for (i = 0; i < count; i++) {
  660                 if (count == 1)
  661                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  662                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  663                 else
  664                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  665                             &tq->tq_threads[i], RFSTOPPED, 0,
  666                             "%s_%d", ktname, i);
  667                 if (error) {
  668                         /* should be ok to continue, taskqueue_free will dtrt */
  669                         printf("%s: kthread_add(%s): error %d", __func__,
  670                             ktname, error);
  671                         tq->tq_threads[i] = NULL;               /* paranoid */
  672                 } else
  673                         tq->tq_tcount++;
  674         }
  675         if (tq->tq_tcount == 0) {
  676                 free(tq->tq_threads, M_TASKQUEUE);
  677                 tq->tq_threads = NULL;
  678                 return (ENOMEM);
  679         }
  680         for (i = 0; i < count; i++) {
  681                 if (tq->tq_threads[i] == NULL)
  682                         continue;
  683                 td = tq->tq_threads[i];
  684                 if (mask) {
  685                         error = cpuset_setthread(td->td_tid, mask);
  686                         /*
  687                          * Failing to pin is rarely an actual fatal error;
  688                          * it'll just affect performance.
  689                          */
  690                         if (error)
  691                                 printf("%s: curthread=%llu: can't pin; "
  692                                     "error=%d\n",
  693                                     __func__,
  694                                     (unsigned long long) td->td_tid,
  695                                     error);
  696                 }
  697                 thread_lock(td);
  698                 sched_prio(td, pri);
  699                 sched_add(td, SRQ_BORING);
  700                 thread_unlock(td);
  701         }
  702 
  703         return (0);
  704 }
  705 
/*
 * Start "count" worker threads for *tqp at priority "pri" without any
 * CPU binding.  "name" is a printf-style format for the thread name.
 * Returns the result of _taskqueue_start_threads().
 */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}
  718 
/*
 * Like taskqueue_start_threads(), but additionally binds each worker
 * thread to the CPU set "mask".
 */
int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
	va_end(ap);
	return (error);
}
  731 
  732 static inline void
  733 taskqueue_run_callback(struct taskqueue *tq,
  734     enum taskqueue_callback_type cb_type)
  735 {
  736         taskqueue_callback_fn tq_callback;
  737 
  738         TQ_ASSERT_UNLOCKED(tq);
  739         tq_callback = tq->tq_callbacks[cb_type];
  740         if (tq_callback != NULL)
  741                 tq_callback(tq->tq_cb_contexts[cb_type]);
  742 }
  743 
  744 void
  745 taskqueue_thread_loop(void *arg)
  746 {
  747         struct taskqueue **tqp, *tq;
  748 
  749         tqp = arg;
  750         tq = *tqp;
  751         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  752         TQ_LOCK(tq);
  753         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  754                 /* XXX ? */
  755                 taskqueue_run_locked(tq);
  756                 /*
  757                  * Because taskqueue_run() can drop tq_mutex, we need to
  758                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  759                  * meantime, which means we missed a wakeup.
  760                  */
  761                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  762                         break;
  763                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  764         }
  765         taskqueue_run_locked(tq);
  766         /*
  767          * This thread is on its way out, so just drop the lock temporarily
  768          * in order to call the shutdown callback.  This allows the callback
  769          * to look at the taskqueue, even just before it dies.
  770          */
  771         TQ_UNLOCK(tq);
  772         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  773         TQ_LOCK(tq);
  774 
  775         /* rendezvous with thread that asked us to terminate */
  776         tq->tq_tcount--;
  777         wakeup_one(tq->tq_threads);
  778         TQ_UNLOCK(tq);
  779         kthread_exit();
  780 }
  781 
  782 void
  783 taskqueue_thread_enqueue(void *context)
  784 {
  785         struct taskqueue **tqp, *tq;
  786 
  787         tqp = context;
  788         tq = *tqp;
  789         wakeup_one(tq);
  790 }
  791 
/*
 * System-wide queues.  The TASKQUEUE_DEFINE* macros are not defined in
 * this file; presumably they create the taskqueue_<name> globals used
 * above and run the given initialisation expression - confirm against
 * sys/taskqueue.h.
 */

/* "swi": MP-safe software-interrupt queue; cookie kept in taskqueue_ih. */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

/* "swi_giant": as above without INTR_MPSAFE; cookie in taskqueue_giant_ih. */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* "thread": queue serviced by a kernel thread. */
TASKQUEUE_DEFINE_THREAD(thread);
  801 
  802 struct taskqueue *
  803 taskqueue_create_fast(const char *name, int mflags,
  804                  taskqueue_enqueue_fn enqueue, void *context)
  805 {
  806         return _taskqueue_create(name, mflags, enqueue, context,
  807                         MTX_SPIN, "fast_taskqueue");
  808 }
  809 
/* Software-interrupt cookie for the "fast" queue, written by swi_add(). */
static void	*taskqueue_fast_ih;
  811 
/*
 * Enqueue hook of the "fast" queue: schedule the software interrupt
 * registered under taskqueue_fast_ih.  "context" is unused.
 */
static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}
  817 
/* Software-interrupt handler: run the tasks queued on taskqueue_fast. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}
  823 
/*
 * "fast": MP-safe software-interrupt queue at SWI_TQ_FAST; cookie kept in
 * taskqueue_fast_ih.  The macro itself is defined outside this file.
 */
TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
	swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  827 
  828 int
  829 taskqueue_member(struct taskqueue *queue, struct thread *td)
  830 {
  831         int i, j, ret = 0;
  832 
  833         for (i = 0, j = 0; ; i++) {
  834                 if (queue->tq_threads[i] == NULL)
  835                         continue;
  836                 if (queue->tq_threads[i] == td) {
  837                         ret = 1;
  838                         break;
  839                 }
  840                 if (++j >= queue->tq_tcount)
  841                         break;
  842         }
  843         return (ret);
  844 }

Cache object: 0edfac63e28159e4559ec17578da0777


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross Reference, and was automatically generated using a modified version of the LXR engine.