The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
    3  *
    4  * Copyright (c) 2000 Doug Rabson
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/epoch.h>
   46 #include <sys/sched.h>
   47 #include <sys/smp.h>
   48 #include <sys/taskqueue.h>
   49 #include <sys/unistd.h>
   50 #include <machine/stdarg.h>
   51 
/* Malloc type used for every allocation made in this file. */
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
/*
 * Software-interrupt handler cookies: filled in by the swi_add() calls in
 * the TASKQUEUE_DEFINE() invocations and handed to swi_sched() by the
 * matching enqueue callbacks.
 */
static void     *taskqueue_giant_ih;
static void     *taskqueue_ih;
/*
 * Forward declarations; _taskqueue_create() compares the caller's enqueue
 * callback against these before they are defined.
 */
static void      taskqueue_fast_enqueue(void *);
static void      taskqueue_swi_enqueue(void *);
static void      taskqueue_swi_giant_enqueue(void *);
   58 
/*
 * Per-runner record linked on tq_active for the duration of one
 * taskqueue_run_locked() call.
 */
struct taskqueue_busy {
        struct task             *tb_running;    /* task being executed, NULL before the first one */
        u_int                    tb_seq;        /* tq_seq value assigned when tb_running started */
        LIST_ENTRY(taskqueue_busy) tb_link;     /* linkage on taskqueue.tq_active */
};
   64 
/*
 * A task queue.  All fields are manipulated under tq_mutex unless noted
 * otherwise by the code that touches them.
 */
struct taskqueue {
        STAILQ_HEAD(, task)     tq_queue;       /* pending tasks, kept in descending ta_priority order */
        LIST_HEAD(, taskqueue_busy) tq_active;  /* one entry per active taskqueue_run_locked() */
        struct task             *tq_hint;       /* insertion hint for taskqueue_enqueue_locked() */
        u_int                   tq_seq;         /* incremented each time a task starts running */
        int                     tq_callouts;    /* armed timeout tasks; also held by drain to stall terminate */
        struct mtx_padalign     tq_mutex;       /* spin or default mutex, see tq_spin */
        taskqueue_enqueue_fn    tq_enqueue;     /* called after a task is queued to kick the consumer */
        void                    *tq_context;    /* argument passed to tq_enqueue */
        char                    *tq_name;       /* malloc'ed name, also used as the mutex name */
        struct thread           **tq_threads;   /* service threads; NULL until threads are started */
        int                     tq_tcount;      /* started threads that have not yet exited */
        int                     tq_spin;        /* non-zero when tq_mutex is a spin mutex */
        int                     tq_flags;       /* TQ_FLAGS_* */
        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];  /* per-type thread callbacks */
        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; /* arguments for tq_callbacks */
};
   82 
/* taskqueue.tq_flags bits. */
/* Set at creation, cleared by taskqueue_free(); service threads exit once it is clear. */
#define TQ_FLAGS_ACTIVE         (1 << 0)
/* While set, enqueueing does not call tq_enqueue (see taskqueue_block()). */
#define TQ_FLAGS_BLOCKED        (1 << 1)
/* tq_enqueue is invoked after the queue lock has been dropped. */
#define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)

/* timeout_task.f bits. */
/* The callout is scheduled and counted in tq_callouts. */
#define DT_CALLOUT_ARMED        (1 << 0)
/* taskqueue_drain_timeout() is running; new timeout enqueues are refused. */
#define DT_DRAIN_IN_PROGRESS    (1 << 1)
   89 
/* Acquire the queue mutex, using the spin variant when the queue is a spin queue. */
#define TQ_LOCK(tq)                                                     \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_lock_spin(&(tq)->tq_mutex);                 \
                else                                                    \
                        mtx_lock(&(tq)->tq_mutex);                      \
        } while (0)
/* Assert that the calling thread owns the queue mutex. */
#define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)

