The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
    3  *
    4  * Copyright (c) 2000 Doug Rabson
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD: releng/12.0/sys/kern/subr_taskqueue.c 328218 2018-01-21 15:42:36Z pfg $");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/taskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
/* Malloc type used for every allocation made in this file. */
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
/*
 * Handler cookies filled in by swi_add() and later passed to swi_sched()
 * by the swi_giant and swi enqueue hooks respectively.
 */
static void     *taskqueue_giant_ih;
static void     *taskqueue_ih;
/*
 * Forward declarations: _taskqueue_create() compares its enqueue argument
 * against these hooks before their definitions appear.
 */
static void      taskqueue_fast_enqueue(void *);
static void      taskqueue_swi_enqueue(void *);
static void      taskqueue_swi_giant_enqueue(void *);
   57 
/*
 * One entry on a queue's tq_active list.  taskqueue_run_locked() links one
 * in while it runs a task; taskqueue_drain_tq_active() links one in as a
 * marker whose tb_running is TB_DRAIN_WAITER.
 */
struct taskqueue_busy {
        struct task     *tb_running;    /* running task, NULL, or TB_DRAIN_WAITER */
        TAILQ_ENTRY(taskqueue_busy) tb_link;    /* linkage on tq_active */
};
   62 
/*
 * Sentinel stored in tb_running to mark a tq_active entry that belongs to a
 * drain waiter instead of a thread running a task.
 */
struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
   64 
/*
 * A task queue.  Apart from the fields set once at creation time, the
 * functions in this file manipulate these fields with tq_mutex held.
 */
struct taskqueue {
        STAILQ_HEAD(, task)     tq_queue;       /* pending tasks, in priority order */
        taskqueue_enqueue_fn    tq_enqueue;     /* hook called after a task is queued */
        void                    *tq_context;    /* argument passed to tq_enqueue */
        char                    *tq_name;       /* TASKQUEUE_NAMELEN buffer, also the mutex name */
        TAILQ_HEAD(, taskqueue_busy) tq_active; /* running-task entries and drain markers */
        struct mtx              tq_mutex;       /* spin or sleep mutex, see tq_spin */
        struct thread           **tq_threads;   /* array from _taskqueue_start_threads(), or NULL */
        int                     tq_tcount;      /* threads started and not yet exited */
        int                     tq_spin;        /* non-zero if tq_mutex is a spin mutex */
        int                     tq_flags;       /* TQ_FLAGS_* */
        int                     tq_callouts;    /* armed timeout tasks, plus drain holds */
        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];  /* indexed by callback type */
        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];       /* matching arguments */
};
   80 
/*
 * tq_flags bits:
 *   ACTIVE            set at creation, cleared by taskqueue_free(); service
 *                     threads leave their loop once it is clear.
 *   BLOCKED           while set, enqueueing does not call the enqueue hook.
 *   UNLOCKED_ENQUEUE  the enqueue hook is called after the queue lock has
 *                     been dropped rather than with it held.
 */
#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)

/*
 * timeout_task 'f' bits:
 *   CALLOUT_ARMED      the callout is scheduled and counted in tq_callouts.
 *   DRAIN_IN_PROGRESS  taskqueue_drain_timeout() is running; new timeout
 *                      enqueues are refused.
 */
#define DT_CALLOUT_ARMED        (1 << 0)
#define DT_DRAIN_IN_PROGRESS    (1 << 1)
   87 
/*
 * Acquire the queue mutex, using the spin or the default mutex primitive
 * depending on tq_spin.
 */
#define TQ_LOCK(tq)                                                     \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_lock_spin(&(tq)->tq_mutex);                 \
                else                                                    \
                        mtx_lock(&(tq)->tq_mutex);                      \
        } while (0)
/* Assert (mtx_assert with MA_OWNED) that the queue mutex is held. */
#define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)

