The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_gtaskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * Copyright (c) 2014 Jeff Roberson
    4  * Copyright (c) 2016 Matthew Macy
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/gtaskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
   51 static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
   52 static void     gtaskqueue_thread_enqueue(void *);
   53 static void     gtaskqueue_thread_loop(void *arg);
   54 static int      task_is_running(struct gtaskqueue *queue, struct gtask *gtask);
   55 static void     gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask);
   56 
   57 TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
   58 
   59 struct gtaskqueue_busy {
   60         struct gtask            *tb_running;
   61         u_int                    tb_seq;
   62         LIST_ENTRY(gtaskqueue_busy) tb_link;
   63 };
   64 
   65 typedef void (*gtaskqueue_enqueue_fn)(void *context);
   66 
   67 struct gtaskqueue {
   68         STAILQ_HEAD(, gtask)    tq_queue;
   69         LIST_HEAD(, gtaskqueue_busy) tq_active;
   70         u_int                   tq_seq;
   71         int                     tq_callouts;
   72         struct mtx_padalign     tq_mutex;
   73         gtaskqueue_enqueue_fn   tq_enqueue;
   74         void                    *tq_context;
   75         char                    *tq_name;
   76         struct thread           **tq_threads;
   77         int                     tq_tcount;
   78         int                     tq_spin;
   79         int                     tq_flags;
   80         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   81         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   82 };
   83 
   84 #define TQ_FLAGS_ACTIVE         (1 << 0)
   85 #define TQ_FLAGS_BLOCKED        (1 << 1)
   86 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   87 
   88 #define DT_CALLOUT_ARMED        (1 << 0)
   89 
   90 #define TQ_LOCK(tq)                                                     \
   91         do {                                                            \
   92                 if ((tq)->tq_spin)                                      \
   93                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   94                 else                                                    \
   95                         mtx_lock(&(tq)->tq_mutex);                      \
   96         } while (0)
   97 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   98 
   99 #define TQ_UNLOCK(tq)                                                   \
  100         do {                                                            \
  101                 if ((tq)->tq_spin)                                      \
  102                         mtx_unlock_spin(&(tq)->tq_mutex);               \
  103                 else                                                    \
  104                         mtx_unlock(&(tq)->tq_mutex);                    \
  105         } while (0)
  106 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  107 
  108 #ifdef INVARIANTS
  109 static void
  110 gtask_dump(struct gtask *gtask)
  111 {
  112         printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
  113                gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
  114 }
  115 #endif
  116 
  117 static __inline int
  118 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
  119 {
  120         if (tq->tq_spin)
  121                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  122         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  123 }
  124 
  125 static struct gtaskqueue *
  126 _gtaskqueue_create(const char *name, int mflags,
  127                  taskqueue_enqueue_fn enqueue, void *context,
  128                  int mtxflags, const char *mtxname __unused)
  129 {
  130         struct gtaskqueue *queue;
  131         char *tq_name;
  132 
  133         tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
  134         if (!tq_name)
  135                 return (NULL);
  136 
  137         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  138 
  139         queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
  140         if (!queue) {
  141                 free(tq_name, M_GTASKQUEUE);
  142                 return (NULL);
  143         }
  144 
  145         STAILQ_INIT(&queue->tq_queue);
  146         LIST_INIT(&queue->tq_active);
  147         queue->tq_enqueue = enqueue;
  148         queue->tq_context = context;
  149         queue->tq_name = tq_name;
  150         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  151         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  152         if (enqueue == gtaskqueue_thread_enqueue)
  153                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  154         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  155 
  156         return (queue);
  157 }
  158 
  159 
  160 /*
  161  * Signal a taskqueue thread to terminate.
  162  */
  163 static void
  164 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
  165 {
  166 
  167         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  168                 wakeup(tq);
  169                 TQ_SLEEP(tq, pp, "gtq_destroy");
  170         }
  171 }
  172 
  173 static void
  174 gtaskqueue_free(struct gtaskqueue *queue)
  175 {
  176 
  177         TQ_LOCK(queue);
  178         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  179         gtaskqueue_terminate(queue->tq_threads, queue);
  180         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
  181         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  182         mtx_destroy(&queue->tq_mutex);
  183         free(queue->tq_threads, M_GTASKQUEUE);
  184         free(queue->tq_name, M_GTASKQUEUE);
  185         free(queue, M_GTASKQUEUE);
  186 }
  187 
  188 /*
  189  * Wait for all to complete, then prevent it from being enqueued
  190  */
  191 void
  192 grouptask_block(struct grouptask *grouptask)
  193 {
  194         struct gtaskqueue *queue = grouptask->gt_taskqueue;
  195         struct gtask *gtask = &grouptask->gt_task;
  196 
  197 #ifdef INVARIANTS
  198         if (queue == NULL) {
  199                 gtask_dump(gtask);
  200                 panic("queue == NULL");
  201         }
  202 #endif
  203         TQ_LOCK(queue);
  204         gtask->ta_flags |= TASK_NOENQUEUE;
  205         gtaskqueue_drain_locked(queue, gtask);
  206         TQ_UNLOCK(queue);
  207 }
  208 
  209 void
  210 grouptask_unblock(struct grouptask *grouptask)
  211 {
  212         struct gtaskqueue *queue = grouptask->gt_taskqueue;
  213         struct gtask *gtask = &grouptask->gt_task;
  214 
  215 #ifdef INVARIANTS
  216         if (queue == NULL) {
  217                 gtask_dump(gtask);
  218                 panic("queue == NULL");
  219         }
  220 #endif
  221         TQ_LOCK(queue);
  222         gtask->ta_flags &= ~TASK_NOENQUEUE;
  223         TQ_UNLOCK(queue);
  224 }
  225 
  226 int
  227 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
  228 {
  229 #ifdef INVARIANTS
  230         if (queue == NULL) {
  231                 gtask_dump(gtask);
  232                 panic("queue == NULL");
  233         }
  234 #endif
  235         TQ_LOCK(queue);
  236         if (gtask->ta_flags & TASK_ENQUEUED) {
  237                 TQ_UNLOCK(queue);
  238                 return (0);
  239         }
  240         if (gtask->ta_flags & TASK_NOENQUEUE) {
  241                 TQ_UNLOCK(queue);
  242                 return (EAGAIN);
  243         }
  244         STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
  245         gtask->ta_flags |= TASK_ENQUEUED;
  246         TQ_UNLOCK(queue);
  247         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  248                 queue->tq_enqueue(queue->tq_context);
  249         return (0);
  250 }
  251 
  252 static void
  253 gtaskqueue_task_nop_fn(void *context)
  254 {
  255 }
  256 
  257 /*
  258  * Block until all currently queued tasks in this taskqueue
  259  * have begun execution.  Tasks queued during execution of
  260  * this function are ignored.
  261  */
  262 static void
  263 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
  264 {
  265         struct gtask t_barrier;
  266 
  267         if (STAILQ_EMPTY(&queue->tq_queue))
  268                 return;
  269 
  270         /*
  271          * Enqueue our barrier after all current tasks, but with
  272          * the highest priority so that newly queued tasks cannot
  273          * pass it.  Because of the high priority, we can not use
  274          * taskqueue_enqueue_locked directly (which drops the lock
  275          * anyway) so just insert it at tail while we have the
  276          * queue lock.
  277          */
  278         GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
  279         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  280         t_barrier.ta_flags |= TASK_ENQUEUED;
  281 
  282         /*
  283          * Once the barrier has executed, all previously queued tasks
  284          * have completed or are currently executing.
  285          */
  286         while (t_barrier.ta_flags & TASK_ENQUEUED)
  287                 TQ_SLEEP(queue, &t_barrier, "gtq_qdrain");
  288 }
  289 
  290 /*
  291  * Block until all currently executing tasks for this taskqueue
  292  * complete.  Tasks that begin execution during the execution
  293  * of this function are ignored.
  294  */
  295 static void
  296 gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
  297 {
  298         struct gtaskqueue_busy *tb;
  299         u_int seq;
  300 
  301         if (LIST_EMPTY(&queue->tq_active))
  302                 return;
  303 
  304         /* Block taskq_terminate().*/
  305         queue->tq_callouts++;
  306 
  307         /* Wait for any active task with sequence from the past. */
  308         seq = queue->tq_seq;
  309 restart:
  310         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  311                 if ((int)(tb->tb_seq - seq) <= 0) {
  312                         TQ_SLEEP(queue, tb->tb_running, "gtq_adrain");
  313                         goto restart;
  314                 }
  315         }
  316 
  317         /* Release taskqueue_terminate(). */
  318         queue->tq_callouts--;
  319         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  320                 wakeup_one(queue->tq_threads);
  321 }
  322 
  323 void
  324 gtaskqueue_block(struct gtaskqueue *queue)
  325 {
  326 
  327         TQ_LOCK(queue);
  328         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  329         TQ_UNLOCK(queue);
  330 }
  331 
  332 void
  333 gtaskqueue_unblock(struct gtaskqueue *queue)
  334 {
  335 
  336         TQ_LOCK(queue);
  337         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  338         if (!STAILQ_EMPTY(&queue->tq_queue))
  339                 queue->tq_enqueue(queue->tq_context);
  340         TQ_UNLOCK(queue);
  341 }
  342 
  343 static void
  344 gtaskqueue_run_locked(struct gtaskqueue *queue)
  345 {
  346         struct gtaskqueue_busy tb;
  347         struct gtask *gtask;
  348 
  349         KASSERT(queue != NULL, ("tq is NULL"));
  350         TQ_ASSERT_LOCKED(queue);
  351         tb.tb_running = NULL;
  352         LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
  353 
  354         while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
  355                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  356                 gtask->ta_flags &= ~TASK_ENQUEUED;
  357                 tb.tb_running = gtask;
  358                 tb.tb_seq = ++queue->tq_seq;
  359                 TQ_UNLOCK(queue);
  360 
  361                 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
  362                 gtask->ta_func(gtask->ta_context);
  363 
  364                 TQ_LOCK(queue);
  365                 wakeup(gtask);
  366         }
  367         LIST_REMOVE(&tb, tb_link);
  368 }
  369 
  370 static int
  371 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
  372 {
  373         struct gtaskqueue_busy *tb;
  374 
  375         TQ_ASSERT_LOCKED(queue);
  376         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  377                 if (tb->tb_running == gtask)
  378                         return (1);
  379         }
  380         return (0);
  381 }
  382 
  383 static int
  384 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
  385 {
  386 
  387         if (gtask->ta_flags & TASK_ENQUEUED)
  388                 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
  389         gtask->ta_flags &= ~TASK_ENQUEUED;
  390         return (task_is_running(queue, gtask) ? EBUSY : 0);
  391 }
  392 
  393 int
  394 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
  395 {
  396         int error;
  397 
  398         TQ_LOCK(queue);
  399         error = gtaskqueue_cancel_locked(queue, gtask);
  400         TQ_UNLOCK(queue);
  401 
  402         return (error);
  403 }
  404 
  405 static void
  406 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
  407 {
  408         while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
  409                 TQ_SLEEP(queue, gtask, "gtq_drain");
  410 }
  411 
  412 void
  413 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
  414 {
  415 
  416         if (!queue->tq_spin)
  417                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  418 
  419         TQ_LOCK(queue);
  420         gtaskqueue_drain_locked(queue, gtask);
  421         TQ_UNLOCK(queue);
  422 }
  423 
  424 void
  425 gtaskqueue_drain_all(struct gtaskqueue *queue)
  426 {
  427 
  428         if (!queue->tq_spin)
  429                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  430 
  431         TQ_LOCK(queue);
  432         gtaskqueue_drain_tq_queue(queue);
  433         gtaskqueue_drain_tq_active(queue);
  434         TQ_UNLOCK(queue);
  435 }
  436 
  437 static int
  438 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  439     cpuset_t *mask, const char *name, va_list ap)
  440 {
  441         char ktname[MAXCOMLEN + 1];
  442         struct thread *td;
  443         struct gtaskqueue *tq;
  444         int i, error;
  445 
  446         if (count <= 0)
  447                 return (EINVAL);
  448 
  449         vsnprintf(ktname, sizeof(ktname), name, ap);
  450         tq = *tqp;
  451 
  452         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
  453             M_NOWAIT | M_ZERO);
  454         if (tq->tq_threads == NULL) {
  455                 printf("%s: no memory for %s threads\n", __func__, ktname);
  456                 return (ENOMEM);
  457         }
  458 
  459         for (i = 0; i < count; i++) {
  460                 if (count == 1)
  461                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  462                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  463                 else
  464                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  465                             &tq->tq_threads[i], RFSTOPPED, 0,
  466                             "%s_%d", ktname, i);
  467                 if (error) {
  468                         /* should be ok to continue, taskqueue_free will dtrt */
  469                         printf("%s: kthread_add(%s): error %d", __func__,
  470                             ktname, error);
  471                         tq->tq_threads[i] = NULL;               /* paranoid */
  472                 } else
  473                         tq->tq_tcount++;
  474         }
  475         for (i = 0; i < count; i++) {
  476                 if (tq->tq_threads[i] == NULL)
  477                         continue;
  478                 td = tq->tq_threads[i];
  479                 if (mask) {
  480                         error = cpuset_setthread(td->td_tid, mask);
  481                         /*
  482                          * Failing to pin is rarely an actual fatal error;
  483                          * it'll just affect performance.
  484                          */
  485                         if (error)
  486                                 printf("%s: curthread=%llu: can't pin; "
  487                                     "error=%d\n",
  488                                     __func__,
  489                                     (unsigned long long) td->td_tid,
  490                                     error);
  491                 }
  492                 thread_lock(td);
  493                 sched_prio(td, pri);
  494                 sched_add(td, SRQ_BORING);
  495                 thread_unlock(td);
  496         }
  497 
  498         return (0);
  499 }
  500 
  501 static int
  502 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  503     const char *name, ...)
  504 {
  505         va_list ap;
  506         int error;
  507 
  508         va_start(ap, name);
  509         error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  510         va_end(ap);
  511         return (error);
  512 }
  513 
  514 static inline void
  515 gtaskqueue_run_callback(struct gtaskqueue *tq,
  516     enum taskqueue_callback_type cb_type)
  517 {
  518         taskqueue_callback_fn tq_callback;
  519 
  520         TQ_ASSERT_UNLOCKED(tq);
  521         tq_callback = tq->tq_callbacks[cb_type];
  522         if (tq_callback != NULL)
  523                 tq_callback(tq->tq_cb_contexts[cb_type]);
  524 }
  525 
  526 static void
  527 gtaskqueue_thread_loop(void *arg)
  528 {
  529         struct gtaskqueue **tqp, *tq;
  530 
  531         tqp = arg;
  532         tq = *tqp;
  533         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  534         TQ_LOCK(tq);
  535         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  536                 /* XXX ? */
  537                 gtaskqueue_run_locked(tq);
  538                 /*
  539                  * Because taskqueue_run() can drop tq_mutex, we need to
  540                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  541                  * meantime, which means we missed a wakeup.
  542                  */
  543                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  544                         break;
  545                 TQ_SLEEP(tq, tq, "-");
  546         }
  547         gtaskqueue_run_locked(tq);
  548         /*
  549          * This thread is on its way out, so just drop the lock temporarily
  550          * in order to call the shutdown callback.  This allows the callback
  551          * to look at the taskqueue, even just before it dies.
  552          */
  553         TQ_UNLOCK(tq);
  554         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  555         TQ_LOCK(tq);
  556 
  557         /* rendezvous with thread that asked us to terminate */
  558         tq->tq_tcount--;
  559         wakeup_one(tq->tq_threads);
  560         TQ_UNLOCK(tq);
  561         kthread_exit();
  562 }
  563 
  564 static void
  565 gtaskqueue_thread_enqueue(void *context)
  566 {
  567         struct gtaskqueue **tqp, *tq;
  568 
  569         tqp = context;
  570         tq = *tqp;
  571         wakeup_any(tq);
  572 }
  573 
  574 
  575 static struct gtaskqueue *
  576 gtaskqueue_create_fast(const char *name, int mflags,
  577                  taskqueue_enqueue_fn enqueue, void *context)
  578 {
  579         return _gtaskqueue_create(name, mflags, enqueue, context,
  580                         MTX_SPIN, "fast_taskqueue");
  581 }
  582 
  583 
  584 struct taskqgroup_cpu {
  585         LIST_HEAD(, grouptask)  tgc_tasks;
  586         struct gtaskqueue       *tgc_taskq;
  587         int     tgc_cnt;
  588         int     tgc_cpu;
  589 };
  590 
  591 struct taskqgroup {
  592         struct taskqgroup_cpu tqg_queue[MAXCPU];
  593         struct mtx      tqg_lock;
  594         const char *    tqg_name;
  595         int             tqg_adjusting;
  596         int             tqg_stride;
  597         int             tqg_cnt;
  598 };
  599 
  600 struct taskq_bind_task {
  601         struct gtask bt_task;
  602         int     bt_cpuid;
  603 };
  604 
  605 static void
  606 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
  607 {
  608         struct taskqgroup_cpu *qcpu;
  609 
  610         qcpu = &qgroup->tqg_queue[idx];
  611         LIST_INIT(&qcpu->tgc_tasks);
  612         qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
  613             taskqueue_thread_enqueue, &qcpu->tgc_taskq);
  614         gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
  615             "%s_%d", qgroup->tqg_name, idx);
  616         qcpu->tgc_cpu = cpu;
  617 }
  618 
  619 static void
  620 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
  621 {
  622 
  623         gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
  624 }
  625 
  626 /*
  627  * Find the taskq with least # of tasks that doesn't currently have any
  628  * other queues from the uniq identifier.
  629  */
  630 static int
  631 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
  632 {
  633         struct grouptask *n;
  634         int i, idx, mincnt;
  635         int strict;
  636 
  637         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  638         if (qgroup->tqg_cnt == 0)
  639                 return (0);
  640         idx = -1;
  641         mincnt = INT_MAX;
  642         /*
  643          * Two passes;  First scan for a queue with the least tasks that
  644          * does not already service this uniq id.  If that fails simply find
  645          * the queue with the least total tasks;
  646          */
  647         for (strict = 1; mincnt == INT_MAX; strict = 0) {
  648                 for (i = 0; i < qgroup->tqg_cnt; i++) {
  649                         if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
  650                                 continue;
  651                         if (strict) {
  652                                 LIST_FOREACH(n,
  653                                     &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  654                                         if (n->gt_uniq == uniq)
  655                                                 break;
  656                                 if (n != NULL)
  657                                         continue;
  658                         }
  659                         mincnt = qgroup->tqg_queue[i].tgc_cnt;
  660                         idx = i;
  661                 }
  662         }
  663         if (idx == -1)
  664                 panic("%s: failed to pick a qid.", __func__);
  665 
  666         return (idx);
  667 }
  668 
  669 /*
  670  * smp_started is unusable since it is not set for UP kernels or even for
  671  * SMP kernels when there is 1 CPU.  This is usually handled by adding a
  672  * (mp_ncpus == 1) test, but that would be broken here since we need to
  673  * to synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
  674  * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
  675  *
  676  * So maintain our own flag.  It must be set after all CPUs are started
  677  * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
  678  * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
  679  * SI_ORDER_ANY and unclearly after the CPUs are started.  It would be
  680  * simpler for adjustment to pass a flag indicating if it is delayed.
  681  */ 
  682 
  683 static int tqg_smp_started;
  684 
  685 static void
  686 tqg_record_smp_started(void *arg)
  687 {
  688         tqg_smp_started = 1;
  689 }
  690 
  691 SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
  692         tqg_record_smp_started, NULL);
  693 
  694 void
  695 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
  696     void *uniq, int irq, const char *name)
  697 {
  698         cpuset_t mask;
  699         int qid, error;
  700 
  701         gtask->gt_uniq = uniq;
  702         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  703         gtask->gt_irq = irq;
  704         gtask->gt_cpu = -1;
  705         mtx_lock(&qgroup->tqg_lock);
  706         qid = taskqgroup_find(qgroup, uniq);
  707         qgroup->tqg_queue[qid].tgc_cnt++;
  708         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  709         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  710         if (irq != -1 && tqg_smp_started) {
  711                 gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
  712                 CPU_ZERO(&mask);
  713                 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
  714                 mtx_unlock(&qgroup->tqg_lock);
  715                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  716                 if (error)
  717                         printf("%s: binding interrupt failed for %s: %d\n",
  718                             __func__, gtask->gt_name, error);
  719         } else
  720                 mtx_unlock(&qgroup->tqg_lock);
  721 }
  722 
  723 static void
  724 taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
  725 {
  726         cpuset_t mask;
  727         int qid, cpu, error;
  728 
  729         mtx_lock(&qgroup->tqg_lock);
  730         qid = taskqgroup_find(qgroup, gtask->gt_uniq);
  731         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  732         if (gtask->gt_irq != -1) {
  733                 mtx_unlock(&qgroup->tqg_lock);
  734 
  735                 CPU_ZERO(&mask);
  736                 CPU_SET(cpu, &mask);
  737                 error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask);
  738                 mtx_lock(&qgroup->tqg_lock);
  739                 if (error)
  740                         printf("%s: binding interrupt failed for %s: %d\n",
  741                             __func__, gtask->gt_name, error);
  742 
  743         }
  744         qgroup->tqg_queue[qid].tgc_cnt++;
  745         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  746         MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
  747         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  748         mtx_unlock(&qgroup->tqg_lock);
  749 }
  750 
  751 int
  752 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
  753     void *uniq, int cpu, int irq, const char *name)
  754 {
  755         cpuset_t mask;
  756         int i, qid, error;
  757 
  758         qid = -1;
  759         gtask->gt_uniq = uniq;
  760         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  761         gtask->gt_irq = irq;
  762         gtask->gt_cpu = cpu;
  763         mtx_lock(&qgroup->tqg_lock);
  764         if (tqg_smp_started) {
  765                 for (i = 0; i < qgroup->tqg_cnt; i++)
  766                         if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  767                                 qid = i;
  768                                 break;
  769                         }
  770                 if (qid == -1) {
  771                         mtx_unlock(&qgroup->tqg_lock);
  772                         printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  773                         return (EINVAL);
  774                 }
  775         } else
  776                 qid = 0;
  777         qgroup->tqg_queue[qid].tgc_cnt++;
  778         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  779         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  780         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  781         mtx_unlock(&qgroup->tqg_lock);
  782 
  783         CPU_ZERO(&mask);
  784         CPU_SET(cpu, &mask);
  785         if (irq != -1 && tqg_smp_started) {
  786                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  787                 if (error)
  788                         printf("%s: binding interrupt failed for %s: %d\n",
  789                             __func__, gtask->gt_name, error);
  790         }
  791         return (0);
  792 }
  793 
  794 static int
  795 taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
  796 {
  797         cpuset_t mask;
  798         int i, qid, irq, cpu, error;
  799 
  800         qid = -1;
  801         irq = gtask->gt_irq;
  802         cpu = gtask->gt_cpu;
  803         MPASS(tqg_smp_started);
  804         mtx_lock(&qgroup->tqg_lock);
  805         for (i = 0; i < qgroup->tqg_cnt; i++)
  806                 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  807                         qid = i;
  808                         break;
  809                 }
  810         if (qid == -1) {
  811                 mtx_unlock(&qgroup->tqg_lock);
  812                 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  813                 return (EINVAL);
  814         }
  815         qgroup->tqg_queue[qid].tgc_cnt++;
  816         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  817         MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
  818         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  819         mtx_unlock(&qgroup->tqg_lock);
  820 
  821         CPU_ZERO(&mask);
  822         CPU_SET(cpu, &mask);
  823 
  824         if (irq != -1) {
  825                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  826                 if (error)
  827                         printf("%s: binding interrupt failed for %s: %d\n",
  828                             __func__, gtask->gt_name, error);
  829         }
  830         return (0);
  831 }
  832 
  833 void
  834 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
  835 {
  836         int i;
  837 
  838         grouptask_block(gtask);
  839         mtx_lock(&qgroup->tqg_lock);
  840         for (i = 0; i < qgroup->tqg_cnt; i++)
  841                 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
  842                         break;
  843         if (i == qgroup->tqg_cnt)
  844                 panic("%s: task %s not in group", __func__, gtask->gt_name);
  845         qgroup->tqg_queue[i].tgc_cnt--;
  846         LIST_REMOVE(gtask, gt_list);
  847         mtx_unlock(&qgroup->tqg_lock);
  848         gtask->gt_taskqueue = NULL;
  849         gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
  850 }
  851 
  852 static void
  853 taskqgroup_binder(void *ctx)
  854 {
  855         struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
  856         cpuset_t mask;
  857         int error;
  858 
  859         CPU_ZERO(&mask);
  860         CPU_SET(gtask->bt_cpuid, &mask);
  861         error = cpuset_setthread(curthread->td_tid, &mask);
  862         thread_lock(curthread);
  863         sched_bind(curthread, gtask->bt_cpuid);
  864         thread_unlock(curthread);
  865 
  866         if (error)
  867                 printf("%s: binding curthread failed: %d\n", __func__, error);
  868         free(gtask, M_DEVBUF);
  869 }
  870 
  871 static void
  872 taskqgroup_bind(struct taskqgroup *qgroup)
  873 {
  874         struct taskq_bind_task *gtask;
  875         int i;
  876 
  877         /*
  878          * Bind taskqueue threads to specific CPUs, if they have been assigned
  879          * one.
  880          */
  881         if (qgroup->tqg_cnt == 1)
  882                 return;
  883 
  884         for (i = 0; i < qgroup->tqg_cnt; i++) {
  885                 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK);
  886                 GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
  887                 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
  888                 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
  889                     &gtask->bt_task);
  890         }
  891 }
  892 
  893 static int
  894 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  895 {
  896         LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
  897         struct grouptask *gtask;
  898         int i, k, old_cnt, old_cpu, cpu;
  899 
  900         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  901 
  902         if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
  903                 printf("%s: failed cnt: %d stride: %d "
  904                     "mp_ncpus: %d tqg_smp_started: %d\n",
  905                     __func__, cnt, stride, mp_ncpus, tqg_smp_started);
  906                 return (EINVAL);
  907         }
  908         if (qgroup->tqg_adjusting) {
  909                 printf("%s failed: adjusting\n", __func__);
  910                 return (EBUSY);
  911         }
  912         qgroup->tqg_adjusting = 1;
  913         old_cnt = qgroup->tqg_cnt;
  914         old_cpu = 0;
  915         if (old_cnt < cnt)
  916                 old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
  917         mtx_unlock(&qgroup->tqg_lock);
  918         /*
  919          * Set up queue for tasks added before boot.
  920          */
  921         if (old_cnt == 0) {
  922                 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
  923                     grouptask, gt_list);
  924                 qgroup->tqg_queue[0].tgc_cnt = 0;
  925         }
  926 
  927         /*
  928          * If new taskq threads have been added.
  929          */
  930         cpu = old_cpu;
  931         for (i = old_cnt; i < cnt; i++) {
  932                 taskqgroup_cpu_create(qgroup, i, cpu);
  933 
  934                 for (k = 0; k < stride; k++)
  935                         cpu = CPU_NEXT(cpu);
  936         }
  937         mtx_lock(&qgroup->tqg_lock);
  938         qgroup->tqg_cnt = cnt;
  939         qgroup->tqg_stride = stride;
  940 
  941         /*
  942          * Adjust drivers to use new taskqs.
  943          */
  944         for (i = 0; i < old_cnt; i++) {
  945                 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
  946                         LIST_REMOVE(gtask, gt_list);
  947                         qgroup->tqg_queue[i].tgc_cnt--;
  948                         LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
  949                 }
  950         }
  951         mtx_unlock(&qgroup->tqg_lock);
  952 
  953         while ((gtask = LIST_FIRST(&gtask_head))) {
  954                 LIST_REMOVE(gtask, gt_list);
  955                 if (gtask->gt_cpu == -1)
  956                         taskqgroup_attach_deferred(qgroup, gtask);
  957                 else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
  958                         taskqgroup_attach_deferred(qgroup, gtask);
  959         }
  960 
  961 #ifdef INVARIANTS
  962         mtx_lock(&qgroup->tqg_lock);
  963         for (i = 0; i < qgroup->tqg_cnt; i++) {
  964                 MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
  965                 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  966                         MPASS(gtask->gt_taskqueue != NULL);
  967         }
  968         mtx_unlock(&qgroup->tqg_lock);
  969 #endif
  970         /*
  971          * If taskq thread count has been reduced.
  972          */
  973         for (i = cnt; i < old_cnt; i++)
  974                 taskqgroup_cpu_remove(qgroup, i);
  975 
  976         taskqgroup_bind(qgroup);
  977 
  978         mtx_lock(&qgroup->tqg_lock);
  979         qgroup->tqg_adjusting = 0;
  980 
  981         return (0);
  982 }
  983 
  984 int
  985 taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  986 {
  987         int error;
  988 
  989         mtx_lock(&qgroup->tqg_lock);
  990         error = _taskqgroup_adjust(qgroup, cnt, stride);
  991         mtx_unlock(&qgroup->tqg_lock);
  992 
  993         return (error);
  994 }
  995 
  996 struct taskqgroup *
  997 taskqgroup_create(const char *name)
  998 {
  999         struct taskqgroup *qgroup;
 1000 
 1001         qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
 1002         mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
 1003         qgroup->tqg_name = name;
 1004         LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
 1005 
 1006         return (qgroup);
 1007 }
 1008 
 1009 void
 1010 taskqgroup_destroy(struct taskqgroup *qgroup)
 1011 {
 1012 }
 1013 

Cache object: f140613864400c6c19f1262ff9b2ed17


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.