/* Release the queue mutex; counterpart of TQ_LOCK(). */
#define TQ_UNLOCK(tq)                                                   \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_unlock_spin(&(tq)->tq_mutex);               \
                else                                                    \
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
/* Assert that the calling thread does not own the queue mutex. */
#define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  107 
  108 void
  109 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  110     int priority, task_fn_t func, void *context)
  111 {
  112 
  113         TASK_INIT(&timeout_task->t, priority, func, context);
  114         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  115             CALLOUT_RETURNUNLOCKED);
  116         timeout_task->q = queue;
  117         timeout_task->f = 0;
  118 }
  119 
  120 static __inline int
  121 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
  122 {
  123         if (tq->tq_spin)
  124                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  125         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  126 }
  127 
  128 static struct taskqueue *
  129 _taskqueue_create(const char *name, int mflags,
  130                  taskqueue_enqueue_fn enqueue, void *context,
  131                  int mtxflags, const char *mtxname __unused)
  132 {
  133         struct taskqueue *queue;
  134         char *tq_name;
  135 
  136         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
  137         if (tq_name == NULL)
  138                 return (NULL);
  139 
  140         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  141         if (queue == NULL) {
  142                 free(tq_name, M_TASKQUEUE);
  143                 return (NULL);
  144         }
  145 
  146         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  147 
  148         STAILQ_INIT(&queue->tq_queue);
  149         LIST_INIT(&queue->tq_active);
  150         queue->tq_enqueue = enqueue;
  151         queue->tq_context = context;
  152         queue->tq_name = tq_name;
  153         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  154         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  155         if (enqueue == taskqueue_fast_enqueue ||
  156             enqueue == taskqueue_swi_enqueue ||
  157             enqueue == taskqueue_swi_giant_enqueue ||
  158             enqueue == taskqueue_thread_enqueue)
  159                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  160         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  161 
  162         return (queue);
  163 }
  164 
  165 struct taskqueue *
  166 taskqueue_create(const char *name, int mflags,
  167                  taskqueue_enqueue_fn enqueue, void *context)
  168 {
  169 
  170         return _taskqueue_create(name, mflags, enqueue, context,
  171                         MTX_DEF, name);
  172 }
  173 
/*
 * Register `callback' (with argument `context') for the given callback
 * type on `queue'.  Each type may be set only once; both the range of
 * cb_type and the "not already set" rule are enforced by KASSERT only.
 * No lock is taken here.
 */
void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

        KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
            (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
            ("Callback type %d not valid, must be %d-%d", cb_type,
            TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
        KASSERT((queue->tq_callbacks[cb_type] == NULL),
            ("Re-initialization of taskqueue callback?"));

        queue->tq_callbacks[cb_type] = callback;
        queue->tq_cb_contexts[cb_type] = context;
}
  190 
  191 /*
  192  * Signal a taskqueue thread to terminate.
  193  */
  194 static void
  195 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  196 {
  197 
  198         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  199                 wakeup(tq);
  200                 TQ_SLEEP(tq, pp, "tq_destroy");
  201         }
  202 }
  203 
  204 void
  205 taskqueue_free(struct taskqueue *queue)
  206 {
  207 
  208         TQ_LOCK(queue);
  209         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  210         taskqueue_terminate(queue->tq_threads, queue);
  211         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
  212         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  213         mtx_destroy(&queue->tq_mutex);
  214         free(queue->tq_threads, M_TASKQUEUE);
  215         free(queue->tq_name, M_TASKQUEUE);
  216         free(queue, M_TASKQUEUE);
  217 }
  218 