/* Release the queue mutex; counterpart of TQ_LOCK(). */
#define TQ_UNLOCK(tq)                                                   \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_unlock_spin(&(tq)->tq_mutex);               \
                else                                                    \
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
/* Assert (mtx_assert with MA_NOTOWNED) that the queue mutex is not held. */
#define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  105 
  106 void
  107 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  108     int priority, task_fn_t func, void *context)
  109 {
  110 
  111         TASK_INIT(&timeout_task->t, priority, func, context);
  112         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  113             CALLOUT_RETURNUNLOCKED);
  114         timeout_task->q = queue;
  115         timeout_task->f = 0;
  116 }
  117 
  118 static __inline int
  119 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  120     int t)
  121 {
  122         if (tq->tq_spin)
  123                 return (msleep_spin(p, m, wm, t));
  124         return (msleep(p, m, pri, wm, t));
  125 }
  126 
  127 static struct taskqueue *
  128 _taskqueue_create(const char *name, int mflags,
  129                  taskqueue_enqueue_fn enqueue, void *context,
  130                  int mtxflags, const char *mtxname __unused)
  131 {
  132         struct taskqueue *queue;
  133         char *tq_name;
  134 
  135         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  136         if (tq_name == NULL)
  137                 return (NULL);
  138 
  139         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  140         if (queue == NULL) {
  141                 free(tq_name, M_TASKQUEUE);
  142                 return (NULL);
  143         }
  144 
  145         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  146 
  147         STAILQ_INIT(&queue->tq_queue);
  148         TAILQ_INIT(&queue->tq_active);
  149         queue->tq_enqueue = enqueue;
  150         queue->tq_context = context;
  151         queue->tq_name = tq_name;
  152         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  153         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  154         if (enqueue == taskqueue_fast_enqueue ||
  155             enqueue == taskqueue_swi_enqueue ||
  156             enqueue == taskqueue_swi_giant_enqueue ||
  157             enqueue == taskqueue_thread_enqueue)
  158                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  159         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  160 
  161         return (queue);
  162 }
  163 
  164 struct taskqueue *
  165 taskqueue_create(const char *name, int mflags,
  166                  taskqueue_enqueue_fn enqueue, void *context)
  167 {
  168 
  169         return _taskqueue_create(name, mflags, enqueue, context,
  170                         MTX_DEF, name);
  171 }
  172 
/*
 * Register the callback (and its argument) for one callback type.
 *
 * cb_type must lie within TASKQUEUE_CALLBACK_TYPE_MIN..MAX and the slot must
 * still be empty; both are checked with KASSERT only.  No lock is taken.
 */
void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

        KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
            (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
            ("Callback type %d not valid, must be %d-%d", cb_type,
            TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
        KASSERT((queue->tq_callbacks[cb_type] == NULL),
            ("Re-initialization of taskqueue callback?"));

        queue->tq_callbacks[cb_type] = callback;
        queue->tq_cb_contexts[cb_type] = context;
}
  189 
/*
 * Wait for a taskqueue's threads and armed timeout tasks to go away.
 *
 * Called with the queue lock held (it is handed to TQ_SLEEP).  Repeatedly
 * wakes everything sleeping on tq and sleeps on pp until both tq_tcount and
 * tq_callouts have dropped to zero.  Exiting threads and finishing drains
 * wake this sleeper through wakeup_one(tq->tq_threads).
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

        while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
                wakeup(tq);
                TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
        }
}
  202 
/*
 * Tear down a taskqueue: mark it inactive, wait for its threads and armed
 * timeout tasks to finish, then destroy the mutex and free the thread
 * array, the name buffer and the queue itself.
 */
void
taskqueue_free(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        /* Clearing ACTIVE makes taskqueue_thread_loop() leave its loop. */
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        taskqueue_terminate(queue->tq_threads, queue);
        KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
        KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
        /* The lock is never released explicitly; the mutex is destroyed held. */
        mtx_destroy(&queue->tq_mutex);
        free(queue->tq_threads, M_TASKQUEUE);
        free(queue->tq_name, M_TASKQUEUE);
        free(queue, M_TASKQUEUE);
}
  217 
/*
 * Insert task into queue in priority order and notify the queue's consumer.
 *
 * Must be called with the queue lock held; the lock is released on every
 * path before returning.  If the task is already pending, only its pending
 * count is incremented (saturating at USHRT_MAX).  Always returns 0.
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
        struct task *ins;
        struct task *prev;

        KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                if (task->ta_pending < USHRT_MAX)
                        task->ta_pending++;
                TQ_UNLOCK(queue);
                return (0);
        }

        /*
         * Optimise the case when all tasks have the same priority.
         */
        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
        if (!prev || prev->ta_priority >= task->ta_priority) {
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
                /*
                 * Find the first entry with a lower priority and insert
                 * in front of it, i.e. after its predecessor.
                 */
                prev = NULL;
                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
                                break;

                if (prev)
                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
                else
                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
        }

        task->ta_pending = 1;
        /*
         * Queues flagged TQ_FLAGS_UNLOCKED_ENQUEUE get their enqueue hook
         * called after the lock is dropped; all others get it called with
         * the lock still held.  The hook is skipped while the queue is
         * blocked.
         */
        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
                TQ_UNLOCK(queue);
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
                queue->tq_enqueue(queue->tq_context);
        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
                TQ_UNLOCK(queue);

        /* Return with lock released. */
        return (0);
}
  265 
  266 int
  267 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  268 {
  269         int res;
  270 
  271         TQ_LOCK(queue);
  272         res = taskqueue_enqueue_locked(queue, task);
  273         /* The lock is released inside. */
  274 
  275         return (res);
  276 }
  277 
  278 static void
  279 taskqueue_timeout_func(void *arg)
  280 {
  281         struct taskqueue *queue;
  282         struct timeout_task *timeout_task;
  283 
  284         timeout_task = arg;
  285         queue = timeout_task->q;
  286         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  287         timeout_task->f &= ~DT_CALLOUT_ARMED;
  288         queue->tq_callouts--;
  289         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  290         /* The lock is released inside. */
  291 }
  292 
