The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #include <sys/cdefs.h>
   28 __FBSDID("$FreeBSD$");
   29 
   30 #include <sys/param.h>
   31 #include <sys/systm.h>
   32 #include <sys/bus.h>
   33 #include <sys/interrupt.h>
   34 #include <sys/kernel.h>
   35 #include <sys/kthread.h>
   36 #include <sys/limits.h>
   37 #include <sys/lock.h>
   38 #include <sys/malloc.h>
   39 #include <sys/mutex.h>
   40 #include <sys/proc.h>
   41 #include <sys/sched.h>
   42 #include <sys/taskqueue.h>
   43 #include <sys/unistd.h>
   44 #include <machine/stdarg.h>
   45 
   46 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
   47 static void     *taskqueue_giant_ih;
   48 static void     *taskqueue_ih;
   49 static void      taskqueue_fast_enqueue(void *);
   50 static void      taskqueue_swi_enqueue(void *);
   51 static void      taskqueue_swi_giant_enqueue(void *);
   52 
   53 struct taskqueue_busy {
   54         struct task     *tb_running;
   55         TAILQ_ENTRY(taskqueue_busy) tb_link;
   56 };
   57 
   58 struct taskqueue {
   59         STAILQ_HEAD(, task)     tq_queue;
   60         taskqueue_enqueue_fn    tq_enqueue;
   61         void                    *tq_context;
   62         TAILQ_HEAD(, taskqueue_busy) tq_active;
   63         struct mtx              tq_mutex;
   64         struct thread           **tq_threads;
   65         int                     tq_tcount;
   66         int                     tq_spin;
   67         int                     tq_flags;
   68         int                     tq_callouts;
   69         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   70         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   71 };
   72 
   73 #define TQ_FLAGS_ACTIVE         (1 << 0)
   74 #define TQ_FLAGS_BLOCKED        (1 << 1)
   75 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   76 
   77 #define DT_CALLOUT_ARMED        (1 << 0)
   78 #define DT_DRAIN_IN_PROGRESS    (1 << 1)
   79 
   80 #define TQ_LOCK(tq)                                                     \
   81         do {                                                            \
   82                 if ((tq)->tq_spin)                                      \
   83                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   84                 else                                                    \
   85                         mtx_lock(&(tq)->tq_mutex);                      \
   86         } while (0)
   87 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   88 
   89 #define TQ_UNLOCK(tq)                                                   \
   90         do {                                                            \
   91                 if ((tq)->tq_spin)                                      \
   92                         mtx_unlock_spin(&(tq)->tq_mutex);               \
   93                 else                                                    \
   94                         mtx_unlock(&(tq)->tq_mutex);                    \
   95         } while (0)
   96 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
   97 
   98 void
   99 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
  100     int priority, task_fn_t func, void *context)
  101 {
  102 
  103         TASK_INIT(&timeout_task->t, priority, func, context);
  104         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
  105             CALLOUT_RETURNUNLOCKED);
  106         timeout_task->q = queue;
  107         timeout_task->f = 0;
  108 }
  109 
  110 static __inline int
  111 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  112     int t)
  113 {
  114         if (tq->tq_spin)
  115                 return (msleep_spin(p, m, wm, t));
  116         return (msleep(p, m, pri, wm, t));
  117 }
  118 
  119 static struct taskqueue *
  120 _taskqueue_create(const char *name __unused, int mflags,
  121                  taskqueue_enqueue_fn enqueue, void *context,
  122                  int mtxflags, const char *mtxname)
  123 {
  124         struct taskqueue *queue;
  125 
  126         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
  127         if (!queue)
  128                 return NULL;
  129 
  130         STAILQ_INIT(&queue->tq_queue);
  131         TAILQ_INIT(&queue->tq_active);
  132         queue->tq_enqueue = enqueue;
  133         queue->tq_context = context;
  134         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  135         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  136         if (enqueue == taskqueue_fast_enqueue ||
  137             enqueue == taskqueue_swi_enqueue ||
  138             enqueue == taskqueue_swi_giant_enqueue ||
  139             enqueue == taskqueue_thread_enqueue)
  140                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  141         mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
  142 
  143         return queue;
  144 }
  145 
  146 struct taskqueue *
  147 taskqueue_create(const char *name, int mflags,
  148                  taskqueue_enqueue_fn enqueue, void *context)
  149 {
  150         return _taskqueue_create(name, mflags, enqueue, context,
  151                         MTX_DEF, "taskqueue");
  152 }
  153 
  154 void
  155 taskqueue_set_callback(struct taskqueue *queue,
  156     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
  157     void *context)
  158 {
  159 
  160         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
  161             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
  162             ("Callback type %d not valid, must be %d-%d", cb_type,
  163             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
  164         KASSERT((queue->tq_callbacks[cb_type] == NULL),
  165             ("Re-initialization of taskqueue callback?"));
  166 
  167         queue->tq_callbacks[cb_type] = callback;
  168         queue->tq_cb_contexts[cb_type] = context;
  169 }
  170 
  171 /*
  172  * Signal a taskqueue thread to terminate.
  173  */
  174 static void
  175 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
  176 {
  177 
  178         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  179                 wakeup(tq);
  180                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  181         }
  182 }
  183 
  184 void
  185 taskqueue_free(struct taskqueue *queue)
  186 {
  187 
  188         TQ_LOCK(queue);
  189         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  190         taskqueue_terminate(queue->tq_threads, queue);
  191         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  192         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  193         mtx_destroy(&queue->tq_mutex);
  194         free(queue->tq_threads, M_TASKQUEUE);
  195         free(queue, M_TASKQUEUE);
  196 }
  197 
  198 static int
  199 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
  200 {
  201         struct task *ins;
  202         struct task *prev;
  203 
  204         /*
  205          * Count multiple enqueues.
  206          */
  207         if (task->ta_pending) {
  208                 if (task->ta_pending < USHRT_MAX)
  209                         task->ta_pending++;
  210                 TQ_UNLOCK(queue);
  211                 return (0);
  212         }
  213 
  214         /*
  215          * Optimise the case when all tasks have the same priority.
  216          */
  217         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  218         if (!prev || prev->ta_priority >= task->ta_priority) {
  219                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
  220         } else {
  221                 prev = NULL;
  222                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
  223                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
  224                         if (ins->ta_priority < task->ta_priority)
  225                                 break;
  226 
  227                 if (prev)
  228                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
  229                 else
  230                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
  231         }
  232 
  233         task->ta_pending = 1;
  234         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
  235                 TQ_UNLOCK(queue);
  236         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  237                 queue->tq_enqueue(queue->tq_context);
  238         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
  239                 TQ_UNLOCK(queue);
  240 
  241         /* Return with lock released. */
  242         return (0);
  243 }
  244 int
  245 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
  246 {
  247         int res;
  248 
  249         TQ_LOCK(queue);
  250         res = taskqueue_enqueue_locked(queue, task);
  251         /* The lock is released inside. */
  252 
  253         return (res);
  254 }
  255 
  256 static void
  257 taskqueue_timeout_func(void *arg)
  258 {
  259         struct taskqueue *queue;
  260         struct timeout_task *timeout_task;
  261 
  262         timeout_task = arg;
  263         queue = timeout_task->q;
  264         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
  265         timeout_task->f &= ~DT_CALLOUT_ARMED;
  266         queue->tq_callouts--;
  267         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
  268         /* The lock is released inside. */
  269 }
  270 
  271 int
  272 taskqueue_enqueue_timeout(struct taskqueue *queue,
  273     struct timeout_task *timeout_task, int ticks)
  274 {
  275         int res;
  276 
  277         TQ_LOCK(queue);
  278         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
  279             ("Migrated queue"));
  280         KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
  281         timeout_task->q = queue;
  282         res = timeout_task->t.ta_pending;
  283         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
  284                 /* Do nothing */
  285                 TQ_UNLOCK(queue);
  286                 res = -1;
  287         } else if (ticks == 0) {
  288                 taskqueue_enqueue_locked(queue, &timeout_task->t);
  289                 /* The lock is released inside. */
  290         } else {
  291                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  292                         res++;
  293                 } else {
  294                         queue->tq_callouts++;
  295                         timeout_task->f |= DT_CALLOUT_ARMED;
  296                         if (ticks < 0)
  297                                 ticks = -ticks; /* Ignore overflow. */
  298                 }
  299                 if (ticks > 0) {
  300                         callout_reset(&timeout_task->c, ticks,
  301                             taskqueue_timeout_func, timeout_task);
  302                 }
  303                 TQ_UNLOCK(queue);
  304         }
  305         return (res);
  306 }
  307 
  308 static void
  309 taskqueue_drain_running(struct taskqueue *queue)
  310 {
  311 
  312         while (!TAILQ_EMPTY(&queue->tq_active))
  313                 TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
  314                     PWAIT, "-", 0);
  315 }
  316 
  317 void
  318 taskqueue_block(struct taskqueue *queue)
  319 {
  320 
  321         TQ_LOCK(queue);
  322         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  323         TQ_UNLOCK(queue);
  324 }
  325 
  326 void
  327 taskqueue_unblock(struct taskqueue *queue)
  328 {
  329 
  330         TQ_LOCK(queue);
  331         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  332         if (!STAILQ_EMPTY(&queue->tq_queue))
  333                 queue->tq_enqueue(queue->tq_context);
  334         TQ_UNLOCK(queue);
  335 }
  336 
  337 static void
  338 taskqueue_run_locked(struct taskqueue *queue)
  339 {
  340         struct taskqueue_busy tb;
  341         struct task *task;
  342         int pending;
  343 
  344         TQ_ASSERT_LOCKED(queue);
  345         tb.tb_running = NULL;
  346         TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  347 
  348         while (STAILQ_FIRST(&queue->tq_queue)) {
  349                 /*
  350                  * Carefully remove the first task from the queue and
  351                  * zero its pending count.
  352                  */
  353                 task = STAILQ_FIRST(&queue->tq_queue);
  354                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  355                 pending = task->ta_pending;
  356                 task->ta_pending = 0;
  357                 tb.tb_running = task;
  358                 TQ_UNLOCK(queue);
  359 
  360                 task->ta_func(task->ta_context, pending);
  361 
  362                 TQ_LOCK(queue);
  363                 tb.tb_running = NULL;
  364                 wakeup(task);
  365         }
  366         TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  367         if (TAILQ_EMPTY(&queue->tq_active))
  368                 wakeup(&queue->tq_active);
  369 }
  370 
  371 void
  372 taskqueue_run(struct taskqueue *queue)
  373 {
  374 
  375         TQ_LOCK(queue);
  376         taskqueue_run_locked(queue);
  377         TQ_UNLOCK(queue);
  378 }
  379 
  380 static int
  381 task_is_running(struct taskqueue *queue, struct task *task)
  382 {
  383         struct taskqueue_busy *tb;
  384 
  385         TQ_ASSERT_LOCKED(queue);
  386         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  387                 if (tb->tb_running == task)
  388                         return (1);
  389         }
  390         return (0);
  391 }
  392 
  393 /*
  394  * Only use this function in single threaded contexts. It returns
  395  * non-zero if the given task is either pending or running. Else the
  396  * task is idle and can be queued again or freed.
  397  */
  398 int
  399 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
  400 {
  401         int retval;
  402 
  403         TQ_LOCK(queue);
  404         retval = task->ta_pending > 0 || task_is_running(queue, task);
  405         TQ_UNLOCK(queue);
  406 
  407         return (retval);
  408 }
  409 
  410 static int
  411 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
  412     u_int *pendp)
  413 {
  414 
  415         if (task->ta_pending > 0)
  416                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
  417         if (pendp != NULL)
  418                 *pendp = task->ta_pending;
  419         task->ta_pending = 0;
  420         return (task_is_running(queue, task) ? EBUSY : 0);
  421 }
  422 
  423 int
  424 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
  425 {
  426         int error;
  427 
  428         TQ_LOCK(queue);
  429         error = taskqueue_cancel_locked(queue, task, pendp);
  430         TQ_UNLOCK(queue);
  431 
  432         return (error);
  433 }
  434 
  435 int
  436 taskqueue_cancel_timeout(struct taskqueue *queue,
  437     struct timeout_task *timeout_task, u_int *pendp)
  438 {
  439         u_int pending, pending1;
  440         int error;
  441 
  442         TQ_LOCK(queue);
  443         pending = !!callout_stop(&timeout_task->c);
  444         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
  445         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
  446                 timeout_task->f &= ~DT_CALLOUT_ARMED;
  447                 queue->tq_callouts--;
  448         }
  449         TQ_UNLOCK(queue);
  450 
  451         if (pendp != NULL)
  452                 *pendp = pending + pending1;
  453         return (error);
  454 }
  455 
  456 void
  457 taskqueue_drain(struct taskqueue *queue, struct task *task)
  458 {
  459 
  460         if (!queue->tq_spin)
  461                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  462 
  463         TQ_LOCK(queue);
  464         while (task->ta_pending != 0 || task_is_running(queue, task))
  465                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  466         TQ_UNLOCK(queue);
  467 }
  468 
  469 void
  470 taskqueue_drain_all(struct taskqueue *queue)
  471 {
  472         struct task *task;
  473 
  474         if (!queue->tq_spin)
  475                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  476 
  477         TQ_LOCK(queue);
  478         task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
  479         while (task != NULL && task->ta_pending != 0) {
  480                 struct task *oldtask;
  481                 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
  482                 /*
  483                  * While we were asleeep the last entry may have been freed.
  484                  * We need to check if it's still even in the queue.
  485                  * Not perfect, but it's better than referencing bad memory.
  486                  * first guess is the current 'end of queue' but if a new
  487                  * item has been added we need to take the expensive path
  488                  * Better fix in 11.
  489                  */
  490                 oldtask = task;
  491                 if (oldtask !=
  492                     (task = STAILQ_LAST(&queue->tq_queue, task, ta_link))) {
  493                         STAILQ_FOREACH(task, &queue->tq_queue, ta_link) {
  494                                 if (task == oldtask)
  495                                         break;
  496                         }
  497                 }
  498         }
  499         taskqueue_drain_running(queue);
  500         KASSERT(STAILQ_EMPTY(&queue->tq_queue),
  501             ("taskqueue queue is not empty after draining"));
  502         TQ_UNLOCK(queue);
  503 }
  504 
  505 void
  506 taskqueue_drain_timeout(struct taskqueue *queue,
  507     struct timeout_task *timeout_task)
  508 {
  509 
  510         /*
  511          * Set flag to prevent timer from re-starting during drain:
  512          */
  513         TQ_LOCK(queue);
  514         KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
  515             ("Drain already in progress"));
  516         timeout_task->f |= DT_DRAIN_IN_PROGRESS;
  517         TQ_UNLOCK(queue);
  518 
  519         callout_drain(&timeout_task->c);
  520         taskqueue_drain(queue, &timeout_task->t);
  521 
  522         /*
  523          * Clear flag to allow timer to re-start:
  524          */
  525         TQ_LOCK(queue);
  526         timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
  527         TQ_UNLOCK(queue);
  528 }
  529 
  530 static void
  531 taskqueue_swi_enqueue(void *context)
  532 {
  533         swi_sched(taskqueue_ih, 0);
  534 }
  535 
  536 static void
  537 taskqueue_swi_run(void *dummy)
  538 {
  539         taskqueue_run(taskqueue_swi);
  540 }
  541 
  542 static void
  543 taskqueue_swi_giant_enqueue(void *context)
  544 {
  545         swi_sched(taskqueue_giant_ih, 0);
  546 }
  547 
  548 static void
  549 taskqueue_swi_giant_run(void *dummy)
  550 {
  551         taskqueue_run(taskqueue_swi_giant);
  552 }
  553 
  554 int
  555 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
  556                         const char *name, ...)
  557 {
  558         va_list ap;
  559         struct thread *td;
  560         struct taskqueue *tq;
  561         int i, error;
  562         char ktname[MAXCOMLEN + 1];
  563 
  564         if (count <= 0)
  565                 return (EINVAL);
  566 
  567         tq = *tqp;
  568 
  569         va_start(ap, name);
  570         vsnprintf(ktname, sizeof(ktname), name, ap);
  571         va_end(ap);
  572 
  573         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
  574             M_NOWAIT | M_ZERO);
  575         if (tq->tq_threads == NULL) {
  576                 printf("%s: no memory for %s threads\n", __func__, ktname);
  577                 return (ENOMEM);
  578         }
  579 
  580         for (i = 0; i < count; i++) {
  581                 if (count == 1)
  582                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  583                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  584                 else
  585                         error = kthread_add(taskqueue_thread_loop, tqp, NULL,
  586                             &tq->tq_threads[i], RFSTOPPED, 0,
  587                             "%s_%d", ktname, i);
  588                 if (error) {
  589                         /* should be ok to continue, taskqueue_free will dtrt */
  590                         printf("%s: kthread_add(%s): error %d", __func__,
  591                             ktname, error);
  592                         tq->tq_threads[i] = NULL;               /* paranoid */
  593                 } else
  594                         tq->tq_tcount++;
  595         }
  596         for (i = 0; i < count; i++) {
  597                 if (tq->tq_threads[i] == NULL)
  598                         continue;
  599                 td = tq->tq_threads[i];
  600                 thread_lock(td);
  601                 sched_prio(td, pri);
  602                 sched_add(td, SRQ_BORING);
  603                 thread_unlock(td);
  604         }
  605 
  606         return (0);
  607 }
  608 
  609 static inline void
  610 taskqueue_run_callback(struct taskqueue *tq,
  611     enum taskqueue_callback_type cb_type)
  612 {
  613         taskqueue_callback_fn tq_callback;
  614 
  615         TQ_ASSERT_UNLOCKED(tq);
  616         tq_callback = tq->tq_callbacks[cb_type];
  617         if (tq_callback != NULL)
  618                 tq_callback(tq->tq_cb_contexts[cb_type]);
  619 }
  620 
  621 void
  622 taskqueue_thread_loop(void *arg)
  623 {
  624         struct taskqueue **tqp, *tq;
  625 
  626         tqp = arg;
  627         tq = *tqp;
  628         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  629         TQ_LOCK(tq);
  630         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  631                 taskqueue_run_locked(tq);
  632                 /*
  633                  * Because taskqueue_run() can drop tq_mutex, we need to
  634                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  635                  * meantime, which means we missed a wakeup.
  636                  */
  637                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  638                         break;
  639                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  640         }
  641         taskqueue_run_locked(tq);
  642 
  643         /*
  644          * This thread is on its way out, so just drop the lock temporarily
  645          * in order to call the shutdown callback.  This allows the callback
  646          * to look at the taskqueue, even just before it dies.
  647          */
  648         TQ_UNLOCK(tq);
  649         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  650         TQ_LOCK(tq);
  651 
  652         /* rendezvous with thread that asked us to terminate */
  653         tq->tq_tcount--;
  654         wakeup_one(tq->tq_threads);
  655         TQ_UNLOCK(tq);
  656         kthread_exit();
  657 }
  658 
  659 void
  660 taskqueue_thread_enqueue(void *context)
  661 {
  662         struct taskqueue **tqp, *tq;
  663 
  664         tqp = context;
  665         tq = *tqp;
  666 
  667         wakeup_one(tq);
  668 }
  669 
  670 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
  671                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
  672                      INTR_MPSAFE, &taskqueue_ih)); 
  673 
  674 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
  675                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
  676                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 
  677 
  678 TASKQUEUE_DEFINE_THREAD(thread);
  679 
  680 struct taskqueue *
  681 taskqueue_create_fast(const char *name, int mflags,
  682                  taskqueue_enqueue_fn enqueue, void *context)
  683 {
  684         return _taskqueue_create(name, mflags, enqueue, context,
  685                         MTX_SPIN, "fast_taskqueue");
  686 }
  687 
  688 /* NB: for backwards compatibility */
  689 int
  690 taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
  691 {
  692         return taskqueue_enqueue(queue, task);
  693 }
  694 
  695 static void     *taskqueue_fast_ih;
  696 
  697 static void
  698 taskqueue_fast_enqueue(void *context)
  699 {
  700         swi_sched(taskqueue_fast_ih, 0);
  701 }
  702 
  703 static void
  704 taskqueue_fast_run(void *dummy)
  705 {
  706         taskqueue_run(taskqueue_fast);
  707 }
  708 
  709 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
  710         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
  711         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
  712 
  713 int
  714 taskqueue_member(struct taskqueue *queue, struct thread *td)
  715 {
  716         int i, j, ret = 0;
  717 
  718         for (i = 0, j = 0; ; i++) {
  719                 if (queue->tq_threads[i] == NULL)
  720                         continue;
  721                 if (queue->tq_threads[i] == td) {
  722                         ret = 1;
  723                         break;
  724                 }
  725                 if (++j >= queue->tq_tcount)
  726                         break;
  727         }
  728         return (ret);
  729 }

Cache object: 889d991ed3b8f7f784ac5a7c5b6d157c


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.