/*
 * Queue `task' on `queue'.  Must be entered with the queue locked and
 * always returns 0 with the queue lock released.
 *
 * A task that is already pending is not queued again; its ta_pending
 * counter is bumped instead (saturating at USHRT_MAX).  Otherwise the
 * task is inserted so that tq_queue stays sorted by descending
 * ta_priority, and the queue's enqueue callback is invoked unless the
 * queue is blocked.
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
        struct task *ins;
        struct task *prev;

        KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                if (task->ta_pending < USHRT_MAX)
                        task->ta_pending++;
                TQ_UNLOCK(queue);
                return (0);
        }

        /*
         * Optimise cases when all tasks use small set of priorities.
         * In case of only one priority we always insert at the end.
         * In case of two tq_hint typically gives the insertion point.
         * In case of more than two tq_hint should halve the search.
         */
        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
        if (!prev || prev->ta_priority >= task->ta_priority) {
                /* Empty queue, or the tail already outranks us: append. */
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
                /* Start scanning from the hint when it is a valid predecessor. */
                prev = queue->tq_hint;
                if (prev && prev->ta_priority >= task->ta_priority) {
                        ins = STAILQ_NEXT(prev, ta_link);
                } else {
                        prev = NULL;
                        ins = STAILQ_FIRST(&queue->tq_queue);
                }
                /* Stop at the first entry with a strictly lower priority. */
                for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
                                break;

                if (prev) {
                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
                        queue->tq_hint = task;
                } else
                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
        }

        task->ta_pending = 1;
        /*
         * Depending on TQ_FLAGS_UNLOCKED_ENQUEUE the lock is dropped either
         * before or after the enqueue callback; exactly one of the two
         * TQ_UNLOCK() calls below executes.
         */
        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
                TQ_UNLOCK(queue);
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
                queue->tq_enqueue(queue->tq_context);
        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
                TQ_UNLOCK(queue);

        /* Return with lock released. */
        return (0);
}
  275 
  276 int
  277 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  278 {
  279         int res;
  280 
  281         TQ_LOCK(queue);
  282         res = taskqueue_enqueue_locked(queue, task);
  283         /* The lock is released inside. */
  284 
  285         return (res);
  286 }
  287 
  288 static void
  289 taskqueue_timeout_func(void *arg)
  290 {
  291         struct taskqueue *queue;
  292         struct timeout_task *timeout_task;
  293 
  294         timeout_task = arg;
  295         queue = timeout_task->q;
  296         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  297         timeout_task->f &= ~DT_CALLOUT_ARMED;
  298         queue->tq_callouts--;
  299         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  300         /* The lock is released inside. */
  301 }
  302 
  303 int
  304 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
  305     struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
  306 {
  307         int res;
  308 
  309         TQ_LOCK(queue);
  310         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  311             ("Migrated queue"));
  312         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  313         timeout_task->q = queue;
  314         res = timeout_task->t.ta_pending;
  315         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  316                 /* Do nothing */
  317                 TQ_UNLOCK(queue);
  318                 res = -1;
  319         } else if (sbt == 0) {
  320                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  321                 /* The lock is released inside. */
  322         } else {
  323                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  324                         res++;
  325                 } else {
  326                         queue->tq_callouts++;
  327                         timeout_task->f |= DT_CALLOUT_ARMED;
  328                         if (sbt < 0)
  329                                 sbt = -sbt; /* Ignore overflow. */
  330                 }
  331                 if (sbt > 0) {
  332                         callout_reset_sbt(&timeout_task->c, sbt, pr,
  333                             taskqueue_timeout_func, timeout_task, flags);
  334                 }
  335                 TQ_UNLOCK(queue);
  336         }
  337         return (res);
  338 }
  339 
  340 int
  341 taskqueue_enqueue_timeout(struct taskqueue *queue,
  342     struct timeout_task *ttask, int ticks)
  343 {
  344 
  345         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
  346             0, 0));
  347 }
  348 
  349 static void
  350 taskqueue_task_nop_fn(void *context, int pending)
  351 {
  352 }
  353 
  354 /*
  355  * Block until all currently queued tasks in this taskqueue
  356  * have begun execution.  Tasks queued during execution of
  357  * this function are ignored.
  358  */
  359 static int
  360 taskqueue_drain_tq_queue(struct taskqueue *queue)
  361 {
  362         struct task t_barrier;
  363 
  364         if (STAILQ_EMPTY(&queue->tq_queue))
  365                 return (0);
  366 
  367         /*
  368          * Enqueue our barrier after all current tasks, but with
  369          * the highest priority so that newly queued tasks cannot
  370          * pass it.  Because of the high priority, we can not use
  371          * taskqueue_enqueue_locked directly (which drops the lock
  372          * anyway) so just insert it at tail while we have the
  373          * queue lock.
  374          */
  375         TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
  376         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  377         queue->tq_hint = &t_barrier;
  378         t_barrier.ta_pending = 1;
  379 
  380         /*
  381          * Once the barrier has executed, all previously queued tasks
  382          * have completed or are currently executing.
  383          */
  384         while (t_barrier.ta_pending != 0)
  385                 TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
  386         return (1);
  387 }
  388 
  389 /*
  390  * Block until all currently executing tasks for this taskqueue
  391  * complete.  Tasks that begin execution during the execution
  392  * of this function are ignored.
  393  */
  394 static int
  395 taskqueue_drain_tq_active(struct taskqueue *queue)
  396 {
  397         struct taskqueue_busy *tb;
  398         u_int seq;
  399 
  400         if (LIST_EMPTY(&queue->tq_active))
  401                 return (0);
  402 
  403         /* Block taskq_terminate().*/
  404         queue->tq_callouts++;
  405 
  406         /* Wait for any active task with sequence from the past. */
  407         seq = queue->tq_seq;
  408 restart:
  409         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  410                 if ((int)(tb->tb_seq - seq) <= 0) {
  411                         TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
  412                         goto restart;
  413                 }
  414         }
  415 
  416         /* Release taskqueue_terminate(). */
  417         queue->tq_callouts--;
  418         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  419                 wakeup_one(queue->tq_threads);
  420         return (1);
  421 }
  422 
