The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD: releng/11.1/sys/kern/subr_taskqueue.c 315267 2017-03-14 15:59:51Z hselasky $");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/cpuset.h>
   34 #include <sys/interrupt.h>
   35 #include <sys/kernel.h>
   36 #include <sys/kthread.h>
   37 #include <sys/libkern.h>
   38 #include <sys/limits.h>
   39 #include <sys/lock.h>
   40 #include <sys/malloc.h>
   41 #include <sys/mutex.h>
   42 #include <sys/proc.h>
   43 #include <sys/sched.h>
   44 #include <sys/smp.h>
   45 #include <sys/taskqueue.h>
   46 #include <sys/unistd.h>
   47 #include <machine/stdarg.h>
   48 
   49 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   50 static void     *taskqueue_giant_ih;
   51 static void     *taskqueue_ih;
   52 static void      taskqueue_fast_enqueue(void *);
   53 static void      taskqueue_swi_enqueue(void *);
   54 static void      taskqueue_swi_giant_enqueue(void *);
   55 
   56 struct taskqueue_busy {
   57         struct task     *tb_running;
   58         TAILQ_ENTRY(taskqueue_busy) tb_link;
   59 };
   60 
   61 struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
   62 
   63 struct taskqueue {
   64         STAILQ_HEAD(, task)     tq_queue;
   65         taskqueue_enqueue_fn    tq_enqueue;
   66         void                    *tq_context;
   67         char                    *tq_name;
   68         TAILQ_HEAD(, taskqueue_busy) tq_active;
   69         struct mtx              tq_mutex;
   70         struct thread           **tq_threads;
   71         int                     tq_tcount;
   72         int                     tq_spin;
   73         int                     tq_flags;
   74         int                     tq_callouts;
   75         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   76         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   77 };
   78 
   79 #define TQ_FLAGS_ACTIVE         (1 << 0)
   80 #define TQ_FLAGS_BLOCKED        (1 << 1)
   81 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   82 
   83 #define DT_CALLOUT_ARMED        (1 << 0)
   84 #define DT_DRAIN_IN_PROGRESS    (1 << 1)
   85 
   86 #define TQ_LOCK(tq)                                                     \
   87         do {                                                            \
   88                 if ((tq)->tq_spin)                                      \
   89                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   90                 else                                                    \
   91                         mtx_lock(&(tq)->tq_mutex);                      \
   92         } while (0)
   93 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   94 
   95 #define TQ_UNLOCK(tq)                                                   \
   96         do {                                                            \
   97                 if ((tq)->tq_spin)                                      \
   98                         mtx_unlock_spin(&(tq)->tq_mutex);               \
   99                 else                                                    \
  100                         mtx_unlock(&(tq)->tq_mutex);                    \
  101         } while (0)
  102 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  103 
  104 void
  105 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  106     int priority, task_fn_t func, void *context)
  107 {
  108 
  109         TASK_INIT(&timeout_task->t, priority, func, context);
  110         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  111             CALLOUT_RETURNUNLOCKED);
  112         timeout_task->q = queue;
  113         timeout_task->f = 0;
  114 }
  115 
  116 static __inline int
  117 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  118     int t)
  119 {
  120         if (tq->tq_spin)
  121                 return (msleep_spin(p, m, wm, t));
  122         return (msleep(p, m, pri, wm, t));
  123 }
  124 
  125 static struct taskqueue *
  126 _taskqueue_create(const char *name, int mflags,
  127                  taskqueue_enqueue_fn enqueue, void *context,
  128                  int mtxflags, const char *mtxname __unused)
  129 {
  130         struct taskqueue *queue;
  131         char *tq_name;
  132 
  133         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  134         if (tq_name == NULL)
  135                 return (NULL);
  136 
  137         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  138         if (queue == NULL) {
  139                 free(tq_name, M_TASKQUEUE);
  140                 return (NULL);
  141         }
  142 
  143         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  144 
  145         STAILQ_INIT(&queue->tq_queue);
  146         TAILQ_INIT(&queue->tq_active);
  147         queue->tq_enqueue = enqueue;
  148         queue->tq_context = context;
  149         queue->tq_name = tq_name;
  150         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  151         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  152         if (enqueue == taskqueue_fast_enqueue ||
  153             enqueue == taskqueue_swi_enqueue ||
  154             enqueue == taskqueue_swi_giant_enqueue ||
  155             enqueue == taskqueue_thread_enqueue)
  156                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  157         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  158 
  159         return (queue);
  160 }
  161 
  162 struct taskqueue *
  163 taskqueue_create(const char *name, int mflags,
  164                  taskqueue_enqueue_fn enqueue, void *context)
  165 {
  166 
  167         return _taskqueue_create(name, mflags, enqueue, context,
  168                         MTX_DEF, name);
  169 }
  170 
  171 void
  172 taskqueue_set_callback(struct taskqueue *queue,
  173     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
  174     void *context)
  175 {
  176 
  177         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
  178             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
  179             ("Callback type %d not valid, must be %d-%d", cb_type,
  180             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
  181         KASSERT((queue->tq_callbacks[cb_type] == NULL),
  182             ("Re-initialization of taskqueue callback?"));
  183 
  184         queue->tq_callbacks[cb_type] = callback;
  185         queue->tq_cb_contexts[cb_type] = context;
  186 }
  187 
  188 /*
  189  * Signal a taskqueue thread to terminate.
  190  */
  191 static void
  192 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  193 {
  194 
  195         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  196                 wakeup(tq);
  197                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  198         }
  199 }
  200 
  201 void
  202 taskqueue_free(struct taskqueue *queue)
  203 {
  204 
  205         TQ_LOCK(queue);
  206         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  207         taskqueue_terminate(queue->tq_threads, queue);
  208         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  209         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  210         mtx_destroy(&queue->tq_mutex);
  211         free(queue->tq_threads, M_TASKQUEUE);
  212         free(queue->tq_name, M_TASKQUEUE);
  213         free(queue, M_TASKQUEUE);
  214 }
  215 
  216 static int
  217 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  218 {
  219         struct task *ins;
  220         struct task *prev;
  221 
  222         KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
  223         /*
  224          * Count multiple enqueues.
  225          */
  226         if (task->ta_pending) {
  227                 if (task->ta_pending < USHRT_MAX)
  228                         task->ta_pending++;
  229                 TQ_UNLOCK(queue);
  230                 return (0);
  231         }
  232 
  233         /*
  234          * Optimise the case when all tasks have the same priority.
  235          */
  236         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  237         if (!prev || prev->ta_priority >= task->ta_priority) {
  238                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  239         } else {
  240                 prev = NULL;
  241                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  242                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  243                         if (ins->ta_priority < task->ta_priority)
  244                                 break;
  245 
  246                 if (prev)
  247                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  248                 else
  249                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  250         }
  251 
  252         task->ta_pending = 1;
  253         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
  254                 TQ_UNLOCK(queue);
  255         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  256                 queue->tq_enqueue(queue->tq_context);
  257         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
  258                 TQ_UNLOCK(queue);
  259 
  260         /* Return with lock released. */
  261         return (0);
  262 }
  263 
  264 int
  265 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  266 {
  267         int res;
  268 
  269         TQ_LOCK(queue);
  270         res = taskqueue_enqueue_locked(queue, task);
  271         /* The lock is released inside. */
  272 
  273         return (res);
  274 }
  275 
  276 static void
  277 taskqueue_timeout_func(void *arg)
  278 {
  279         struct taskqueue *queue;
  280         struct timeout_task *timeout_task;
  281 
  282         timeout_task = arg;
  283         queue = timeout_task->q;
  284         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  285         timeout_task->f &= ~DT_CALLOUT_ARMED;
  286         queue->tq_callouts--;
  287         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  288         /* The lock is released inside. */
  289 }
  290 
  291 int
  292 taskqueue_enqueue_timeout(struct taskqueue *queue,
  293     struct timeout_task *timeout_task, int ticks)
  294 {
  295         int res;
  296 
  297         TQ_LOCK(queue);
  298         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  299             ("Migrated queue"));
  300         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  301         timeout_task->q = queue;
  302         res = timeout_task->t.ta_pending;
  303         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  304                 /* Do nothing */
  305                 TQ_UNLOCK(queue);
  306                 res = -1;
  307         } else if (ticks == 0) {
  308                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  309                 /* The lock is released inside. */
  310         } else {
  311                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  312                         res++;
  313                 } else {
  314                         queue->tq_callouts++;
  315                         timeout_task->f |= DT_CALLOUT_ARMED;
  316                         if (ticks < 0)
  317                                 ticks = -ticks; /* Ignore overflow. */
  318                 }
  319                 if (ticks > 0) {
  320                         callout_reset(&timeout_task->c, ticks,
  321                             taskqueue_timeout_func, timeout_task);
  322                 }
  323                 TQ_UNLOCK(queue);
  324         }
  325         return (res);
  326 }
  327 
  328 static void
  329 taskqueue_task_nop_fn(void *context, int pending)
  330 {
  331 }
  332 
  333 /*
  334  * Block until all currently queued tasks in this taskqueue
  335  * have begun execution.  Tasks queued during execution of
  336  * this function are ignored.
  337  */
  338 static void
  339 taskqueue_drain_tq_queue(struct taskqueue *queue)
  340 {
  341         struct task t_barrier;
  342 
  343         if (STAILQ_EMPTY(&queue->tq_queue))
  344                 return;
  345 
  346         /*
  347          * Enqueue our barrier after all current tasks, but with
  348          * the highest priority so that newly queued tasks cannot
  349          * pass it.  Because of the high priority, we can not use
  350          * taskqueue_enqueue_locked directly (which drops the lock
  351          * anyway) so just insert it at tail while we have the
  352          * queue lock.
  353          */
  354         TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
  355         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  356         t_barrier.ta_pending = 1;
  357 
  358         /*
  359          * Once the barrier has executed, all previously queued tasks
  360          * have completed or are currently executing.
  361          */
  362         while (t_barrier.ta_pending != 0)
  363                 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
  364 }
  365 
  366 /*
  367  * Block until all currently executing tasks for this taskqueue
  368  * complete.  Tasks that begin execution during the execution
  369  * of this function are ignored.
  370  */
  371 static void
  372 taskqueue_drain_tq_active(struct taskqueue *queue)
  373 {
  374         struct taskqueue_busy tb_marker, *tb_first;
  375 
  376         if (TAILQ_EMPTY(&queue->tq_active))
  377                 return;
  378 
  379         /* Block taskq_terminate().*/
  380         queue->tq_callouts++;
  381 
  382         /*
  383          * Wait for all currently executing taskqueue threads
  384          * to go idle.
  385          */
  386         tb_marker.tb_running = TB_DRAIN_WAITER;
  387         TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
  388         while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
  389                 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
  390         TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
  391 
  392         /*
  393          * Wakeup any other drain waiter that happened to queue up
  394          * without any intervening active thread.
  395          */
  396         tb_first = TAILQ_FIRST(&queue->tq_active);
  397         if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
  398                 wakeup(tb_first);
  399 
  400         /* Release taskqueue_terminate(). */
  401         queue->tq_callouts--;
  402         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  403                 wakeup_one(queue->tq_threads);
  404 }
  405 
  406 void
  407 taskqueue_block(struct taskqueue *queue)
  408 {
  409 
  410         TQ_LOCK(queue);
  411         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  412         TQ_UNLOCK(queue);
  413 }
  414 
  415 void
  416 taskqueue_unblock(struct taskqueue *queue)
  417 {
  418 
  419         TQ_LOCK(queue);
  420         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  421         if (!STAILQ_EMPTY(&queue->tq_queue))
  422                 queue->tq_enqueue(queue->tq_context);
  423         TQ_UNLOCK(queue);
  424 }
  425 
  426 static void
  427 taskqueue_run_locked(struct taskqueue *queue)
  428 {
  429         struct taskqueue_busy tb;
  430         struct taskqueue_busy *tb_first;
  431         struct task *task;
  432         int pending;
  433 
  434         KASSERT(queue != NULL, ("tq is NULL"));
  435         TQ_ASSERT_LOCKED(queue);
  436         tb.tb_running = NULL;
  437 
  438         while (STAILQ_FIRST(&queue->tq_queue)) {
  439                 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  440 
  441                 /*
  442                  * Carefully remove the first task from the queue and
  443                  * zero its pending count.
  444                  */
  445                 task = STAILQ_FIRST(&queue->tq_queue);
  446                 KASSERT(task != NULL, ("task is NULL"));
  447                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  448                 pending = task->ta_pending;
  449                 task->ta_pending = 0;
  450                 tb.tb_running = task;
  451                 TQ_UNLOCK(queue);
  452 
  453                 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
  454                 task->ta_func(task->ta_context, pending);
  455 
  456                 TQ_LOCK(queue);
  457                 tb.tb_running = NULL;
  458                 wakeup(task);
  459 
  460                 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  461                 tb_first = TAILQ_FIRST(&queue->tq_active);
  462                 if (tb_first != NULL &&
  463                     tb_first->tb_running == TB_DRAIN_WAITER)
  464                         wakeup(tb_first);
  465         }
  466 }
  467 
  468 void
  469 taskqueue_run(struct taskqueue *queue)
  470 {
  471 
  472         TQ_LOCK(queue);
  473         taskqueue_run_locked(queue);
  474         TQ_UNLOCK(queue);
  475 }
  476 
  477 static int
  478 task_is_running(struct taskqueue *queue, struct task *task)
  479 {
  480         struct taskqueue_busy *tb;
  481 
  482         TQ_ASSERT_LOCKED(queue);
  483         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  484                 if (tb->tb_running == task)
  485                         return (1);
  486         }
  487         return (0);
  488 }
  489 
  490 /*
  491  * Only use this function in single threaded contexts. It returns
  492  * non-zero if the given task is either pending or running. Else the
  493  * task is idle and can be queued again or freed.
  494  */
  495 int
  496 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  497 {
  498         int retval;
  499 
  500         TQ_LOCK(queue);
  501         retval = task->ta_pending > 0 || task_is_running(queue, task);
  502         TQ_UNLOCK(queue);
  503 
  504         return (retval);
  505 }
  506 
  507 static int
  508 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  509     u_int *pendp)
  510 {
  511 
  512         if (task->ta_pending > 0)
  513                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  514         if (pendp != NULL)
  515                 *pendp = task->ta_pending;
  516         task->ta_pending = 0;
  517         return (task_is_running(queue, task) ? EBUSY : 0);
  518 }
  519 
  520 int
  521 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  522 {
  523         int error;
  524 
  525         TQ_LOCK(queue);
  526         error = taskqueue_cancel_locked(queue, task, pendp);
  527         TQ_UNLOCK(queue);
  528 
  529         return (error);
  530 }
  531 
  532 int
  533 taskqueue_cancel_timeout(struct taskqueue *queue,
  534     struct timeout_task *timeout_task, u_int *pendp)
  535 {
  536         u_int pending, pending1;
  537         int error;
  538 
  539         TQ_LOCK(queue);
  540         pending = !!(callout_stop(&timeout_task->c) > 0);
  541         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  542         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  543                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  544                 queue->tq_callouts--;
  545         }
  546         TQ_UNLOCK(queue);
  547 
  548         if (pendp != NULL)
  549                 *pendp = pending + pending1;
  550         return (error);
  551 }
  552 
  553 void
  554 taskqueue_drain(struct taskqueue *queue, struct task *task)
  555 {
  556 
  557         if (!queue->tq_spin)
  558                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  559 
  560         TQ_LOCK(queue);
  561         while (task->ta_pending != 0 || task_is_running(queue, task))
  562                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  563         TQ_UNLOCK(queue);
  564 }
  565 
  566 void
  567 taskqueue_drain_all(struct taskqueue *queue)
  568 {
  569 
  570         if (!queue->tq_spin)
  571                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  572 
  573         TQ_LOCK(queue);
  574         taskqueue_drain_tq_queue(queue);
  575         taskqueue_drain_tq_active(queue);
  576         TQ_UNLOCK(queue);
  577 }
  578 
  579 void
  580 taskqueue_drain_timeout(struct taskqueue *queue,
  581     struct timeout_task *timeout_task)
  582 {
  583 
  584         /*
  585          * Set flag to prevent timer from re-starting during drain:
  586          */
  587         TQ_LOCK(queue);
  588         KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
  589             ("Drain already in progress"));
  590         timeout_task->f |= DT_DRAIN_IN_PROGRESS;
  591         TQ_UNLOCK(queue);
  592 
  593         callout_drain(&timeout_task->c);
  594         taskqueue_drain(queue, &timeout_task->t);
  595 
  596         /*
  597          * Clear flag to allow timer to re-start:
  598          */
  599         TQ_LOCK(queue);
  600         timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
  601         TQ_UNLOCK(queue);
  602 }
  603 
  604 static void
  605 taskqueue_swi_enqueue(void *context)
  606 {
  607         swi_sched(taskqueue_ih, 0);
  608 }
  609 
  610 static void
  611 taskqueue_swi_run(void *dummy)
  612 {
  613         taskqueue_run(taskqueue_swi);
  614 }
  615 
  616 static void
  617 taskqueue_swi_giant_enqueue(void *context)
  618 {
  619         swi_sched(taskqueue_giant_ih, 0);
  620 }
  621 
  622 static void
  623 taskqueue_swi_giant_run(void *dummy)
  624 {
  625         taskqueue_run(taskqueue_swi_giant);
  626 }
  627 
  628 static int
  629 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  630     cpuset_t *mask, const char *name, va_list ap)
  631 {
  632         char ktname[MAXCOMLEN + 1];
  633         struct thread *td;
  634         struct taskqueue *tq;
  635         int i, error;
  636 
  637         if (count <= 0)
  638                 return (EINVAL);
  639 
  640         vsnprintf(ktname, sizeof(ktname), name, ap);
  641         tq = *tqp;
  642 
  643         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  644             M_NOWAIT | M_ZERO);
  645         if (tq->tq_threads == NULL) {
  646                 printf("%s: no memory for %s threads\n", __func__, ktname);
  647                 return (ENOMEM);
  648         }
  649 
  650         for (i = 0; i < count; i++) {
  651                 if (count == 1)
  652                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  653                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  654                 else
  655                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  656                             &tq->tq_threads[i], RFSTOPPED, 0,
  657                             "%s_%d", ktname, i);
  658                 if (error) {
  659                         /* should be ok to continue, taskqueue_free will dtrt */
  660                         printf("%s: kthread_add(%s): error %d", __func__,
  661                             ktname, error);
  662                         tq->tq_threads[i] = NULL;               /* paranoid */
  663                 } else
  664                         tq->tq_tcount++;
  665         }
  666         for (i = 0; i < count; i++) {
  667                 if (tq->tq_threads[i] == NULL)
  668                         continue;
  669                 td = tq->tq_threads[i];
  670                 if (mask) {
  671                         error = cpuset_setthread(td->td_tid, mask);
  672                         /*
  673                          * Failing to pin is rarely an actual fatal error;
  674                          * it'll just affect performance.
  675                          */
  676                         if (error)
  677                                 printf("%s: curthread=%llu: can't pin; "
  678                                     "error=%d\n",
  679                                     __func__,
  680                                     (unsigned long long) td->td_tid,
  681                                     error);
  682                 }
  683                 thread_lock(td);
  684                 sched_prio(td, pri);
  685                 sched_add(td, SRQ_BORING);
  686                 thread_unlock(td);
  687         }
  688 
  689         return (0);
  690 }
  691 
  692 int
  693 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  694     const char *name, ...)
  695 {
  696         va_list ap;
  697         int error;
  698 
  699         va_start(ap, name);
  700         error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  701         va_end(ap);
  702         return (error);
  703 }
  704 
  705 int
  706 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
  707     cpuset_t *mask, const char *name, ...)
  708 {
  709         va_list ap;
  710         int error;
  711 
  712         va_start(ap, name);
  713         error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
  714         va_end(ap);
  715         return (error);
  716 }
  717 
  718 static inline void
  719 taskqueue_run_callback(struct taskqueue *tq,
  720     enum taskqueue_callback_type cb_type)
  721 {
  722         taskqueue_callback_fn tq_callback;
  723 
  724         TQ_ASSERT_UNLOCKED(tq);
  725         tq_callback = tq->tq_callbacks[cb_type];
  726         if (tq_callback != NULL)
  727                 tq_callback(tq->tq_cb_contexts[cb_type]);
  728 }
  729 
  730 void
  731 taskqueue_thread_loop(void *arg)
  732 {
  733         struct taskqueue **tqp, *tq;
  734 
  735         tqp = arg;
  736         tq = *tqp;
  737         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  738         TQ_LOCK(tq);
  739         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  740                 /* XXX ? */
  741                 taskqueue_run_locked(tq);
  742                 /*
  743                  * Because taskqueue_run() can drop tq_mutex, we need to
  744                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  745                  * meantime, which means we missed a wakeup.
  746                  */
  747                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  748                         break;
  749                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  750         }
  751         taskqueue_run_locked(tq);
  752         /*
  753          * This thread is on its way out, so just drop the lock temporarily
  754          * in order to call the shutdown callback.  This allows the callback
  755          * to look at the taskqueue, even just before it dies.
  756          */
  757         TQ_UNLOCK(tq);
  758         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  759         TQ_LOCK(tq);
  760 
  761         /* rendezvous with thread that asked us to terminate */
  762         tq->tq_tcount--;
  763         wakeup_one(tq->tq_threads);
  764         TQ_UNLOCK(tq);
  765         kthread_exit();
  766 }
  767 
  768 void
  769 taskqueue_thread_enqueue(void *context)
  770 {
  771         struct taskqueue **tqp, *tq;
  772 
  773         tqp = context;
  774         tq = *tqp;
  775         wakeup_one(tq);
  776 }
  777 
  778 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  779                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  780                      INTR_MPSAFE, &taskqueue_ih));
  781 
  782 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  783                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  784                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
  785 
  786 TASKQUEUE_DEFINE_THREAD(thread);
  787 
  788 struct taskqueue *
  789 taskqueue_create_fast(const char *name, int mflags,
  790                  taskqueue_enqueue_fn enqueue, void *context)
  791 {
  792         return _taskqueue_create(name, mflags, enqueue, context,
  793                         MTX_SPIN, "fast_taskqueue");
  794 }
  795 
  796 static void     *taskqueue_fast_ih;
  797 
  798 static void
  799 taskqueue_fast_enqueue(void *context)
  800 {
  801         swi_sched(taskqueue_fast_ih, 0);
  802 }
  803 
  804 static void
  805 taskqueue_fast_run(void *dummy)
  806 {
  807         taskqueue_run(taskqueue_fast);
  808 }
  809 
  810 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  811         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  812         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  813 
  814 int
  815 taskqueue_member(struct taskqueue *queue, struct thread *td)
  816 {
  817         int i, j, ret = 0;
  818 
  819         for (i = 0, j = 0; ; i++) {
  820                 if (queue->tq_threads[i] == NULL)
  821                         continue;
  822                 if (queue->tq_threads[i] == td) {
  823                         ret = 1;
  824                         break;
  825                 }
  826                 if (++j >= queue->tq_tcount)
  827                         break;
  828         }
  829         return (ret);
  830 }

Cache object: 3646603fee4118210a0b5c5837de798a


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.