/*
 * Schedule a timeout task to be enqueued after sbt has elapsed.
 *
 * sbt == 0 enqueues the task immediately.  A negative sbt requests its
 * absolute value as the delay, but only if the callout is not already
 * armed; an already-armed callout is then left untouched.  pr and flags
 * are passed through to callout_reset_sbt().
 *
 * Returns -1 if a drain of this timeout task is in progress (nothing is
 * scheduled).  Otherwise returns the task's pending count sampled on
 * entry, plus one if the callout was already armed.
 */
int
taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
    struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
{
        int res;

        TQ_LOCK(queue);
        KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
            ("Migrated queue"));
        KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
        timeout_task->q = queue;
        res = timeout_task->t.ta_pending;
        if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
                /* Do nothing */
                TQ_UNLOCK(queue);
                res = -1;
        } else if (sbt == 0) {
                taskqueue_enqueue_locked(queue, &timeout_task->t);
                /* The lock is released inside. */
        } else {
                if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                        res++;
                } else {
                        queue->tq_callouts++;
                        timeout_task->f |= DT_CALLOUT_ARMED;
                        if (sbt < 0)
                                sbt = -sbt; /* Ignore overflow. */
                }
                /* sbt is still negative here only if the callout was armed. */
                if (sbt > 0) {
                        callout_reset_sbt(&timeout_task->c, sbt, pr,
                            taskqueue_timeout_func, timeout_task, flags);
                }
                TQ_UNLOCK(queue);
        }
        return (res);
}
  329 
  330 int
  331 taskqueue_enqueue_timeout(struct taskqueue *queue,
  332     struct timeout_task *ttask, int ticks)
  333 {
  334 
  335         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
  336             0, 0));
  337 }
  338 
  339 static void
  340 taskqueue_task_nop_fn(void *context, int pending)
  341 {
  342 }
  343 
/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 *
 * Called with the queue lock held (it is handed to TQ_SLEEP).  Returns
 * immediately if nothing is queued.
 */
static void
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
        struct task t_barrier;

        if (STAILQ_EMPTY(&queue->tq_queue))
                return;

        /*
         * Enqueue our barrier after all current tasks, but with
         * the highest priority so that newly queued tasks cannot
         * pass it.  Because of the high priority, we can not use
         * taskqueue_enqueue_locked directly (which drops the lock
         * anyway) so just insert it at tail while we have the
         * queue lock.
         */
        TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
        STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
        t_barrier.ta_pending = 1;

        /*
         * Once the barrier has executed, all previously queued tasks
         * have completed or are currently executing.
         *
         * The barrier lives on this stack frame, so do not return before
         * taskqueue_run_locked() has dequeued it (clearing ta_pending)
         * and issued its wakeup() on the task address.
         */
        while (t_barrier.ta_pending != 0)
                TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}
  376 