/*
 * Mark the queue blocked.  While TQ_FLAGS_BLOCKED is set, enqueueing a
 * task still links it on the queue but does not call the queue's enqueue
 * callback.
 */
void
taskqueue_block(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}
  431 
  432 void
  433 taskqueue_unblock(struct taskqueue *queue)
  434 {
  435 
  436         TQ_LOCK(queue);
  437         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  438         if (!STAILQ_EMPTY(&queue->tq_queue))
  439                 queue->tq_enqueue(queue->tq_context);
  440         TQ_UNLOCK(queue);
  441 }
  442 
/*
 * Run every task currently on the queue, in queue order, until the queue
 * is empty.  Entered and left with the queue locked; the lock is dropped
 * around each task function call.
 */
static void
taskqueue_run_locked(struct taskqueue *queue)
{
        struct epoch_tracker et;
        struct taskqueue_busy tb;
        struct task *task;
        bool in_net_epoch;
        int pending;

        KASSERT(queue != NULL, ("tq is NULL"));
        TQ_ASSERT_LOCKED(queue);
        /* Publish this runner on tq_active so drain/cancel can see it. */
        tb.tb_running = NULL;
        LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
        in_net_epoch = false;

        while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                if (queue->tq_hint == task)
                        queue->tq_hint = NULL;
                /* Hand the accumulated enqueue count to the task and reset it. */
                pending = task->ta_pending;
                task->ta_pending = 0;
                tb.tb_running = task;
                tb.tb_seq = ++queue->tq_seq;
                TQ_UNLOCK(queue);

                KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
                /*
                 * Enter the network epoch on the first network task and
                 * stay in it across consecutive network tasks; leave it
                 * before running a non-network task.
                 */
                if (!in_net_epoch && TASK_IS_NET(task)) {
                        in_net_epoch = true;
                        NET_EPOCH_ENTER(et);
                } else if (in_net_epoch && !TASK_IS_NET(task)) {
                        NET_EPOCH_EXIT(et);
                        in_net_epoch = false;
                }
                task->ta_func(task->ta_context, pending);

                TQ_LOCK(queue);
                /* Wake anyone sleeping on this task's address (drain paths). */
                wakeup(task);
        }
        if (in_net_epoch)
                NET_EPOCH_EXIT(et);
        LIST_REMOVE(&tb, tb_link);
}
  485 
/*
 * Lock the queue, run all tasks currently queued on it, and unlock.
 */
void
taskqueue_run(struct taskqueue *queue)
{

        TQ_LOCK(queue);
        taskqueue_run_locked(queue);
        TQ_UNLOCK(queue);
}
  494 
  495 static int
  496 task_is_running(struct taskqueue *queue, struct task *task)
  497 {
  498         struct taskqueue_busy *tb;
  499 
  500         TQ_ASSERT_LOCKED(queue);
  501         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  502                 if (tb->tb_running == task)
  503                         return (1);
  504         }
  505         return (0);
  506 }
  507 
  508 /*
  509  * Only use this function in single threaded contexts. It returns
  510  * non-zero if the given task is either pending or running. Else the
  511  * task is idle and can be queued again or freed.
  512  */
  513 int
  514 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  515 {
  516         int retval;
  517 
  518         TQ_LOCK(queue);
  519         retval = task->ta_pending > 0 || task_is_running(queue, task);
  520         TQ_UNLOCK(queue);
  521 
  522         return (retval);
  523 }
  524 
  525 static int
  526 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  527     u_int *pendp)
  528 {
  529 
  530         if (task->ta_pending > 0) {
  531                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  532                 if (queue->tq_hint == task)
  533                         queue->tq_hint = NULL;
  534         }
  535         if (pendp != NULL)
  536                 *pendp = task->ta_pending;
  537         task->ta_pending = 0;
  538         return (task_is_running(queue, task) ? EBUSY : 0);
  539 }
  540 
