The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD$");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/cpuset.h>
   34 #include <sys/interrupt.h>
   35 #include <sys/kernel.h>
   36 #include <sys/kthread.h>
   37 #include <sys/libkern.h>
   38 #include <sys/limits.h>
   39 #include <sys/lock.h>
   40 #include <sys/malloc.h>
   41 #include <sys/mutex.h>
   42 #include <sys/proc.h>
   43 #include <sys/sched.h>
   44 #include <sys/smp.h>
   45 #include <sys/taskqueue.h>
   46 #include <sys/unistd.h>
   47 #include <machine/stdarg.h>
   48 
   49 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   50 static void     *taskqueue_giant_ih;
   51 static void     *taskqueue_ih;
   52 static void      taskqueue_fast_enqueue(void *);
   53 static void      taskqueue_swi_enqueue(void *);
   54 static void      taskqueue_swi_giant_enqueue(void *);
   55 
   56 struct taskqueue_busy {
   57         struct task             *tb_running;
   58         u_int                    tb_seq;
   59         LIST_ENTRY(taskqueue_busy) tb_link;
   60 };
   61 
   62 struct taskqueue {
   63         STAILQ_HEAD(, task)     tq_queue;
   64         LIST_HEAD(, taskqueue_busy) tq_active;
   65         struct task             *tq_hint;
   66         u_int                   tq_seq;
   67         int                     tq_callouts;
   68         struct mtx_padalign     tq_mutex;
   69         taskqueue_enqueue_fn    tq_enqueue;
   70         void                    *tq_context;
   71         char                    *tq_name;
   72         struct thread           **tq_threads;
   73         int                     tq_tcount;
   74         int                     tq_spin;
   75         int                     tq_flags;
   76         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   77         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   78 };
   79 
   80 #define TQ_FLAGS_ACTIVE         (1 << 0)
   81 #define TQ_FLAGS_BLOCKED        (1 << 1)
   82 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   83 
   84 #define DT_CALLOUT_ARMED        (1 << 0)
   85 #define DT_DRAIN_IN_PROGRESS    (1 << 1)
   86 
   87 #define TQ_LOCK(tq)                                                     \
   88         do {                                                            \
   89                 if ((tq)->tq_spin)                                      \
   90                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   91                 else                                                    \
   92                         mtx_lock(&(tq)->tq_mutex);                      \
   93         } while (0)
   94 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   95 
   96 #define TQ_UNLOCK(tq)                                                   \
   97         do {                                                            \
   98                 if ((tq)->tq_spin)                                      \
   99                         mtx_unlock_spin(&(tq)->tq_mutex);               \
  100                 else                                                    \
  101                         mtx_unlock(&(tq)->tq_mutex);                    \
  102         } while (0)
  103 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  104 
  105 void
  106 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  107     int priority, task_fn_t func, void *context)
  108 {
  109 
  110         TASK_INIT(&timeout_task->t, priority, func, context);
  111         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  112             CALLOUT_RETURNUNLOCKED);
  113         timeout_task->q = queue;
  114         timeout_task->f = 0;
  115 }
  116 
  117 static __inline int
  118 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
  119 {
  120         if (tq->tq_spin)
  121                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  122         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  123 }
  124 
  125 static struct taskqueue *
  126 _taskqueue_create(const char *name, int mflags,
  127                  taskqueue_enqueue_fn enqueue, void *context,
  128                  int mtxflags, const char *mtxname __unused)
  129 {
  130         struct taskqueue *queue;
  131         char *tq_name;
  132 
  133         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  134         if (tq_name == NULL)
  135                 return (NULL);
  136 
  137         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  138         if (queue == NULL) {
  139                 free(tq_name, M_TASKQUEUE);
  140                 return (NULL);
  141         }
  142 
  143         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  144 
  145         STAILQ_INIT(&queue->tq_queue);
  146         LIST_INIT(&queue->tq_active);
  147         queue->tq_enqueue = enqueue;
  148         queue->tq_context = context;
  149         queue->tq_name = tq_name;
  150         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  151         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  152         if (enqueue == taskqueue_fast_enqueue ||
  153             enqueue == taskqueue_swi_enqueue ||
  154             enqueue == taskqueue_swi_giant_enqueue ||
  155             enqueue == taskqueue_thread_enqueue)
  156                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  157         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  158 
  159         return (queue);
  160 }
  161 
  162 struct taskqueue *
  163 taskqueue_create(const char *name, int mflags,
  164                  taskqueue_enqueue_fn enqueue, void *context)
  165 {
  166 
  167         return _taskqueue_create(name, mflags, enqueue, context,
  168                         MTX_DEF, name);
  169 }
  170 
  171 void
  172 taskqueue_set_callback(struct taskqueue *queue,
  173     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
  174     void *context)
  175 {
  176 
  177         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
  178             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
  179             ("Callback type %d not valid, must be %d-%d", cb_type,
  180             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
  181         KASSERT((queue->tq_callbacks[cb_type] == NULL),
  182             ("Re-initialization of taskqueue callback?"));
  183 
  184         queue->tq_callbacks[cb_type] = callback;
  185         queue->tq_cb_contexts[cb_type] = context;
  186 }
  187 
  188 /*
  189  * Signal a taskqueue thread to terminate.
  190  */
  191 static void
  192 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  193 {
  194 
  195         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  196                 wakeup(tq);
  197                 TQ_SLEEP(tq, pp, "tq_destroy");
  198         }
  199 }
  200 
  201 void
  202 taskqueue_free(struct taskqueue *queue)
  203 {
  204 
  205         TQ_LOCK(queue);
  206         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  207         taskqueue_terminate(queue->tq_threads, queue);
  208         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
  209         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  210         mtx_destroy(&queue->tq_mutex);
  211         free(queue->tq_threads, M_TASKQUEUE);
  212         free(queue->tq_name, M_TASKQUEUE);
  213         free(queue, M_TASKQUEUE);
  214 }
  215 
  216 static int
  217 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  218 {
  219         struct task *ins;
  220         struct task *prev;
  221 
  222         KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
  223         /*
  224          * Count multiple enqueues.
  225          */
  226         if (task->ta_pending) {
  227                 if (task->ta_pending < USHRT_MAX)
  228                         task->ta_pending++;
  229                 TQ_UNLOCK(queue);
  230                 return (0);
  231         }
  232 
  233         /*
  234          * Optimise cases when all tasks use small set of priorities.
  235          * In case of only one priority we always insert at the end.
  236          * In case of two tq_hint typically gives the insertion point.
  237          * In case of more then two tq_hint should halve the search.
  238          */
  239         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  240         if (!prev || prev->ta_priority >= task->ta_priority) {
  241                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  242         } else {
  243                 prev = queue->tq_hint;
  244                 if (prev && prev->ta_priority >= task->ta_priority) {
  245                         ins = STAILQ_NEXT(prev, ta_link);
  246                 } else {
  247                         prev = NULL;
  248                         ins = STAILQ_FIRST(&queue->tq_queue);
  249                 }
  250                 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  251                         if (ins->ta_priority < task->ta_priority)
  252                                 break;
  253 
  254                 if (prev) {
  255                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  256                         queue->tq_hint = task;
  257                 } else
  258                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  259         }
  260 
  261         task->ta_pending = 1;
  262         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
  263                 TQ_UNLOCK(queue);
  264         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  265                 queue->tq_enqueue(queue->tq_context);
  266         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
  267                 TQ_UNLOCK(queue);
  268 
  269         /* Return with lock released. */
  270         return (0);
  271 }
  272 
  273 int
  274 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  275 {
  276         int res;
  277 
  278         TQ_LOCK(queue);
  279         res = taskqueue_enqueue_locked(queue, task);
  280         /* The lock is released inside. */
  281 
  282         return (res);
  283 }
  284 
  285 static void
  286 taskqueue_timeout_func(void *arg)
  287 {
  288         struct taskqueue *queue;
  289         struct timeout_task *timeout_task;
  290 
  291         timeout_task = arg;
  292         queue = timeout_task->q;
  293         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  294         timeout_task->f &= ~DT_CALLOUT_ARMED;
  295         queue->tq_callouts--;
  296         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  297         /* The lock is released inside. */
  298 }
  299 
  300 int
  301 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
  302     struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
  303 {
  304         int res;
  305 
  306         TQ_LOCK(queue);
  307         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  308             ("Migrated queue"));
  309         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  310         timeout_task->q = queue;
  311         res = timeout_task->t.ta_pending;
  312         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  313                 /* Do nothing */
  314                 TQ_UNLOCK(queue);
  315                 res = -1;
  316         } else if (sbt == 0) {
  317                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  318                 /* The lock is released inside. */
  319         } else {
  320                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  321                         res++;
  322                 } else {
  323                         queue->tq_callouts++;
  324                         timeout_task->f |= DT_CALLOUT_ARMED;
  325                         if (sbt < 0)
  326                                 sbt = -sbt; /* Ignore overflow. */
  327                 }
  328                 if (sbt > 0) {
  329                         callout_reset_sbt(&timeout_task->c, sbt, pr,
  330                             taskqueue_timeout_func, timeout_task, flags);
  331                 }
  332                 TQ_UNLOCK(queue);
  333         }
  334         return (res);
  335 }
  336 
  337 int
  338 taskqueue_enqueue_timeout(struct taskqueue *queue,
  339     struct timeout_task *ttask, int ticks)
  340 {
  341 
  342         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
  343             0, 0));
  344 }
  345 
  346 static void
  347 taskqueue_task_nop_fn(void *context, int pending)
  348 {
  349 }
  350 
  351 /*
  352  * Block until all currently queued tasks in this taskqueue
  353  * have begun execution.  Tasks queued during execution of
  354  * this function are ignored.
  355  */
  356 static int
  357 taskqueue_drain_tq_queue(struct taskqueue *queue)
  358 {
  359         struct task t_barrier;
  360 
  361         if (STAILQ_EMPTY(&queue->tq_queue))
  362                 return (0);
  363 
  364         /*
  365          * Enqueue our barrier after all current tasks, but with
  366          * the highest priority so that newly queued tasks cannot
  367          * pass it.  Because of the high priority, we can not use
  368          * taskqueue_enqueue_locked directly (which drops the lock
  369          * anyway) so just insert it at tail while we have the
  370          * queue lock.
  371          */
  372         TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
  373         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  374         queue->tq_hint = &t_barrier;
  375         t_barrier.ta_pending = 1;
  376 
  377         /*
  378          * Once the barrier has executed, all previously queued tasks
  379          * have completed or are currently executing.
  380          */
  381         while (t_barrier.ta_pending != 0)
  382                 TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
  383         return (1);
  384 }
  385 
  386 /*
  387  * Block until all currently executing tasks for this taskqueue
  388  * complete.  Tasks that begin execution during the execution
  389  * of this function are ignored.
  390  */
  391 static int
  392 taskqueue_drain_tq_active(struct taskqueue *queue)
  393 {
  394         struct taskqueue_busy *tb;
  395         u_int seq;
  396 
  397         if (LIST_EMPTY(&queue->tq_active))
  398                 return (0);
  399 
  400         /* Block taskq_terminate().*/
  401         queue->tq_callouts++;
  402 
  403         /* Wait for any active task with sequence from the past. */
  404         seq = queue->tq_seq;
  405 restart:
  406         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  407                 if ((int)(tb->tb_seq - seq) <= 0) {
  408                         TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
  409                         goto restart;
  410                 }
  411         }
  412 
  413         /* Release taskqueue_terminate(). */
  414         queue->tq_callouts--;
  415         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  416                 wakeup_one(queue->tq_threads);
  417         return (1);
  418 }
  419 
  420 void
  421 taskqueue_block(struct taskqueue *queue)
  422 {
  423 
  424         TQ_LOCK(queue);
  425         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  426         TQ_UNLOCK(queue);
  427 }
  428 
  429 void
  430 taskqueue_unblock(struct taskqueue *queue)
  431 {
  432 
  433         TQ_LOCK(queue);
  434         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  435         if (!STAILQ_EMPTY(&queue->tq_queue))
  436                 queue->tq_enqueue(queue->tq_context);
  437         TQ_UNLOCK(queue);
  438 }
  439 
  440 static void
  441 taskqueue_run_locked(struct taskqueue *queue)
  442 {
  443         struct taskqueue_busy tb;
  444         struct task *task;
  445         int pending;
  446 
  447         KASSERT(queue != NULL, ("tq is NULL"));
  448         TQ_ASSERT_LOCKED(queue);
  449         tb.tb_running = NULL;
  450         LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
  451 
  452         while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
  453                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  454                 if (queue->tq_hint == task)
  455                         queue->tq_hint = NULL;
  456                 pending = task->ta_pending;
  457                 task->ta_pending = 0;
  458                 tb.tb_running = task;
  459                 tb.tb_seq = ++queue->tq_seq;
  460                 TQ_UNLOCK(queue);
  461 
  462                 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
  463                 task->ta_func(task->ta_context, pending);
  464 
  465                 TQ_LOCK(queue);
  466                 wakeup(task);
  467         }
  468         LIST_REMOVE(&tb, tb_link);
  469 }
  470 
  471 void
  472 taskqueue_run(struct taskqueue *queue)
  473 {
  474 
  475         TQ_LOCK(queue);
  476         taskqueue_run_locked(queue);
  477         TQ_UNLOCK(queue);
  478 }
  479 
  480 static int
  481 task_is_running(struct taskqueue *queue, struct task *task)
  482 {
  483         struct taskqueue_busy *tb;
  484 
  485         TQ_ASSERT_LOCKED(queue);
  486         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  487                 if (tb->tb_running == task)
  488                         return (1);
  489         }
  490         return (0);
  491 }
  492 
  493 /*
  494  * Only use this function in single threaded contexts. It returns
  495  * non-zero if the given task is either pending or running. Else the
  496  * task is idle and can be queued again or freed.
  497  */
  498 int
  499 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  500 {
  501         int retval;
  502 
  503         TQ_LOCK(queue);
  504         retval = task->ta_pending > 0 || task_is_running(queue, task);
  505         TQ_UNLOCK(queue);
  506 
  507         return (retval);
  508 }
  509 
  510 static int
  511 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  512     u_int *pendp)
  513 {
  514 
  515         if (task->ta_pending > 0) {
  516                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  517                 if (queue->tq_hint == task)
  518                         queue->tq_hint = NULL;
  519         }
  520         if (pendp != NULL)
  521                 *pendp = task->ta_pending;
  522         task->ta_pending = 0;
  523         return (task_is_running(queue, task) ? EBUSY : 0);
  524 }
  525 
  526 int
  527 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  528 {
  529         int error;
  530 
  531         TQ_LOCK(queue);
  532         error = taskqueue_cancel_locked(queue, task, pendp);
  533         TQ_UNLOCK(queue);
  534 
  535         return (error);
  536 }
  537 
  538 int
  539 taskqueue_cancel_timeout(struct taskqueue *queue,
  540     struct timeout_task *timeout_task, u_int *pendp)
  541 {
  542         u_int pending, pending1;
  543         int error;
  544 
  545         TQ_LOCK(queue);
  546         pending = !!(callout_stop(&timeout_task->c) > 0);
  547         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  548         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  549                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  550                 queue->tq_callouts--;
  551         }
  552         TQ_UNLOCK(queue);
  553 
  554         if (pendp != NULL)
  555                 *pendp = pending + pending1;
  556         return (error);
  557 }
  558 
  559 void
  560 taskqueue_drain(struct taskqueue *queue, struct task *task)
  561 {
  562 
  563         if (!queue->tq_spin)
  564                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  565 
  566         TQ_LOCK(queue);
  567         while (task->ta_pending != 0 || task_is_running(queue, task))
  568                 TQ_SLEEP(queue, task, "tq_drain");
  569         TQ_UNLOCK(queue);
  570 }
  571 
  572 void
  573 taskqueue_drain_all(struct taskqueue *queue)
  574 {
  575 
  576         if (!queue->tq_spin)
  577                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  578 
  579         TQ_LOCK(queue);
  580         (void)taskqueue_drain_tq_queue(queue);
  581         (void)taskqueue_drain_tq_active(queue);
  582         TQ_UNLOCK(queue);
  583 }
  584 
  585 void
  586 taskqueue_drain_timeout(struct taskqueue *queue,
  587     struct timeout_task *timeout_task)
  588 {
  589 
  590         /*
  591          * Set flag to prevent timer from re-starting during drain:
  592          */
  593         TQ_LOCK(queue);
  594         KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
  595             ("Drain already in progress"));
  596         timeout_task->f |= DT_DRAIN_IN_PROGRESS;
  597         TQ_UNLOCK(queue);
  598 
  599         callout_drain(&timeout_task->c);
  600         taskqueue_drain(queue, &timeout_task->t);
  601 
  602         /*
  603          * Clear flag to allow timer to re-start:
  604          */
  605         TQ_LOCK(queue);
  606         timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
  607         TQ_UNLOCK(queue);
  608 }
  609 
  610 void
  611 taskqueue_quiesce(struct taskqueue *queue)
  612 {
  613         int ret;
  614 
  615         TQ_LOCK(queue);
  616         do {
  617                 ret = taskqueue_drain_tq_queue(queue);
  618                 if (ret == 0)
  619                         ret = taskqueue_drain_tq_active(queue);
  620         } while (ret != 0);
  621         TQ_UNLOCK(queue);
  622 }
  623 
  624 static void
  625 taskqueue_swi_enqueue(void *context)
  626 {
  627         swi_sched(taskqueue_ih, 0);
  628 }
  629 
  630 static void
  631 taskqueue_swi_run(void *dummy)
  632 {
  633         taskqueue_run(taskqueue_swi);
  634 }
  635 
  636 static void
  637 taskqueue_swi_giant_enqueue(void *context)
  638 {
  639         swi_sched(taskqueue_giant_ih, 0);
  640 }
  641 
  642 static void
  643 taskqueue_swi_giant_run(void *dummy)
  644 {
  645         taskqueue_run(taskqueue_swi_giant);
  646 }
  647 
  648 static int
  649 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  650     cpuset_t *mask, const char *name, va_list ap)
  651 {
  652         char ktname[MAXCOMLEN + 1];
  653         struct thread *td;
  654         struct taskqueue *tq;
  655         int i, error;
  656 
  657         if (count <= 0)
  658                 return (EINVAL);
  659 
  660         vsnprintf(ktname, sizeof(ktname), name, ap);
  661         tq = *tqp;
  662 
  663         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  664             M_NOWAIT | M_ZERO);
  665         if (tq->tq_threads == NULL) {
  666                 printf("%s: no memory for %s threads\n", __func__, ktname);
  667                 return (ENOMEM);
  668         }
  669 
  670         for (i = 0; i < count; i++) {
  671                 if (count == 1)
  672                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  673                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  674                 else
  675                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  676                             &tq->tq_threads[i], RFSTOPPED, 0,
  677                             "%s_%d", ktname, i);
  678                 if (error) {
  679                         /* should be ok to continue, taskqueue_free will dtrt */
  680                         printf("%s: kthread_add(%s): error %d", __func__,
  681                             ktname, error);
  682                         tq->tq_threads[i] = NULL;               /* paranoid */
  683                 } else
  684                         tq->tq_tcount++;
  685         }
  686         if (tq->tq_tcount == 0) {
  687                 free(tq->tq_threads, M_TASKQUEUE);
  688                 tq->tq_threads = NULL;
  689                 return (ENOMEM);
  690         }
  691         for (i = 0; i < count; i++) {
  692                 if (tq->tq_threads[i] == NULL)
  693                         continue;
  694                 td = tq->tq_threads[i];
  695                 if (mask) {
  696                         error = cpuset_setthread(td->td_tid, mask);
  697                         /*
  698                          * Failing to pin is rarely an actual fatal error;
  699                          * it'll just affect performance.
  700                          */
  701                         if (error)
  702                                 printf("%s: curthread=%llu: can't pin; "
  703                                     "error=%d\n",
  704                                     __func__,
  705                                     (unsigned long long) td->td_tid,
  706                                     error);
  707                 }
  708                 thread_lock(td);
  709                 sched_prio(td, pri);
  710                 sched_add(td, SRQ_BORING);
  711                 thread_unlock(td);
  712         }
  713 
  714         return (0);
  715 }
  716 
  717 int
  718 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  719     const char *name, ...)
  720 {
  721         va_list ap;
  722         int error;
  723 
  724         va_start(ap, name);
  725         error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  726         va_end(ap);
  727         return (error);
  728 }
  729 
  730 int
  731 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
  732     cpuset_t *mask, const char *name, ...)
  733 {
  734         va_list ap;
  735         int error;
  736 
  737         va_start(ap, name);
  738         error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
  739         va_end(ap);
  740         return (error);
  741 }
  742 
  743 static inline void
  744 taskqueue_run_callback(struct taskqueue *tq,
  745     enum taskqueue_callback_type cb_type)
  746 {
  747         taskqueue_callback_fn tq_callback;
  748 
  749         TQ_ASSERT_UNLOCKED(tq);
  750         tq_callback = tq->tq_callbacks[cb_type];
  751         if (tq_callback != NULL)
  752                 tq_callback(tq->tq_cb_contexts[cb_type]);
  753 }
  754 
  755 void
  756 taskqueue_thread_loop(void *arg)
  757 {
  758         struct taskqueue **tqp, *tq;
  759 
  760         tqp = arg;
  761         tq = *tqp;
  762         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  763         TQ_LOCK(tq);
  764         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  765                 /* XXX ? */
  766                 taskqueue_run_locked(tq);
  767                 /*
  768                  * Because taskqueue_run() can drop tq_mutex, we need to
  769                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  770                  * meantime, which means we missed a wakeup.
  771                  */
  772                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  773                         break;
  774                 TQ_SLEEP(tq, tq, "-");
  775         }
  776         taskqueue_run_locked(tq);
  777         /*
  778          * This thread is on its way out, so just drop the lock temporarily
  779          * in order to call the shutdown callback.  This allows the callback
  780          * to look at the taskqueue, even just before it dies.
  781          */
  782         TQ_UNLOCK(tq);
  783         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  784         TQ_LOCK(tq);
  785 
  786         /* rendezvous with thread that asked us to terminate */
  787         tq->tq_tcount--;
  788         wakeup_one(tq->tq_threads);
  789         TQ_UNLOCK(tq);
  790         kthread_exit();
  791 }
  792 
  793 void
  794 taskqueue_thread_enqueue(void *context)
  795 {
  796         struct taskqueue **tqp, *tq;
  797 
  798         tqp = context;
  799         tq = *tqp;
  800         wakeup_any(tq);
  801 }
  802 
  803 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  804                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  805                      INTR_MPSAFE, &taskqueue_ih));
  806 
  807 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  808                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  809                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
  810 
  811 TASKQUEUE_DEFINE_THREAD(thread);
  812 
  813 struct taskqueue *
  814 taskqueue_create_fast(const char *name, int mflags,
  815                  taskqueue_enqueue_fn enqueue, void *context)
  816 {
  817         return _taskqueue_create(name, mflags, enqueue, context,
  818                         MTX_SPIN, "fast_taskqueue");
  819 }
  820 
  821 static void     *taskqueue_fast_ih;
  822 
  823 static void
  824 taskqueue_fast_enqueue(void *context)
  825 {
  826         swi_sched(taskqueue_fast_ih, 0);
  827 }
  828 
  829 static void
  830 taskqueue_fast_run(void *dummy)
  831 {
  832         taskqueue_run(taskqueue_fast);
  833 }
  834 
  835 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  836         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  837         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  838 
  839 int
  840 taskqueue_member(struct taskqueue *queue, struct thread *td)
  841 {
  842         int i, j, ret = 0;
  843 
  844         for (i = 0, j = 0; ; i++) {
  845                 if (queue->tq_threads[i] == NULL)
  846                         continue;
  847                 if (queue->tq_threads[i] == td) {
  848                         ret = 1;
  849                         break;
  850                 }
  851                 if (++j >= queue->tq_tcount)
  852                         break;
  853         }
  854         return (ret);
  855 }

Cache object: 4a977f321c6d49379c8cd63cc879c4ca


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.