/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 *
 * Called with the queue lock held (it is handed to TQ_SLEEP).  Returns
 * immediately if no task is executing.
 */
static void
taskqueue_drain_tq_active(struct taskqueue *queue)
{
        struct taskqueue_busy tb_marker, *tb_first;

        if (TAILQ_EMPTY(&queue->tq_active))
                return;

        /* Block taskqueue_terminate(). */
        queue->tq_callouts++;

        /*
         * Wait for all currently executing taskqueue threads
         * to go idle.
         *
         * The marker is appended behind every entry that is active now;
         * once it reaches the head of the list, all of them have been
         * removed.
         */
        tb_marker.tb_running = TB_DRAIN_WAITER;
        TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
        while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
                TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
        TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

        /*
         * Wakeup any other drain waiter that happened to queue up
         * without any intervening active thread.
         */
        tb_first = TAILQ_FIRST(&queue->tq_active);
        if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
                wakeup(tb_first);

        /* Release taskqueue_terminate(). */
        queue->tq_callouts--;
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                wakeup_one(queue->tq_threads);
}
  416 
/*
 * Block a taskqueue: set TQ_FLAGS_BLOCKED so that taskqueue_enqueue_locked()
 * stops calling the enqueue hook.  Tasks can still be added to the queue.
 */
void
taskqueue_block(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}
  425 
/*
 * Unblock a taskqueue: clear TQ_FLAGS_BLOCKED and, if tasks accumulated
 * while the queue was blocked, call the enqueue hook once.  Here the hook
 * is called with the queue lock held.
 */
void
taskqueue_unblock(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
        if (!STAILQ_EMPTY(&queue->tq_queue))
                queue->tq_enqueue(queue->tq_context);
        TQ_UNLOCK(queue);
}
  436 
/*
 * Run every task on the queue until the queue is empty.
 *
 * Called and returns with the queue lock held; the lock is dropped around
 * each task function call.  While a task runs, a stack-allocated
 * taskqueue_busy entry on tq_active records it so that task_is_running()
 * and the drain functions can see it.
 */
static void
taskqueue_run_locked(struct taskqueue *queue)
{
        struct taskqueue_busy tb;
        struct taskqueue_busy *tb_first;
        struct task *task;
        int pending;

        KASSERT(queue != NULL, ("tq is NULL"));
        TQ_ASSERT_LOCKED(queue);
        tb.tb_running = NULL;

        while (STAILQ_FIRST(&queue->tq_queue)) {
                TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

                /*
                 * Carefully remove the first task from the queue and
                 * zero its pending count.
                 */
                task = STAILQ_FIRST(&queue->tq_queue);
                KASSERT(task != NULL, ("task is NULL"));
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                pending = task->ta_pending;
                task->ta_pending = 0;
                tb.tb_running = task;
                TQ_UNLOCK(queue);

                KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
                /* The handler receives the number of coalesced enqueues. */
                task->ta_func(task->ta_context, pending);

                TQ_LOCK(queue);
                tb.tb_running = NULL;
                /* Wake anyone sleeping on the task address (the drains). */
                wakeup(task);

                /*
                 * If a drain marker is now at the head of the active
                 * list, wake its owner.
                 */
                TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
                tb_first = TAILQ_FIRST(&queue->tq_active);
                if (tb_first != NULL &&
                    tb_first->tb_running == TB_DRAIN_WAITER)
                        wakeup(tb_first);
        }
}
  478 
/*
 * Run all queued tasks in the calling context: take the queue lock, run
 * the queue until it is empty, release the lock.
 */
void
taskqueue_run(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        taskqueue_run_locked(queue);
        TQ_UNLOCK(queue);
}
  487 
  488 static int
  489 task_is_running(struct taskqueue *queue, struct task *task)
  490 {
  491         struct taskqueue_busy *tb;
  492 
  493         TQ_ASSERT_LOCKED(queue);
  494         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  495                 if (tb->tb_running == task)
  496                         return (1);
  497         }
  498         return (0);
  499 }
  500 
  501 /*
  502  * Only use this function in single threaded contexts. It returns
  503  * non-zero if the given task is either pending or running. Else the
  504  * task is idle and can be queued again or freed.
  505  */
  506 int
  507 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  508 {
  509         int retval;
  510 
  511         TQ_LOCK(queue);
  512         retval = task->ta_pending > 0 || task_is_running(queue, task);
  513         TQ_UNLOCK(queue);
  514 
  515         return (retval);
  516 }
  517 