/*
 * Cancel a pending task.  Locked wrapper around
 * taskqueue_cancel_locked(): `*pendp' (if non-NULL) receives the pending
 * count that was cancelled; returns EBUSY if the task is currently
 * running, 0 otherwise.
 */
int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_cancel_locked(queue, task, pendp);
        TQ_UNLOCK(queue);

        return (error);
}
  552 
  553 int
  554 taskqueue_cancel_timeout(struct taskqueue *queue,
  555     struct timeout_task *timeout_task, u_int *pendp)
  556 {
  557         u_int pending, pending1;
  558         int error;
  559 
  560         TQ_LOCK(queue);
  561         pending = !!(callout_stop(&timeout_task->c) > 0);
  562         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  563         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  564                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  565                 queue->tq_callouts--;
  566         }
  567         TQ_UNLOCK(queue);
  568 
  569         if (pendp != NULL)
  570                 *pendp = pending + pending1;
  571         return (error);
  572 }
  573 
  574 void
  575 taskqueue_drain(struct taskqueue *queue, struct task *task)
  576 {
  577 
  578         if (!queue->tq_spin)
  579                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  580 
  581         TQ_LOCK(queue);
  582         while (task->ta_pending != 0 || task_is_running(queue, task))
  583                 TQ_SLEEP(queue, task, "tq_drain");
  584         TQ_UNLOCK(queue);
  585 }
  586 
/*
 * Wait for all tasks that are queued or running at the time of the call:
 * first until the queued ones have started, then until the running ones
 * have finished.  Tasks queued afterwards are not waited for.
 */
void
taskqueue_drain_all(struct taskqueue *queue)
{

        if (!queue->tq_spin)
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

        TQ_LOCK(queue);
        (void)taskqueue_drain_tq_queue(queue);
        (void)taskqueue_drain_tq_active(queue);
        TQ_UNLOCK(queue);
}
  599 
/*
 * Drain a timeout task: wait for its callout and then for the embedded
 * task to finish.  While the drain runs, DT_DRAIN_IN_PROGRESS makes
 * taskqueue_enqueue_timeout_sbt() refuse to schedule the task (it returns
 * -1).  Only one drain per timeout task may be in progress (KASSERT).
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

        /*
         * Set flag to prevent timer from re-starting during drain:
         */
        TQ_LOCK(queue);
        KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
            ("Drain already in progress"));
        timeout_task->f |= DT_DRAIN_IN_PROGRESS;
        TQ_UNLOCK(queue);

        /* Both waits happen with the queue lock released. */
        callout_drain(&timeout_task->c);
        taskqueue_drain(queue, &timeout_task->t);

        /*
         * Clear flag to allow timer to re-start:
         */
        TQ_LOCK(queue);
        timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
        TQ_UNLOCK(queue);
}
  624 
  625 void
  626 taskqueue_quiesce(struct taskqueue *queue)
  627 {
  628         int ret;
  629 
  630         TQ_LOCK(queue);
  631         do {
  632                 ret = taskqueue_drain_tq_queue(queue);
  633                 if (ret == 0)
  634                         ret = taskqueue_drain_tq_active(queue);
  635         } while (ret != 0);
  636         TQ_UNLOCK(queue);
  637 }
  638 
/*
 * Enqueue callback for the "swi" queue: schedule the software interrupt
 * registered in taskqueue_ih.  `context' is not used.
 */
static void
taskqueue_swi_enqueue(void *context)
{
        swi_sched(taskqueue_ih, 0);
}
  644 
