The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
    3  *
    4  * Copyright (c) 2000 Doug Rabson
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/epoch.h>
   46 #include <sys/sched.h>
   47 #include <sys/smp.h>
   48 #include <sys/taskqueue.h>
   49 #include <sys/unistd.h>
   50 #include <machine/stdarg.h>
   51 
   52 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   53 static void     *taskqueue_giant_ih;
   54 static void     *taskqueue_ih;
   55 static void      taskqueue_fast_enqueue(void *);
   56 static void      taskqueue_swi_enqueue(void *);
   57 static void      taskqueue_swi_giant_enqueue(void *);
   58 
   59 struct taskqueue_busy {
   60         struct task             *tb_running;
   61         u_int                    tb_seq;
   62         LIST_ENTRY(taskqueue_busy) tb_link;
   63 };
   64 
   65 struct taskqueue {
   66         STAILQ_HEAD(, task)     tq_queue;
   67         LIST_HEAD(, taskqueue_busy) tq_active;
   68         struct task             *tq_hint;
   69         u_int                   tq_seq;
   70         int                     tq_callouts;
   71         struct mtx_padalign     tq_mutex;
   72         taskqueue_enqueue_fn    tq_enqueue;
   73         void                    *tq_context;
   74         char                    *tq_name;
   75         struct thread           **tq_threads;
   76         int                     tq_tcount;
   77         int                     tq_spin;
   78         int                     tq_flags;
   79         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   80         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   81 };
   82 
   83 #define TQ_FLAGS_ACTIVE         (1 << 0)
   84 #define TQ_FLAGS_BLOCKED        (1 << 1)
   85 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   86 
   87 #define DT_CALLOUT_ARMED        (1 << 0)
   88 #define DT_DRAIN_IN_PROGRESS    (1 << 1)
   89 
   90 #define TQ_LOCK(tq)                                                     \
   91         do {                                                            \
   92                 if ((tq)->tq_spin)                                      \
   93                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   94                 else                                                    \
   95                         mtx_lock(&(tq)->tq_mutex);                      \
   96         } while (0)
   97 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   98 
   99 #define TQ_UNLOCK(tq)                                                   \
  100         do {                                                            \
  101                 if ((tq)->tq_spin)                                      \
  102                         mtx_unlock_spin(&(tq)->tq_mutex);               \
  103                 else                                                    \
  104                         mtx_unlock(&(tq)->tq_mutex);                    \
  105         } while (0)
  106 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  107 
  108 void
  109 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  110     int priority, task_fn_t func, void *context)
  111 {
  112 
  113         TASK_INIT(&timeout_task->t, priority, func, context);
  114         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  115             CALLOUT_RETURNUNLOCKED);
  116         timeout_task->q = queue;
  117         timeout_task->f = 0;
  118 }
  119 
  120 static __inline int
  121 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
  122 {
  123         if (tq->tq_spin)
  124                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  125         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  126 }
  127 
  128 static struct taskqueue *
  129 _taskqueue_create(const char *name, int mflags,
  130                  taskqueue_enqueue_fn enqueue, void *context,
  131                  int mtxflags, const char *mtxname __unused)
  132 {
  133         struct taskqueue *queue;
  134         char *tq_name;
  135 
  136         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  137         if (tq_name == NULL)
  138                 return (NULL);
  139 
  140         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  141         if (queue == NULL) {
  142                 free(tq_name, M_TASKQUEUE);
  143                 return (NULL);
  144         }
  145 
  146         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  147 
  148         STAILQ_INIT(&queue->tq_queue);
  149         LIST_INIT(&queue->tq_active);
  150         queue->tq_enqueue = enqueue;
  151         queue->tq_context = context;
  152         queue->tq_name = tq_name;
  153         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  154         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  155         if (enqueue == taskqueue_fast_enqueue ||
  156             enqueue == taskqueue_swi_enqueue ||
  157             enqueue == taskqueue_swi_giant_enqueue ||
  158             enqueue == taskqueue_thread_enqueue)
  159                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  160         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  161 
  162         return (queue);
  163 }
  164 
  165 struct taskqueue *
  166 taskqueue_create(const char *name, int mflags,
  167                  taskqueue_enqueue_fn enqueue, void *context)
  168 {
  169 
  170         return _taskqueue_create(name, mflags, enqueue, context,
  171                         MTX_DEF, name);
  172 }
  173 
  174 void
  175 taskqueue_set_callback(struct taskqueue *queue,
  176     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
  177     void *context)
  178 {
  179 
  180         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
  181             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
  182             ("Callback type %d not valid, must be %d-%d", cb_type,
  183             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
  184         KASSERT((queue->tq_callbacks[cb_type] == NULL),
  185             ("Re-initialization of taskqueue callback?"));
  186 
  187         queue->tq_callbacks[cb_type] = callback;
  188         queue->tq_cb_contexts[cb_type] = context;
  189 }
  190 
  191 /*
  192  * Signal a taskqueue thread to terminate.
  193  */
  194 static void
  195 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  196 {
  197 
  198         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  199                 wakeup(tq);
  200                 TQ_SLEEP(tq, pp, "tq_destroy");
  201         }
  202 }
  203 
  204 void
  205 taskqueue_free(struct taskqueue *queue)
  206 {
  207 
  208         TQ_LOCK(queue);
  209         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  210         taskqueue_terminate(queue->tq_threads, queue);
  211         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
  212         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  213         mtx_destroy(&queue->tq_mutex);
  214         free(queue->tq_threads, M_TASKQUEUE);
  215         free(queue->tq_name, M_TASKQUEUE);
  216         free(queue, M_TASKQUEUE);
  217 }
  218 
  219 static int
  220 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  221 {
  222         struct task *ins;
  223         struct task *prev;
  224 
  225         KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
  226         /*
  227          * Count multiple enqueues.
  228          */
  229         if (task->ta_pending) {
  230                 if (task->ta_pending < USHRT_MAX)
  231                         task->ta_pending++;
  232                 TQ_UNLOCK(queue);
  233                 return (0);
  234         }
  235 
  236         /*
  237          * Optimise cases when all tasks use small set of priorities.
  238          * In case of only one priority we always insert at the end.
  239          * In case of two tq_hint typically gives the insertion point.
  240          * In case of more then two tq_hint should halve the search.
  241          */
  242         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  243         if (!prev || prev->ta_priority >= task->ta_priority) {
  244                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  245         } else {
  246                 prev = queue->tq_hint;
  247                 if (prev && prev->ta_priority >= task->ta_priority) {
  248                         ins = STAILQ_NEXT(prev, ta_link);
  249                 } else {
  250                         prev = NULL;
  251                         ins = STAILQ_FIRST(&queue->tq_queue);
  252                 }
  253                 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  254                         if (ins->ta_priority < task->ta_priority)
  255                                 break;
  256 
  257                 if (prev) {
  258                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  259                         queue->tq_hint = task;
  260                 } else
  261                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  262         }
  263 
  264         task->ta_pending = 1;
  265         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
  266                 TQ_UNLOCK(queue);
  267         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  268                 queue->tq_enqueue(queue->tq_context);
  269         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
  270                 TQ_UNLOCK(queue);
  271 
  272         /* Return with lock released. */
  273         return (0);
  274 }
  275 
  276 int
  277 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  278 {
  279         int res;
  280 
  281         TQ_LOCK(queue);
  282         res = taskqueue_enqueue_locked(queue, task);
  283         /* The lock is released inside. */
  284 
  285         return (res);
  286 }
  287 
  288 static void
  289 taskqueue_timeout_func(void *arg)
  290 {
  291         struct taskqueue *queue;
  292         struct timeout_task *timeout_task;
  293 
  294         timeout_task = arg;
  295         queue = timeout_task->q;
  296         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  297         timeout_task->f &= ~DT_CALLOUT_ARMED;
  298         queue->tq_callouts--;
  299         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  300         /* The lock is released inside. */
  301 }
  302 
  303 int
  304 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
  305     struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
  306 {
  307         int res;
  308 
  309         TQ_LOCK(queue);
  310         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  311             ("Migrated queue"));
  312         timeout_task->q = queue;
  313         res = timeout_task->t.ta_pending;
  314         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  315                 /* Do nothing */
  316                 TQ_UNLOCK(queue);
  317                 res = -1;
  318         } else if (sbt == 0) {
  319                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  320                 /* The lock is released inside. */
  321         } else {
  322                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  323                         res++;
  324                 } else {
  325                         queue->tq_callouts++;
  326                         timeout_task->f |= DT_CALLOUT_ARMED;
  327                         if (sbt < 0)
  328                                 sbt = -sbt; /* Ignore overflow. */
  329                 }
  330                 if (sbt > 0) {
  331                         if (queue->tq_spin)
  332                                 flags |= C_DIRECT_EXEC;
  333                         callout_reset_sbt(&timeout_task->c, sbt, pr,
  334                             taskqueue_timeout_func, timeout_task, flags);
  335                 }
  336                 TQ_UNLOCK(queue);
  337         }
  338         return (res);
  339 }
  340 
  341 int
  342 taskqueue_enqueue_timeout(struct taskqueue *queue,
  343     struct timeout_task *ttask, int ticks)
  344 {
  345 
  346         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
  347             0, C_HARDCLOCK));
  348 }
  349 
  350 static void
  351 taskqueue_task_nop_fn(void *context, int pending)
  352 {
  353 }
  354 
  355 /*
  356  * Block until all currently queued tasks in this taskqueue
  357  * have begun execution.  Tasks queued during execution of
  358  * this function are ignored.
  359  */
  360 static int
  361 taskqueue_drain_tq_queue(struct taskqueue *queue)
  362 {
  363         struct task t_barrier;
  364 
  365         if (STAILQ_EMPTY(&queue->tq_queue))
  366                 return (0);
  367 
  368         /*
  369          * Enqueue our barrier after all current tasks, but with
  370          * the highest priority so that newly queued tasks cannot
  371          * pass it.  Because of the high priority, we can not use
  372          * taskqueue_enqueue_locked directly (which drops the lock
  373          * anyway) so just insert it at tail while we have the
  374          * queue lock.
  375          */
  376         TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
  377         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  378         queue->tq_hint = &t_barrier;
  379         t_barrier.ta_pending = 1;
  380 
  381         /*
  382          * Once the barrier has executed, all previously queued tasks
  383          * have completed or are currently executing.
  384          */
  385         while (t_barrier.ta_pending != 0)
  386                 TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
  387         return (1);
  388 }
  389 
  390 /*
  391  * Block until all currently executing tasks for this taskqueue
  392  * complete.  Tasks that begin execution during the execution
  393  * of this function are ignored.
  394  */
  395 static int
  396 taskqueue_drain_tq_active(struct taskqueue *queue)
  397 {
  398         struct taskqueue_busy *tb;
  399         u_int seq;
  400 
  401         if (LIST_EMPTY(&queue->tq_active))
  402                 return (0);
  403 
  404         /* Block taskq_terminate().*/
  405         queue->tq_callouts++;
  406 
  407         /* Wait for any active task with sequence from the past. */
  408         seq = queue->tq_seq;
  409 restart:
  410         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  411                 if ((int)(tb->tb_seq - seq) <= 0) {
  412                         TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
  413                         goto restart;
  414                 }
  415         }
  416 
  417         /* Release taskqueue_terminate(). */
  418         queue->tq_callouts--;
  419         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  420                 wakeup_one(queue->tq_threads);
  421         return (1);
  422 }
  423 
  424 void
  425 taskqueue_block(struct taskqueue *queue)
  426 {
  427 
  428         TQ_LOCK(queue);
  429         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  430         TQ_UNLOCK(queue);
  431 }
  432 
  433 void
  434 taskqueue_unblock(struct taskqueue *queue)
  435 {
  436 
  437         TQ_LOCK(queue);
  438         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  439         if (!STAILQ_EMPTY(&queue->tq_queue))
  440                 queue->tq_enqueue(queue->tq_context);
  441         TQ_UNLOCK(queue);
  442 }
  443 
  444 static void
  445 taskqueue_run_locked(struct taskqueue *queue)
  446 {
  447         struct epoch_tracker et;
  448         struct taskqueue_busy tb;
  449         struct task *task;
  450         bool in_net_epoch;
  451         int pending;
  452 
  453         KASSERT(queue != NULL, ("tq is NULL"));
  454         TQ_ASSERT_LOCKED(queue);
  455         tb.tb_running = NULL;
  456         LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
  457         in_net_epoch = false;
  458 
  459         while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
  460                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  461                 if (queue->tq_hint == task)
  462                         queue->tq_hint = NULL;
  463                 pending = task->ta_pending;
  464                 task->ta_pending = 0;
  465                 tb.tb_running = task;
  466                 tb.tb_seq = ++queue->tq_seq;
  467                 TQ_UNLOCK(queue);
  468 
  469                 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
  470                 if (!in_net_epoch && TASK_IS_NET(task)) {
  471                         in_net_epoch = true;
  472                         NET_EPOCH_ENTER(et);
  473                 } else if (in_net_epoch && !TASK_IS_NET(task)) {
  474                         NET_EPOCH_EXIT(et);
  475                         in_net_epoch = false;
  476                 }
  477                 task->ta_func(task->ta_context, pending);
  478 
  479                 TQ_LOCK(queue);
  480                 wakeup(task);
  481         }
  482         if (in_net_epoch)
  483                 NET_EPOCH_EXIT(et);
  484         LIST_REMOVE(&tb, tb_link);
  485 }
  486 
  487 void
  488 taskqueue_run(struct taskqueue *queue)
  489 {
  490 
  491         TQ_LOCK(queue);
  492         taskqueue_run_locked(queue);
  493         TQ_UNLOCK(queue);
  494 }
  495 
  496 static int
  497 task_is_running(struct taskqueue *queue, struct task *task)
  498 {
  499         struct taskqueue_busy *tb;
  500 
  501         TQ_ASSERT_LOCKED(queue);
  502         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  503                 if (tb->tb_running == task)
  504                         return (1);
  505         }
  506         return (0);
  507 }
  508 
  509 /*
  510  * Only use this function in single threaded contexts. It returns
  511  * non-zero if the given task is either pending or running. Else the
  512  * task is idle and can be queued again or freed.
  513  */
  514 int
  515 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  516 {
  517         int retval;
  518 
  519         TQ_LOCK(queue);
  520         retval = task->ta_pending > 0 || task_is_running(queue, task);
  521         TQ_UNLOCK(queue);
  522 
  523         return (retval);
  524 }
  525 
  526 static int
  527 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  528     u_int *pendp)
  529 {
  530 
  531         if (task->ta_pending > 0) {
  532                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  533                 if (queue->tq_hint == task)
  534                         queue->tq_hint = NULL;
  535         }
  536         if (pendp != NULL)
  537                 *pendp = task->ta_pending;
  538         task->ta_pending = 0;
  539         return (task_is_running(queue, task) ? EBUSY : 0);
  540 }
  541 
  542 int
  543 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  544 {
  545         int error;
  546 
  547         TQ_LOCK(queue);
  548         error = taskqueue_cancel_locked(queue, task, pendp);
  549         TQ_UNLOCK(queue);
  550 
  551         return (error);
  552 }
  553 
  554 int
  555 taskqueue_cancel_timeout(struct taskqueue *queue,
  556     struct timeout_task *timeout_task, u_int *pendp)
  557 {
  558         u_int pending, pending1;
  559         int error;
  560 
  561         TQ_LOCK(queue);
  562         pending = !!(callout_stop(&timeout_task->c) > 0);
  563         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  564         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  565                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  566                 queue->tq_callouts--;
  567         }
  568         TQ_UNLOCK(queue);
  569 
  570         if (pendp != NULL)
  571                 *pendp = pending + pending1;
  572         return (error);
  573 }
  574 
  575 void
  576 taskqueue_drain(struct taskqueue *queue, struct task *task)
  577 {
  578 
  579         if (!queue->tq_spin)
  580                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  581 
  582         TQ_LOCK(queue);
  583         while (task->ta_pending != 0 || task_is_running(queue, task))
  584                 TQ_SLEEP(queue, task, "tq_drain");
  585         TQ_UNLOCK(queue);
  586 }
  587 
  588 void
  589 taskqueue_drain_all(struct taskqueue *queue)
  590 {
  591 
  592         if (!queue->tq_spin)
  593                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  594 
  595         TQ_LOCK(queue);
  596         (void)taskqueue_drain_tq_queue(queue);
  597         (void)taskqueue_drain_tq_active(queue);
  598         TQ_UNLOCK(queue);
  599 }
  600 
  601 void
  602 taskqueue_drain_timeout(struct taskqueue *queue,
  603     struct timeout_task *timeout_task)
  604 {
  605 
  606         /*
  607          * Set flag to prevent timer from re-starting during drain:
  608          */
  609         TQ_LOCK(queue);
  610         KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
  611             ("Drain already in progress"));
  612         timeout_task->f |= DT_DRAIN_IN_PROGRESS;
  613         TQ_UNLOCK(queue);
  614 
  615         callout_drain(&timeout_task->c);
  616         taskqueue_drain(queue, &timeout_task->t);
  617 
  618         /*
  619          * Clear flag to allow timer to re-start:
  620          */
  621         TQ_LOCK(queue);
  622         timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
  623         TQ_UNLOCK(queue);
  624 }
  625 
  626 void
  627 taskqueue_quiesce(struct taskqueue *queue)
  628 {
  629         int ret;
  630 
  631         TQ_LOCK(queue);
  632         do {
  633                 ret = taskqueue_drain_tq_queue(queue);
  634                 if (ret == 0)
  635                         ret = taskqueue_drain_tq_active(queue);
  636         } while (ret != 0);
  637         TQ_UNLOCK(queue);
  638 }
  639 
  640 static void
  641 taskqueue_swi_enqueue(void *context)
  642 {
  643         swi_sched(taskqueue_ih, 0);
  644 }
  645 
  646 static void
  647 taskqueue_swi_run(void *dummy)
  648 {
  649         taskqueue_run(taskqueue_swi);
  650 }
  651 
  652 static void
  653 taskqueue_swi_giant_enqueue(void *context)
  654 {
  655         swi_sched(taskqueue_giant_ih, 0);
  656 }
  657 
  658 static void
  659 taskqueue_swi_giant_run(void *dummy)
  660 {
  661         taskqueue_run(taskqueue_swi_giant);
  662 }
  663 
  664 static int
  665 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  666     cpuset_t *mask, struct proc *p, const char *name, va_list ap)
  667 {
  668         char ktname[MAXCOMLEN + 1];
  669         struct thread *td;
  670         struct taskqueue *tq;
  671         int i, error;
  672 
  673         if (count <= 0)
  674                 return (EINVAL);
  675 
  676         vsnprintf(ktname, sizeof(ktname), name, ap);
  677         tq = *tqp;
  678 
  679         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  680             M_NOWAIT | M_ZERO);
  681         if (tq->tq_threads == NULL) {
  682                 printf("%s: no memory for %s threads\n", __func__, ktname);
  683                 return (ENOMEM);
  684         }
  685 
  686         for (i = 0; i < count; i++) {
  687                 if (count == 1)
  688                         error = kthread_add(taskqueue_thread_loop, tqp, p,
  689                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  690                 else
  691                         error = kthread_add(taskqueue_thread_loop, tqp, p,
  692                             &tq->tq_threads[i], RFSTOPPED, 0,
  693                             "%s_%d", ktname, i);
  694                 if (error) {
  695                         /* should be ok to continue, taskqueue_free will dtrt */
  696                         printf("%s: kthread_add(%s): error %d", __func__,
  697                             ktname, error);
  698                         tq->tq_threads[i] = NULL;               /* paranoid */
  699                 } else
  700                         tq->tq_tcount++;
  701         }
  702         if (tq->tq_tcount == 0) {
  703                 free(tq->tq_threads, M_TASKQUEUE);
  704                 tq->tq_threads = NULL;
  705                 return (ENOMEM);
  706         }
  707         for (i = 0; i < count; i++) {
  708                 if (tq->tq_threads[i] == NULL)
  709                         continue;
  710                 td = tq->tq_threads[i];
  711                 if (mask) {
  712                         error = cpuset_setthread(td->td_tid, mask);
  713                         /*
  714                          * Failing to pin is rarely an actual fatal error;
  715                          * it'll just affect performance.
  716                          */
  717                         if (error)
  718                                 printf("%s: curthread=%llu: can't pin; "
  719                                     "error=%d\n",
  720                                     __func__,
  721                                     (unsigned long long) td->td_tid,
  722                                     error);
  723                 }
  724                 thread_lock(td);
  725                 sched_prio(td, pri);
  726                 sched_add(td, SRQ_BORING);
  727         }
  728 
  729         return (0);
  730 }
  731 
  732 int
  733 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  734     const char *name, ...)
  735 {
  736         va_list ap;
  737         int error;
  738 
  739         va_start(ap, name);
  740         error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
  741         va_end(ap);
  742         return (error);
  743 }
  744 
  745 int
  746 taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
  747     struct proc *proc, const char *name, ...)
  748 {
  749         va_list ap;
  750         int error;
  751 
  752         va_start(ap, name);
  753         error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
  754         va_end(ap);
  755         return (error);
  756 }
  757 
  758 int
  759 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
  760     cpuset_t *mask, const char *name, ...)
  761 {
  762         va_list ap;
  763         int error;
  764 
  765         va_start(ap, name);
  766         error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
  767         va_end(ap);
  768         return (error);
  769 }
  770 
  771 static inline void
  772 taskqueue_run_callback(struct taskqueue *tq,
  773     enum taskqueue_callback_type cb_type)
  774 {
  775         taskqueue_callback_fn tq_callback;
  776 
  777         TQ_ASSERT_UNLOCKED(tq);
  778         tq_callback = tq->tq_callbacks[cb_type];
  779         if (tq_callback != NULL)
  780                 tq_callback(tq->tq_cb_contexts[cb_type]);
  781 }
  782 
  783 void
  784 taskqueue_thread_loop(void *arg)
  785 {
  786         struct taskqueue **tqp, *tq;
  787 
  788         tqp = arg;
  789         tq = *tqp;
  790         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  791         TQ_LOCK(tq);
  792         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  793                 /* XXX ? */
  794                 taskqueue_run_locked(tq);
  795                 /*
  796                  * Because taskqueue_run() can drop tq_mutex, we need to
  797                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  798                  * meantime, which means we missed a wakeup.
  799                  */
  800                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  801                         break;
  802                 TQ_SLEEP(tq, tq, "-");
  803         }
  804         taskqueue_run_locked(tq);
  805         /*
  806          * This thread is on its way out, so just drop the lock temporarily
  807          * in order to call the shutdown callback.  This allows the callback
  808          * to look at the taskqueue, even just before it dies.
  809          */
  810         TQ_UNLOCK(tq);
  811         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  812         TQ_LOCK(tq);
  813 
  814         /* rendezvous with thread that asked us to terminate */
  815         tq->tq_tcount--;
  816         wakeup_one(tq->tq_threads);
  817         TQ_UNLOCK(tq);
  818         kthread_exit();
  819 }
  820 
  821 void
  822 taskqueue_thread_enqueue(void *context)
  823 {
  824         struct taskqueue **tqp, *tq;
  825 
  826         tqp = context;
  827         tq = *tqp;
  828         wakeup_any(tq);
  829 }
  830 
  831 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  832                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  833                      INTR_MPSAFE, &taskqueue_ih));
  834 
  835 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  836                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  837                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
  838 
  839 TASKQUEUE_DEFINE_THREAD(thread);
  840 
  841 struct taskqueue *
  842 taskqueue_create_fast(const char *name, int mflags,
  843                  taskqueue_enqueue_fn enqueue, void *context)
  844 {
  845         return _taskqueue_create(name, mflags, enqueue, context,
  846                         MTX_SPIN, "fast_taskqueue");
  847 }
  848 
  849 static void     *taskqueue_fast_ih;
  850 
  851 static void
  852 taskqueue_fast_enqueue(void *context)
  853 {
  854         swi_sched(taskqueue_fast_ih, 0);
  855 }
  856 
  857 static void
  858 taskqueue_fast_run(void *dummy)
  859 {
  860         taskqueue_run(taskqueue_fast);
  861 }
  862 
  863 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  864         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  865         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  866 
  867 int
  868 taskqueue_member(struct taskqueue *queue, struct thread *td)
  869 {
  870         int i, j, ret = 0;
  871 
  872         for (i = 0, j = 0; ; i++) {
  873                 if (queue->tq_threads[i] == NULL)
  874                         continue;
  875                 if (queue->tq_threads[i] == td) {
  876                         ret = 1;
  877                         break;
  878                 }
  879                 if (++j >= queue->tq_tcount)
  880                         break;
  881         }
  882         return (ret);
  883 }

Cache object: 27bcde3b9129fe52f27aa88ee1191a6b


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.