/*
 * Remove a task from the queue if it is pending.
 *
 * If pendp is not NULL it receives the pending count the task had before
 * it was cancelled (0 if it was not queued).  Returns EBUSY if the task is
 * currently being run, 0 otherwise.  The caller is expected to hold the
 * queue lock (task_is_running() asserts it).
 */
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

        if (task->ta_pending > 0)
                STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
        if (pendp != NULL)
                *pendp = task->ta_pending;
        task->ta_pending = 0;
        return (task_is_running(queue, task) ? EBUSY : 0);
}
  530 
/*
 * Cancel a pending task.  Locked wrapper around taskqueue_cancel_locked();
 * see there for the meaning of pendp and the return value.
 */
int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_cancel_locked(queue, task, pendp);
        TQ_UNLOCK(queue);

        return (error);
}
  542 
/*
 * Cancel a timeout task: stop its callout and remove the embedded task
 * from the queue if it is pending.
 *
 * If pendp is not NULL it receives the task's previous pending count, plus
 * one if callout_stop() returned a positive value.  Returns whatever
 * taskqueue_cancel_locked() returns for the embedded task.
 */
int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
        u_int pending, pending1;
        int error;

        TQ_LOCK(queue);
        /* NOTE(review): > 0 presumably means a scheduled callout was stopped. */
        pending = !!(callout_stop(&timeout_task->c) > 0);
        error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
        /* Keep the armed flag and the queue's armed count in step. */
        if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                timeout_task->f &= ~DT_CALLOUT_ARMED;
                queue->tq_callouts--;
        }
        TQ_UNLOCK(queue);

        if (pendp != NULL)
                *pendp = pending + pending1;
        return (error);
}
  563 
/*
 * Wait until the given task is neither pending nor running.
 *
 * Sleeps on the task address; taskqueue_run_locked() issues the matching
 * wakeup() after each task function returns.  For non-spin queues a
 * WITNESS_WARN check is made first, since this function may sleep.
 */
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

        if (!queue->tq_spin)
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

        TQ_LOCK(queue);
        while (task->ta_pending != 0 || task_is_running(queue, task))
                TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
        TQ_UNLOCK(queue);
}
  576 
/*
 * Wait for everything that is queued or running at the time of the call:
 * first until all queued tasks have started, then until all running tasks
 * have finished.  Tasks queued meanwhile are not waited for.
 */
void
taskqueue_drain_all(struct taskqueue *queue)
{

        if (!queue->tq_spin)
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

        TQ_LOCK(queue);
        taskqueue_drain_tq_queue(queue);
        taskqueue_drain_tq_active(queue);
        TQ_UNLOCK(queue);
}
  589 
/*
 * Drain a timeout task: drain its callout, then wait for the embedded
 * task to be neither pending nor running.  While the drain is in progress
 * taskqueue_enqueue_timeout_sbt() refuses to schedule this timeout task.
 * Only one drain per timeout task may run at a time (checked by KASSERT).
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

        /*
         * Set flag to prevent timer from re-starting during drain:
         */
        TQ_LOCK(queue);
        KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
            ("Drain already in progress"));
        timeout_task->f |= DT_DRAIN_IN_PROGRESS;
        TQ_UNLOCK(queue);

        /* Both calls below are made with the queue lock released. */
        callout_drain(&timeout_task->c);
        taskqueue_drain(queue, &timeout_task->t);

        /*
         * Clear flag to allow timer to re-start:
         */
        TQ_LOCK(queue);
        timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
        TQ_UNLOCK(queue);
}
  614 
/*
 * Enqueue hook of the "swi" queue: calls swi_sched() on the handler cookie
 * stored in taskqueue_ih.  The context argument is not used.
 */
static void
taskqueue_swi_enqueue(void *context)
{
        swi_sched(taskqueue_ih, 0);
}
  620 