/*
 * Software-interrupt handler for the "swi" queue: run everything queued
 * on taskqueue_swi.  `dummy' is not used.
 */
static void
taskqueue_swi_run(void *dummy)
{
        taskqueue_run(taskqueue_swi);
}
  650 
/*
 * Enqueue callback for the "swi_giant" queue: schedule the software
 * interrupt registered in taskqueue_giant_ih.  `context' is not used.
 */
static void
taskqueue_swi_giant_enqueue(void *context)
{
        swi_sched(taskqueue_giant_ih, 0);
}
  656 
/*
 * Software-interrupt handler for the "swi_giant" queue: run everything
 * queued on taskqueue_swi_giant.  `dummy' is not used.
 */
static void
taskqueue_swi_giant_run(void *dummy)
{
        taskqueue_run(taskqueue_swi_giant);
}
  662 
  663 static int
  664 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  665     cpuset_t *mask, struct proc *p, const char *name, va_list ap)
  666 {
  667         char ktname[MAXCOMLEN + 1];
  668         struct thread *td;
  669         struct taskqueue *tq;
  670         int i, error;
  671 
  672         if (count <= 0)
  673                 return (EINVAL);
  674 
  675         vsnprintf(ktname, sizeof(ktname), name, ap);
  676         tq = *tqp;
  677 
  678         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  679             M_NOWAIT | M_ZERO);
  680         if (tq->tq_threads == NULL) {
  681                 printf("%s: no memory for %s threads\n", __func__, ktname);
  682                 return (ENOMEM);
  683         }
  684 
  685         for (i = 0; i < count; i++) {
  686                 if (count == 1)
  687                         error = kthread_add(taskqueue_thread_loop, tqp, p,
  688                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  689                 else
  690                         error = kthread_add(taskqueue_thread_loop, tqp, p,
  691                             &tq->tq_threads[i], RFSTOPPED, 0,
  692                             "%s_%d", ktname, i);
  693                 if (error) {
  694                         /* should be ok to continue, taskqueue_free will dtrt */
  695                         printf("%s: kthread_add(%s): error %d", __func__,
  696                             ktname, error);
  697                         tq->tq_threads[i] = NULL;               /* paranoid */
  698                 } else
  699                         tq->tq_tcount++;
  700         }
  701         if (tq->tq_tcount == 0) {
  702                 free(tq->tq_threads, M_TASKQUEUE);
  703                 tq->tq_threads = NULL;
  704                 return (ENOMEM);
  705         }
  706         for (i = 0; i < count; i++) {
  707                 if (tq->tq_threads[i] == NULL)
  708                         continue;
  709                 td = tq->tq_threads[i];
  710                 if (mask) {
  711                         error = cpuset_setthread(td->td_tid, mask);
  712                         /*
  713                          * Failing to pin is rarely an actual fatal error;
  714                          * it'll just affect performance.
  715                          */
  716                         if (error)
  717                                 printf("%s: curthread=%llu: can't pin; "
  718                                     "error=%d\n",
  719                                     __func__,
  720                                     (unsigned long long) td->td_tid,
  721                                     error);
  722                 }
  723                 thread_lock(td);
  724                 sched_prio(td, pri);
  725                 sched_add(td, SRQ_BORING);
  726         }
  727 
  728         return (0);
  729 }
  730 
  731 int
  732 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  733     const char *name, ...)
  734 {
  735         va_list ap;
  736         int error;
  737 
  738         va_start(ap, name);
  739         error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
  740         va_end(ap);
  741         return (error);
  742 }
  743 
  744 int
  745 taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
  746     struct proc *proc, const char *name, ...)
  747 {
  748         va_list ap;
  749         int error;
  750 
  751         va_start(ap, name);
  752         error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
  753         va_end(ap);
  754         return (error);
  755 }
  756 
  757 int
  758 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
  759     cpuset_t *mask, const char *name, ...)
  760 {
  761         va_list ap;
  762         int error;
  763 
  764         va_start(ap, name);
  765         error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
  766         va_end(ap);
  767         return (error);
  768 }
  769 
  770 static inline void
  771 taskqueue_run_callback(struct taskqueue *tq,
  772     enum taskqueue_callback_type cb_type)
  773 {
  774         taskqueue_callback_fn tq_callback;
  775 
  776         TQ_ASSERT_UNLOCKED(tq);
  777         tq_callback = tq->tq_callbacks[cb_type];
  778         if (tq_callback != NULL)
  779                 tq_callback(tq->tq_cb_contexts[cb_type]);
  780 }
  781 
