FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_taskqueue.c

/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $
 */

#include <sys/param.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/taskqueue.h>
#include <sys/interrupt.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/kthread.h>
#include <sys/thread2.h>
#include <sys/spinlock.h>
#include <sys/spinlock2.h>
#include <sys/serialize.h>
#include <sys/proc.h>
#include <machine/varargs.h>

MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");

static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct lock      taskqueue_queues_lock;

struct taskqueue {
        STAILQ_ENTRY(taskqueue) tq_link;
        STAILQ_HEAD(, task)     tq_queue;
        const char              *tq_name;
        taskqueue_enqueue_fn    tq_enqueue;
        void                    *tq_context;

        struct task             *tq_running;
        struct spinlock         tq_lock;
        struct thread           **tq_threads;
        int                     tq_tcount;
        int                     tq_flags;
        int                     tq_callouts;
};

#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_PENDING        (1 << 2)

#define DT_CALLOUT_ARMED        (1 << 0)

void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

        TASK_INIT(&timeout_task->t, priority, func, context);
        callout_init(&timeout_task->c);
        timeout_task->q = queue;
        timeout_task->f = 0;
}
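
/*
 * Example (illustrative sketch, not part of this file): consumers would
 * normally initialize a timeout task through the TIMEOUT_TASK_INIT()
 * wrapper in <sys/taskqueue.h> rather than calling _timeout_task_init()
 * directly; the wrapper macro and all "xdrv" names below are assumptions
 * for illustration only.
 */
#if 0
static struct timeout_task xdrv_poll_task;

static void xdrv_poll(void *arg, int pending);

static void
xdrv_attach_example(struct xdrv_softc *sc)
{
        /* Run xdrv_poll() on the first per-cpu thread queue. */
        TIMEOUT_TASK_INIT(taskqueue_thread[0], &xdrv_poll_task, 0,
            xdrv_poll, sc);
}
#endif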

static void taskqueue_run(struct taskqueue *queue, int lock_held);

static __inline void
TQ_LOCK_INIT(struct taskqueue *tq)
{
        spin_init(&tq->tq_lock);
}

static __inline void
TQ_LOCK_UNINIT(struct taskqueue *tq)
{
        spin_uninit(&tq->tq_lock);
}

static __inline void
TQ_LOCK(struct taskqueue *tq)
{
        spin_lock(&tq->tq_lock);
}

static __inline void
TQ_UNLOCK(struct taskqueue *tq)
{
        spin_unlock(&tq->tq_lock);
}

static __inline void
TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
{
        ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context)
{
        struct taskqueue *queue;

        queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO);
        if (!queue)
                return NULL;
        STAILQ_INIT(&queue->tq_queue);
        queue->tq_name = name;
        queue->tq_enqueue = enqueue;
        queue->tq_context = context;
        queue->tq_flags |= TQ_FLAGS_ACTIVE;
        TQ_LOCK_INIT(queue);

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);

        return queue;
}
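
/*
 * Example (illustrative sketch, not part of this file): a private
 * thread-backed queue.  The context handed to taskqueue_create() is a
 * pointer to the queue pointer, because taskqueue_thread_enqueue()
 * (defined later in this file) dereferences it at wakeup time and the
 * pointer itself is only assigned once taskqueue_create() returns.
 * The "xdrv" names are hypothetical.
 */
#if 0
static struct taskqueue *xdrv_tq;

static void
xdrv_create_queue(void)
{
        xdrv_tq = taskqueue_create("xdrv", M_WAITOK,
            taskqueue_thread_enqueue, &xdrv_tq);
        taskqueue_start_threads(&xdrv_tq, 1, TDPRI_KERN_DAEMON, -1,
            "xdrv taskq");
}
#endif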

static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{
        while (tq->tq_tcount > 0) {
                wakeup(tq);
                TQ_SLEEP(tq, pp, "taskqueue_terminate");
        }
}

void
taskqueue_free(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        taskqueue_run(queue, 1);
        taskqueue_terminate(queue->tq_threads, queue);
        TQ_UNLOCK(queue);

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);

        TQ_LOCK_UNINIT(queue);

        kfree(queue, M_TASKQUEUE);
}

struct taskqueue *
taskqueue_find(const char *name)
{
        struct taskqueue *queue;

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
                if (!strcmp(queue->tq_name, name)) {
                        lockmgr(&taskqueue_queues_lock, LK_RELEASE);
                        return queue;
                }
        }
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);
        return NULL;
}

/*
 * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
 * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
 * So either use a throwaway task which will only be enqueued once, or
 * use one task per CPU!
 */
/*
 * Caller must hold the queue lock; the lock remains held on return,
 * including on the EPIPE error path.
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
        struct task *ins;
        struct task *prev;

        /*
         * Don't allow new tasks on a queue which is being freed.
         */
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                return EPIPE;

        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                task->ta_pending++;
                return 0;
        }

        /*
         * Optimise the case when all tasks have the same priority.
         */
        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
        if (!prev || prev->ta_priority >= task->ta_priority) {
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
                prev = NULL;
                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
                                break;

                if (prev)
                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
                else
                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
        }

        task->ta_pending = 1;
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
                if (queue->tq_enqueue)
                        queue->tq_enqueue(queue->tq_context);
        } else {
                queue->tq_flags |= TQ_FLAGS_PENDING;
        }

        return 0;
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        int res;

        TQ_LOCK(queue);
        res = taskqueue_enqueue_locked(queue, task);
        TQ_UNLOCK(queue);

        return (res);
}
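
/*
 * Example (illustrative sketch, not part of this file): a task is a
 * persistent object, initialized once; enqueueing it again before it has
 * run only bumps ta_pending, and the handler is then invoked once with
 * the collapsed count.  Per the NOTE above, a task must not be shared
 * between the per-cpu queues.  The "xdrv" names are hypothetical.
 */
#if 0
static struct task xdrv_task;

static void
xdrv_intr_task(void *arg, int pending)
{
        /* "pending" counts how many enqueues were folded into this run. */
        xdrv_process_events(arg, pending);
}

static void
xdrv_attach_example(struct xdrv_softc *sc)
{
        TASK_INIT(&xdrv_task, 0, xdrv_intr_task, sc);   /* init once */
}

static void
xdrv_intr(struct xdrv_softc *sc)
{
        /* Re-enqueueing before the task runs just bumps ta_pending. */
        taskqueue_enqueue(taskqueue_swi, &xdrv_task);
}
#endif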

static void
taskqueue_timeout_func(void *arg)
{
        struct taskqueue *queue;
        struct timeout_task *timeout_task;

        timeout_task = arg;
        queue = timeout_task->q;

        TQ_LOCK(queue);
        KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
        timeout_task->f &= ~DT_CALLOUT_ARMED;
        queue->tq_callouts--;
        taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
        TQ_UNLOCK(queue);
}

int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
        int res;

        TQ_LOCK(queue);
        KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
            ("Migrated queue"));
        timeout_task->q = queue;
        res = timeout_task->t.ta_pending;
        if (ticks == 0) {
                taskqueue_enqueue_locked(queue, &timeout_task->t);
        } else {
                if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                        res++;
                } else {
                        queue->tq_callouts++;
                        timeout_task->f |= DT_CALLOUT_ARMED;
                }
                callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
                    timeout_task);
        }
        TQ_UNLOCK(queue);
        return (res);
}
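
/*
 * Example (illustrative sketch, not part of this file): arming a delayed
 * task.  The return value is the pending count seen at enqueue time, so
 * 0 means the task was newly scheduled.  Names are hypothetical; hz is
 * the usual ticks-per-second kernel global.
 */
#if 0
static void
xdrv_schedule_poll(struct taskqueue *tq, struct timeout_task *tt)
{
        /* Run the task roughly one second from now. */
        if (taskqueue_enqueue_timeout(tq, tt, hz) != 0)
                kprintf("xdrv: poll already scheduled\n");
}
#endif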

void
taskqueue_block(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
        if (queue->tq_flags & TQ_FLAGS_PENDING) {
                queue->tq_flags &= ~TQ_FLAGS_PENDING;
                if (queue->tq_enqueue)
                        queue->tq_enqueue(queue->tq_context);
        }
        TQ_UNLOCK(queue);
}
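
/*
 * Example (illustrative sketch, not part of this file): blocking only
 * suppresses the tq_enqueue kick; tasks still queue while blocked and
 * the deferred kick fires in one burst on unblock.  Names reuse the
 * hypothetical xdrv_tq from the sketch above.
 */
#if 0
static void
xdrv_quiesce_example(struct xdrv_softc *sc)
{
        taskqueue_block(xdrv_tq);
        xdrv_reconfigure(sc);           /* hypothetical; no kicks fire here */
        taskqueue_unblock(xdrv_tq);     /* deferred kick fires now if needed */
}
#endif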

static void
taskqueue_run(struct taskqueue *queue, int lock_held)
{
        struct task *task;
        int pending;

        if (lock_held == 0)
                TQ_LOCK(queue);
        while (STAILQ_FIRST(&queue->tq_queue)) {
                /*
                 * Carefully remove the first task from the queue and
                 * zero its pending count.
                 */
                task = STAILQ_FIRST(&queue->tq_queue);
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                pending = task->ta_pending;
                task->ta_pending = 0;
                queue->tq_running = task;
                TQ_UNLOCK(queue);

                task->ta_func(task->ta_context, pending);

                TQ_LOCK(queue);
                queue->tq_running = NULL;
                wakeup(task);
        }
        if (lock_held == 0)
                TQ_UNLOCK(queue);
}

static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

        if (task->ta_pending > 0)
                STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
        if (pendp != NULL)
                *pendp = task->ta_pending;
        task->ta_pending = 0;
        return (task == queue->tq_running ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_cancel_locked(queue, task, pendp);
        TQ_UNLOCK(queue);

        return (error);
}
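
/*
 * Example (illustrative sketch, not part of this file): taskqueue_cancel()
 * returns EBUSY when the handler is currently executing; a detach path
 * that must not return while the handler can still run falls back to
 * taskqueue_drain().  Names are hypothetical.
 */
#if 0
static void
xdrv_stop_task(struct taskqueue *tq, struct task *task)
{
        u_int pending;

        if (taskqueue_cancel(tq, task, &pending) == EBUSY)
                taskqueue_drain(tq, task);      /* wait for it to finish */
}
#endif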

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
        u_int pending, pending1;
        int error;

        TQ_LOCK(queue);
        pending = !!callout_stop(&timeout_task->c);
        error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
        if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                timeout_task->f &= ~DT_CALLOUT_ARMED;
                queue->tq_callouts--;
        }
        TQ_UNLOCK(queue);

        if (pendp != NULL)
                *pendp = pending + pending1;
        return (error);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{
        TQ_LOCK(queue);
        while (task->ta_pending != 0 || task == queue->tq_running)
                TQ_SLEEP(queue, task, "-");
        TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

        callout_stop_sync(&timeout_task->c);
        taskqueue_drain(queue, &timeout_task->t);
}
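
/*
 * Example (illustrative sketch, not part of this file): a typical detach
 * sequence drains outstanding work before destroying the queue, since
 * taskqueue_free() only runs what is already queued.  Names are
 * hypothetical.
 */
#if 0
static void
xdrv_detach_example(struct xdrv_softc *sc)
{
        taskqueue_drain_timeout(sc->sc_tq, &sc->sc_poll_task);
        taskqueue_drain(sc->sc_tq, &sc->sc_intr_task);
        taskqueue_free(sc->sc_tq);
}
#endif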

static void
taskqueue_swi_enqueue(void *context)
{
        setsofttq();
}

static void
taskqueue_swi_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi, 0);
}

static void
taskqueue_swi_mp_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi_mp, 0);
}

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu,
                        const char *fmt, ...)
{
        __va_list ap;
        struct thread *td;
        struct taskqueue *tq;
        int i, error, cpu;
        char ktname[MAXCOMLEN];

        if (count <= 0)
                return EINVAL;

        tq = *tqp;
        cpu = ncpu;

        __va_start(ap, fmt);
        kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
        __va_end(ap);

        tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
            M_WAITOK | M_ZERO);

        for (i = 0; i < count; i++) {
                /*
                 * If no specific cpu was specified and more than one thread
                 * is to be created, we distribute the threads amongst all
                 * cpus.
                 */
                if ((ncpu <= -1) && (count > 1))
                        cpu = i % ncpus;

                if (count == 1) {
                        error = lwkt_create(taskqueue_thread_loop, tqp,
                                            &tq->tq_threads[i], NULL,
                                            TDF_NOSTART, cpu,
                                            "%s", ktname);
                } else {
                        error = lwkt_create(taskqueue_thread_loop, tqp,
                                            &tq->tq_threads[i], NULL,
                                            TDF_NOSTART, cpu,
                                            "%s_%d", ktname, i);
                }
                if (error) {
                        kprintf("%s: lwkt_create(%s): error %d\n", __func__,
                            ktname, error);
                        tq->tq_threads[i] = NULL;
                } else {
                        td = tq->tq_threads[i];
                        lwkt_setpri_initial(td, pri);
                        lwkt_schedule(td);
                        tq->tq_tcount++;
                }
        }

        return 0;
}
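
/*
 * Example (illustrative sketch, not part of this file): per the check
 * above, a negative ncpu with count > 1 spreads the worker threads
 * round-robin across all cpus; with a single thread the cpu argument is
 * handed to lwkt_create() unchanged.  This reuses the hypothetical
 * xdrv_tq from the earlier sketch.
 */
#if 0
        /* Four workers, distributed over the available cpus. */
        taskqueue_start_threads(&xdrv_tq, 4, TDPRI_KERN_DAEMON, -1,
            "xdrv taskq");

        /* One worker, pinned to cpu 0. */
        taskqueue_start_threads(&xdrv_tq, 1, TDPRI_KERN_DAEMON, 0,
            "xdrv taskq cpu0");
#endif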

void
taskqueue_thread_loop(void *arg)
{
        struct taskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                taskqueue_run(tq, 1);
                TQ_SLEEP(tq, tq, "tqthr");
        }

        /* rendezvous with thread that asked us to terminate */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        lwkt_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
        struct taskqueue **tqp, *tq;

        tqp = context;
        tq = *tqp;

        wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
         register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1));
/*
 * XXX: possibly use a different SWI_TQ_MP or so.
 * related: sys/interrupt.h
 * related: platform/XXX/isa/ipl_funcs.c
 */
TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
    register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL,
                    -1));

struct taskqueue *taskqueue_thread[MAXCPU];

static void
taskqueue_init(void)
{
        int cpu;

        lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
        STAILQ_INIT(&taskqueue_queues);

        for (cpu = 0; cpu < ncpus; cpu++) {
                taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
                    taskqueue_thread_enqueue, &taskqueue_thread[cpu]);
                taskqueue_start_threads(&taskqueue_thread[cpu], 1,
                    TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu);
        }
}

SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);
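
/*
 * Example (illustrative sketch, not part of this file): per the NOTE
 * above taskqueue_enqueue_locked(), work handed to the per-cpu
 * taskqueue_thread[] queues must use a distinct task per cpu, since
 * tasks are not locked.  The "xdrv" names are hypothetical.
 */
#if 0
static struct task xdrv_percpu_task[MAXCPU];

static void
xdrv_kick_local_cpu(struct xdrv_softc *sc)
{
        struct task *task = &xdrv_percpu_task[mycpuid];

        TASK_INIT(task, 0, xdrv_intr_task, sc);
        taskqueue_enqueue(taskqueue_thread[mycpuid], task);
}
#endif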