/*
 * Handler registered through swi_add() for the "swi" queue: runs all tasks
 * queued on taskqueue_swi.  The argument is not used.
 */
static void
taskqueue_swi_run(void *dummy)
{
        taskqueue_run(taskqueue_swi);
}
  626 
/*
 * Enqueue hook of the "swi_giant" queue: calls swi_sched() on the handler
 * cookie stored in taskqueue_giant_ih.  The context argument is not used.
 */
static void
taskqueue_swi_giant_enqueue(void *context)
{
        swi_sched(taskqueue_giant_ih, 0);
}
  632 
/*
 * Handler registered through swi_add() for the "swi_giant" queue: runs all
 * tasks queued on taskqueue_swi_giant.  The argument is not used.
 */
static void
taskqueue_swi_giant_run(void *dummy)
{
        taskqueue_run(taskqueue_swi_giant);
}
  638 
  639 static int
  640 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  641     cpuset_t *mask, const char *name, va_list ap)
  642 {
  643         char ktname[MAXCOMLEN + 1];
  644         struct thread *td;
  645         struct taskqueue *tq;
  646         int i, error;
  647 
  648         if (count <= 0)
  649                 return (EINVAL);
  650 
  651         vsnprintf(ktname, sizeof(ktname), name, ap);
  652         tq = *tqp;
  653 
  654         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  655             M_NOWAIT | M_ZERO);
  656         if (tq->tq_threads == NULL) {
  657                 printf("%s: no memory for %s threads\n", __func__, ktname);
  658                 return (ENOMEM);
  659         }
  660 
  661         for (i = 0; i < count; i++) {
  662                 if (count == 1)
  663                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  664                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  665                 else
  666                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  667                             &tq->tq_threads[i], RFSTOPPED, 0,
  668                             "%s_%d", ktname, i);
  669                 if (error) {
  670                         /* should be ok to continue, taskqueue_free will dtrt */
  671                         printf("%s: kthread_add(%s): error %d", __func__,
  672                             ktname, error);
  673                         tq->tq_threads[i] = NULL;               /* paranoid */
  674                 } else
  675                         tq->tq_tcount++;
  676         }
  677         if (tq->tq_tcount == 0) {
  678                 free(tq->tq_threads, M_TASKQUEUE);
  679                 tq->tq_threads = NULL;
  680                 return (ENOMEM);
  681         }
  682         for (i = 0; i < count; i++) {
  683                 if (tq->tq_threads[i] == NULL)
  684                         continue;
  685                 td = tq->tq_threads[i];
  686                 if (mask) {
  687                         error = cpuset_setthread(td->td_tid, mask);
  688                         /*
  689                          * Failing to pin is rarely an actual fatal error;
  690                          * it'll just affect performance.
  691                          */
  692                         if (error)
  693                                 printf("%s: curthread=%llu: can't pin; "
  694                                     "error=%d\n",
  695                                     __func__,
  696                                     (unsigned long long) td->td_tid,
  697                                     error);
  698                 }
  699                 thread_lock(td);
  700                 sched_prio(td, pri);
  701                 sched_add(td, SRQ_BORING);
  702                 thread_unlock(td);
  703         }
  704 
  705         return (0);
  706 }
  707 
/*
 * Start `count' service threads for *tqp at priority pri, without a CPU
 * mask.  name and the following arguments form the printf-style thread
 * name.  Returns the result of _taskqueue_start_threads().
 */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
        va_list ap;
        int error;

        va_start(ap, name);
        error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
        va_end(ap);
        return (error);
}
  720 
/*
 * Like taskqueue_start_threads(), but additionally passes a CPU set mask
 * that is applied to every created thread.  Returns the result of
 * _taskqueue_start_threads().
 */
int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
        va_list ap;
        int error;

        va_start(ap, name);
        error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
        va_end(ap);
        return (error);
}
  733 
  734 static inline void
  735 taskqueue_run_callback(struct taskqueue *tq,
  736     enum taskqueue_callback_type cb_type)
  737 {
  738         taskqueue_callback_fn tq_callback;
  739 
  740         TQ_ASSERT_UNLOCKED(tq);
  741         tq_callback = tq->tq_callbacks[cb_type];
  742         if (tq_callback != NULL)
  743                 tq_callback(tq->tq_cb_contexts[cb_type]);
  744 }
  745 