/*
 * Main function of a taskqueue service thread.  `arg' is the address of
 * the queue pointer.  Runs the INIT callback, then alternates between
 * running queued tasks and sleeping on the queue until TQ_FLAGS_ACTIVE is
 * cleared; finally runs any remaining tasks, the SHUTDOWN callback, and
 * signals the terminating thread before exiting.  Does not return.
 */
void
taskqueue_thread_loop(void *arg)
{
        struct taskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                /* XXX ? */
                taskqueue_run_locked(tq);
                /*
                 * Because taskqueue_run_locked() can drop tq_mutex, we need
                 * to check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
                 * meantime, which means we missed a wakeup.
                 */
                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                        break;
                /* Woken by taskqueue_thread_enqueue() or taskqueue_terminate(). */
                TQ_SLEEP(tq, tq, "-");
        }
        /* Final pass: run whatever is still queued before shutting down. */
        taskqueue_run_locked(tq);
        /*
         * This thread is on its way out, so just drop the lock temporarily
         * in order to call the shutdown callback.  This allows the callback
         * to look at the taskqueue, even just before it dies.
         */
        TQ_UNLOCK(tq);
        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
        TQ_LOCK(tq);

        /* rendezvous with thread that asked us to terminate */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        kthread_exit();
}
  819 
  820 void
  821 taskqueue_thread_enqueue(void *context)
  822 {
  823         struct taskqueue **tqp, *tq;
  824 
  825         tqp = context;
  826         tq = *tqp;
  827         wakeup_any(tq);
  828 }
  829 
/*
 * System-wide queue "swi": serviced by the SWI_TQ software interrupt,
 * registered INTR_MPSAFE; its cookie is stored in taskqueue_ih.
 */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
                 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
                     INTR_MPSAFE, &taskqueue_ih));

/*
 * System-wide queue "swi_giant": serviced by the SWI_TQ_GIANT software
 * interrupt, registered without INTR_MPSAFE; cookie in taskqueue_giant_ih.
 */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
                 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
                     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* System-wide thread-backed queue "thread". */
TASKQUEUE_DEFINE_THREAD(thread);
  839 
  840 struct taskqueue *
  841 taskqueue_create_fast(const char *name, int mflags,
  842                  taskqueue_enqueue_fn enqueue, void *context)
  843 {
  844         return _taskqueue_create(name, mflags, enqueue, context,
  845                         MTX_SPIN, "fast_taskqueue");
  846 }
  847 
/* Software-interrupt cookie for the "fast" queue, filled in by swi_add(). */
static void     *taskqueue_fast_ih;

/*
 * Enqueue callback for the "fast" queue: schedule the software interrupt
 * registered in taskqueue_fast_ih.  `context' is not used.
 */
static void
taskqueue_fast_enqueue(void *context)
{
        swi_sched(taskqueue_fast_ih, 0);
}
  855 
/*
 * Software-interrupt handler for the "fast" queue: run everything queued
 * on taskqueue_fast.  `dummy' is not used.
 */
static void
taskqueue_fast_run(void *dummy)
{
        taskqueue_run(taskqueue_fast);
}
  861 
/*
 * System-wide queue "fast": serviced by the SWI_TQ_FAST software
 * interrupt, registered INTR_MPSAFE; cookie in taskqueue_fast_ih.
 */
TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
        swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
        SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  865 
  866 int
  867 taskqueue_member(struct taskqueue *queue, struct thread *td)
  868 {
  869         int i, j, ret = 0;
  870 
  871         for (i = 0, j = 0; ; i++) {
  872                 if (queue->tq_threads[i] == NULL)
  873                         continue;
  874                 if (queue->tq_threads[i] == td) {
  875                         ret = 1;
  876                         break;
  877                 }
  878                 if (++j >= queue->tq_tcount)
  879                         break;
  880         }
  881         return (ret);
  882 }

Cache object: 25db95056b5363237b5664f998bada68


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.