/*
 * Main function of a taskqueue service thread; arg points to the queue
 * pointer.
 *
 * Calls the INIT callback, then alternates between running all queued
 * tasks and sleeping on the queue until TQ_FLAGS_ACTIVE is cleared.  On
 * the way out it runs the queue one last time, calls the SHUTDOWN callback,
 * decrements tq_tcount, wakes the thread waiting in taskqueue_terminate()
 * and exits.
 */
void
taskqueue_thread_loop(void *arg)
{
        struct taskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                /* XXX ? */
                taskqueue_run_locked(tq);
                /*
                 * Because taskqueue_run_locked() can drop tq_mutex, we
                 * need to check if the TQ_FLAGS_ACTIVE flag wasn't removed
                 * in the meantime, which means we missed a wakeup.
                 */
                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                        break;
                TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
        }
        /* Run whatever was still queued when the queue went inactive. */
        taskqueue_run_locked(tq);
        /*
         * This thread is on its way out, so just drop the lock temporarily
         * in order to call the shutdown callback.  This allows the callback
         * to look at the taskqueue, even just before it dies.
         */
        TQ_UNLOCK(tq);
        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
        TQ_LOCK(tq);

        /* rendezvous with thread that asked us to terminate */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        kthread_exit();
}
  783 
  784 void
  785 taskqueue_thread_enqueue(void *context)
  786 {
  787         struct taskqueue **tqp, *tq;
  788 
  789         tqp = context;
  790         tq = *tqp;
  791         wakeup_one(tq);
  792 }
  793 
/*
 * System-wide queues.  The TASKQUEUE_DEFINE* macros are defined outside
 * this file (presumably in <sys/taskqueue.h>); judging from their use here
 * they define taskqueue_<name> and run the given initialisation expression.
 *
 * "swi": serviced by taskqueue_swi_run, registered with INTR_MPSAFE.
 */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
                 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
                     INTR_MPSAFE, &taskqueue_ih));

/* "swi_giant": as above, but registered without INTR_MPSAFE. */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
                 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
                     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* "thread": NOTE(review): presumably serviced by a dedicated kernel thread. */
TASKQUEUE_DEFINE_THREAD(thread);
  803 
  804 struct taskqueue *
  805 taskqueue_create_fast(const char *name, int mflags,
  806                  taskqueue_enqueue_fn enqueue, void *context)
  807 {
  808         return _taskqueue_create(name, mflags, enqueue, context,
  809                         MTX_SPIN, "fast_taskqueue");
  810 }
  811 
/* Handler cookie filled in by swi_add() for the "fast" queue. */
static void     *taskqueue_fast_ih;
  813 
/*
 * Enqueue hook of the "fast" queue: calls swi_sched() on the handler cookie
 * stored in taskqueue_fast_ih.  The context argument is not used.
 */
static void
taskqueue_fast_enqueue(void *context)
{
        swi_sched(taskqueue_fast_ih, 0);
}
  819 
/*
 * Handler registered through swi_add() for the "fast" queue: runs all tasks
 * queued on taskqueue_fast.  The argument is not used.
 */
static void
taskqueue_fast_run(void *dummy)
{
        taskqueue_run(taskqueue_fast);
}
  825 
/*
 * The system-wide "fast" queue, serviced by taskqueue_fast_run.
 * NOTE(review): TASKQUEUE_FAST_DEFINE is defined outside this file;
 * presumably it creates the queue with taskqueue_create_fast().
 */
TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
        swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
        SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  829 
  830 int
  831 taskqueue_member(struct taskqueue *queue, struct thread *td)
  832 {
  833         int i, j, ret = 0;
  834 
  835         for (i = 0, j = 0; ; i++) {
  836                 if (queue->tq_threads[i] == NULL)
  837                         continue;
  838                 if (queue->tq_threads[i] == td) {
  839                         ret = 1;
  840                         break;
  841                 }
  842                 if (++j >= queue->tq_tcount)
  843                         break;
  844         }
  845         return (ret);
  846 }

Cache object: 68615a70563f3ec29a18f1212b4d8851


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.