The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_gtaskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * Copyright (c) 2014 Jeff Roberson
    4  * Copyright (c) 2016 Matthew Macy
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/gtaskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
/*
 * Malloc type used in this file for the queue structure, its name buffer
 * and its thread-pointer array.
 */
static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
/* Forward declarations: both are referenced before they are defined. */
static void     gtaskqueue_thread_enqueue(void *);
static void     gtaskqueue_thread_loop(void *arg);

/*
 * NOTE(review): TASKQGROUP_DEFINE is not defined in this file; presumably
 * this instantiates the "softirq" group with mp_ncpus queues and a stride
 * of 1 -- confirm against <sys/gtaskqueue.h>.
 */
TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
   56 
/*
 * Per-invocation record of what gtaskqueue_run_locked() is doing.  One
 * instance lives on the stack of each running gtaskqueue_run_locked() call
 * and is linked on the queue's tq_active list for the duration of the call.
 */
struct gtaskqueue_busy {
        /* Task most recently dequeued for execution (NULL before the first). */
        struct gtask            *tb_running;
        /* Value of the queue's tq_seq taken when tb_running was dequeued. */
        u_int                    tb_seq;
        /* Linkage on gtaskqueue.tq_active. */
        LIST_ENTRY(gtaskqueue_busy) tb_link;
};
   62 
/*
 * A single group task queue: a FIFO of pending gtasks plus the bookkeeping
 * needed to run, drain and tear it down.
 */
struct gtaskqueue {
        /* Pending tasks; appended at the tail, consumed from the head. */
        STAILQ_HEAD(, gtask)    tq_queue;
        /* Busy records of the gtaskqueue_run_locked() calls in progress. */
        LIST_HEAD(, gtaskqueue_busy) tq_active;
        /* Incremented each time a task is dequeued for execution. */
        u_int                   tq_seq;
        /*
         * gtaskqueue_terminate() waits for this to reach zero; in the
         * visible code it is raised only by gtaskqueue_drain_tq_active().
         */
        int                     tq_callouts;
        /* Protects the queue; spin or sleep mutex depending on tq_spin. */
        struct mtx_padalign     tq_mutex;
        /* Called with tq_context to kick the queue after a task is added. */
        gtaskqueue_enqueue_fn   tq_enqueue;
        void                    *tq_context;
        /* Heap-allocated name (TASKQUEUE_NAMELEN bytes); also the mutex name. */
        char                    *tq_name;
        /* Array of service threads allocated by _gtaskqueue_start_threads(). */
        struct thread           **tq_threads;
        /* Number of service threads created and not yet exited. */
        int                     tq_tcount;
        /* Non-zero when tq_mutex was created with MTX_SPIN. */
        int                     tq_spin;
        /* TQ_FLAGS_* bits. */
        int                     tq_flags;
        /* Optional callbacks and their arguments, indexed by callback type. */
        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};
   79 
/* tq_flags bits. */
/* Set at creation; cleared by gtaskqueue_free() to make the threads exit. */
#define TQ_FLAGS_ACTIVE         (1 << 0)
/* Set/cleared by gtaskqueue_block()/gtaskqueue_unblock(). */
#define TQ_FLAGS_BLOCKED        (1 << 1)
/* Set at creation when the enqueue hook is gtaskqueue_thread_enqueue(). */
#define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)

/* NOTE(review): not referenced in the visible part of this file. */
#define DT_CALLOUT_ARMED        (1 << 0)
   85 
/*
 * Acquire the queue mutex, using the spin or the sleep-mutex primitive
 * according to tq_spin.  Note that (tq) is evaluated more than once.
 */
#define TQ_LOCK(tq)                                                     \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_lock_spin(&(tq)->tq_mutex);                 \
                else                                                    \
                        mtx_lock(&(tq)->tq_mutex);                      \
        } while (0)
/* Assert that the current thread owns the queue mutex. */
#define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)

/*
 * Release the queue mutex; counterpart of TQ_LOCK().  (tq) is evaluated
 * more than once.
 */
#define TQ_UNLOCK(tq)                                                   \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_unlock_spin(&(tq)->tq_mutex);               \
                else                                                    \
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
/* Assert that the current thread does not own the queue mutex. */
#define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  103 
  104 #ifdef INVARIANTS
  105 static void
  106 gtask_dump(struct gtask *gtask)
  107 {
  108         printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
  109                gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
  110 }
  111 #endif
  112 
  113 static __inline int
  114 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
  115 {
  116         if (tq->tq_spin)
  117                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  118         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  119 }
  120 
  121 static struct gtaskqueue *
  122 _gtaskqueue_create(const char *name, int mflags,
  123                  taskqueue_enqueue_fn enqueue, void *context,
  124                  int mtxflags, const char *mtxname __unused)
  125 {
  126         struct gtaskqueue *queue;
  127         char *tq_name;
  128 
  129         tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
  130         if (!tq_name)
  131                 return (NULL);
  132 
  133         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  134 
  135         queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
  136         if (!queue) {
  137                 free(tq_name, M_GTASKQUEUE);
  138                 return (NULL);
  139         }
  140 
  141         STAILQ_INIT(&queue->tq_queue);
  142         LIST_INIT(&queue->tq_active);
  143         queue->tq_enqueue = enqueue;
  144         queue->tq_context = context;
  145         queue->tq_name = tq_name;
  146         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  147         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  148         if (enqueue == gtaskqueue_thread_enqueue)
  149                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  150         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  151 
  152         return (queue);
  153 }
  154 
  155 
  156 /*
  157  * Signal a taskqueue thread to terminate.
  158  */
  159 static void
  160 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
  161 {
  162 
  163         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  164                 wakeup(tq);
  165                 TQ_SLEEP(tq, pp, "gtq_destroy");
  166         }
  167 }
  168 
/*
 * Shut a task queue down and release all of its memory.
 *
 * Clears TQ_FLAGS_ACTIVE, waits for the service threads (and any
 * gtaskqueue_drain_tq_active() in progress) to finish, then destroys the
 * mutex and frees the thread array, the name and the queue itself.  The
 * queue must not be used after this returns.
 */
static void
gtaskqueue_free(struct gtaskqueue *queue)
{

        TQ_LOCK(queue);
        /* Makes gtaskqueue_thread_loop() leave its service loop. */
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        /* Returns once tq_tcount and tq_callouts are both zero. */
        gtaskqueue_terminate(queue->tq_threads, queue);
        KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
        KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
        /* The mutex is still held here; it is destroyed without unlocking. */
        mtx_destroy(&queue->tq_mutex);
        free(queue->tq_threads, M_GTASKQUEUE);
        free(queue->tq_name, M_GTASKQUEUE);
        free(queue, M_GTASKQUEUE);
}
  183 
  184 int
  185 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
  186 {
  187 #ifdef INVARIANTS
  188         if (queue == NULL) {
  189                 gtask_dump(gtask);
  190                 panic("queue == NULL");
  191         }
  192 #endif
  193         TQ_LOCK(queue);
  194         if (gtask->ta_flags & TASK_ENQUEUED) {
  195                 TQ_UNLOCK(queue);
  196                 return (0);
  197         }
  198         STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
  199         gtask->ta_flags |= TASK_ENQUEUED;
  200         TQ_UNLOCK(queue);
  201         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  202                 queue->tq_enqueue(queue->tq_context);
  203         return (0);
  204 }
  205 
  206 static void
  207 gtaskqueue_task_nop_fn(void *context)
  208 {
  209 }
  210 
/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 *
 * Called with the queue lock held (see gtaskqueue_drain_all()); the lock
 * is used as the sleep interlock.  Returns immediately if nothing is
 * pending.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
        /* Barrier task; lives on this stack frame until it has been run. */
        struct gtask t_barrier;

        if (STAILQ_EMPTY(&queue->tq_queue))
                return;

        /*
         * Append a no-op barrier task behind everything that is pending
         * right now.  It is linked in by hand, with the queue lock held,
         * rather than through grouptaskqueue_enqueue(), which takes the
         * lock itself.  The enqueue hook is not called here.
         */
        GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
        STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
        t_barrier.ta_flags |= TASK_ENQUEUED;

        /*
         * gtaskqueue_run_locked() clears TASK_ENQUEUED when it dequeues
         * the barrier and calls wakeup() on it after running it.  Once
         * that has happened, every task queued ahead of the barrier has
         * at least been dequeued for execution.
         */
        while (t_barrier.ta_flags & TASK_ENQUEUED)
                TQ_SLEEP(queue, &t_barrier, "gtq_qdrain");
}
  243 
/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 *
 * Called with the queue lock held (see gtaskqueue_drain_all()).
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy *tb;
        u_int seq;

        if (LIST_EMPTY(&queue->tq_active))
                return;

        /*
         * Hold off gtaskqueue_terminate(), which does not return while
         * tq_callouts is non-zero.
         */
        queue->tq_callouts++;

        /*
         * Wait for every active task whose sequence number is not newer
         * than the current one.  The difference is compared as a signed
         * value so that the test keeps working when tq_seq wraps.  The
         * sleep channel is the running task itself, which
         * gtaskqueue_run_locked() wakes when the task's handler returns.
         * The list may have changed while asleep, so rescan from the top.
         */
        seq = queue->tq_seq;
restart:
        LIST_FOREACH(tb, &queue->tq_active, tb_link) {
                if ((int)(tb->tb_seq - seq) <= 0) {
                        TQ_SLEEP(queue, tb->tb_running, "gtq_adrain");
                        goto restart;
                }
        }

        /*
         * Let gtaskqueue_terminate() proceed.  If the queue is being torn
         * down, wake the thread sleeping there (its channel is tq_threads).
         */
        queue->tq_callouts--;
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                wakeup_one(queue->tq_threads);
}
  276 
  277 void
  278 gtaskqueue_block(struct gtaskqueue *queue)
  279 {
  280 
  281         TQ_LOCK(queue);
  282         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  283         TQ_UNLOCK(queue);
  284 }
  285 
  286 void
  287 gtaskqueue_unblock(struct gtaskqueue *queue)
  288 {
  289 
  290         TQ_LOCK(queue);
  291         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  292         if (!STAILQ_EMPTY(&queue->tq_queue))
  293                 queue->tq_enqueue(queue->tq_context);
  294         TQ_UNLOCK(queue);
  295 }
  296 
/*
 * Run every pending task on the queue.
 *
 * Entered and left with the queue lock held; the lock is released around
 * each task handler call.  While running, a busy record for this call is
 * linked on tq_active so that other threads can see which task is being
 * executed.
 */
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy tb;
        struct gtask *gtask;

        KASSERT(queue != NULL, ("tq is NULL"));
        TQ_ASSERT_LOCKED(queue);
        /*
         * tb_seq is not set yet; it is assigned below before the lock is
         * first released.
         */
        tb.tb_running = NULL;
        LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);

        while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                /* From here on the task may be re-enqueued by others. */
                gtask->ta_flags &= ~TASK_ENQUEUED;
                tb.tb_running = gtask;
                tb.tb_seq = ++queue->tq_seq;
                TQ_UNLOCK(queue);

                KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
                gtask->ta_func(gtask->ta_context);

                TQ_LOCK(queue);
                /* Wake anyone sleeping on this task (the drain functions). */
                wakeup(gtask);
        }
        LIST_REMOVE(&tb, tb_link);
}
  323 
  324 static int
  325 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
  326 {
  327         struct gtaskqueue_busy *tb;
  328 
  329         TQ_ASSERT_LOCKED(queue);
  330         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  331                 if (tb->tb_running == gtask)
  332                         return (1);
  333         }
  334         return (0);
  335 }
  336 
  337 static int
  338 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
  339 {
  340 
  341         if (gtask->ta_flags & TASK_ENQUEUED)
  342                 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
  343         gtask->ta_flags &= ~TASK_ENQUEUED;
  344         return (task_is_running(queue, gtask) ? EBUSY : 0);
  345 }
  346 
  347 int
  348 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
  349 {
  350         int error;
  351 
  352         TQ_LOCK(queue);
  353         error = gtaskqueue_cancel_locked(queue, gtask);
  354         TQ_UNLOCK(queue);
  355 
  356         return (error);
  357 }
  358 
  359 void
  360 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
  361 {
  362 
  363         if (!queue->tq_spin)
  364                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  365 
  366         TQ_LOCK(queue);
  367         while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
  368                 TQ_SLEEP(queue, gtask, "gtq_drain");
  369         TQ_UNLOCK(queue);
  370 }
  371 
  372 void
  373 gtaskqueue_drain_all(struct gtaskqueue *queue)
  374 {
  375 
  376         if (!queue->tq_spin)
  377                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  378 
  379         TQ_LOCK(queue);
  380         gtaskqueue_drain_tq_queue(queue);
  381         gtaskqueue_drain_tq_active(queue);
  382         TQ_UNLOCK(queue);
  383 }
  384 
  385 static int
  386 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  387     cpuset_t *mask, const char *name, va_list ap)
  388 {
  389         char ktname[MAXCOMLEN + 1];
  390         struct thread *td;
  391         struct gtaskqueue *tq;
  392         int i, error;
  393 
  394         if (count <= 0)
  395                 return (EINVAL);
  396 
  397         vsnprintf(ktname, sizeof(ktname), name, ap);
  398         tq = *tqp;
  399 
  400         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
  401             M_NOWAIT | M_ZERO);
  402         if (tq->tq_threads == NULL) {
  403                 printf("%s: no memory for %s threads\n", __func__, ktname);
  404                 return (ENOMEM);
  405         }
  406 
  407         for (i = 0; i < count; i++) {
  408                 if (count == 1)
  409                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  410                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  411                 else
  412                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  413                             &tq->tq_threads[i], RFSTOPPED, 0,
  414                             "%s_%d", ktname, i);
  415                 if (error) {
  416                         /* should be ok to continue, taskqueue_free will dtrt */
  417                         printf("%s: kthread_add(%s): error %d", __func__,
  418                             ktname, error);
  419                         tq->tq_threads[i] = NULL;               /* paranoid */
  420                 } else
  421                         tq->tq_tcount++;
  422         }
  423         for (i = 0; i < count; i++) {
  424                 if (tq->tq_threads[i] == NULL)
  425                         continue;
  426                 td = tq->tq_threads[i];
  427                 if (mask) {
  428                         error = cpuset_setthread(td->td_tid, mask);
  429                         /*
  430                          * Failing to pin is rarely an actual fatal error;
  431                          * it'll just affect performance.
  432                          */
  433                         if (error)
  434                                 printf("%s: curthread=%llu: can't pin; "
  435                                     "error=%d\n",
  436                                     __func__,
  437                                     (unsigned long long) td->td_tid,
  438                                     error);
  439                 }
  440                 thread_lock(td);
  441                 sched_prio(td, pri);
  442                 sched_add(td, SRQ_BORING);
  443                 thread_unlock(td);
  444         }
  445 
  446         return (0);
  447 }
  448 
  449 static int
  450 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  451     const char *name, ...)
  452 {
  453         va_list ap;
  454         int error;
  455 
  456         va_start(ap, name);
  457         error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  458         va_end(ap);
  459         return (error);
  460 }
  461 
  462 static inline void
  463 gtaskqueue_run_callback(struct gtaskqueue *tq,
  464     enum taskqueue_callback_type cb_type)
  465 {
  466         taskqueue_callback_fn tq_callback;
  467 
  468         TQ_ASSERT_UNLOCKED(tq);
  469         tq_callback = tq->tq_callbacks[cb_type];
  470         if (tq_callback != NULL)
  471                 tq_callback(tq->tq_cb_contexts[cb_type]);
  472 }
  473 
/*
 * Main function of a task queue service thread.
 *
 * 'arg' is the address of the queue pointer.  The thread runs the INIT
 * callback, then alternates between running all pending tasks and sleeping
 * on the queue until TQ_FLAGS_ACTIVE is cleared.  On the way out it runs
 * any remaining tasks and the SHUTDOWN callback, decrements tq_tcount,
 * wakes the thread waiting in gtaskqueue_terminate() and exits.
 */
static void
gtaskqueue_thread_loop(void *arg)
{
        struct gtaskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                /* XXX ? */
                gtaskqueue_run_locked(tq);
                /*
                 * Because gtaskqueue_run_locked() can drop tq_mutex, we
                 * need to check whether the TQ_FLAGS_ACTIVE flag was
                 * removed in the meantime, which would mean we missed the
                 * wakeup from gtaskqueue_terminate().
                 */
                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                        break;
                /* Woken through the enqueue hook or gtaskqueue_terminate(). */
                TQ_SLEEP(tq, tq, "-");
        }
        /* Run whatever is still pending before shutting down. */
        gtaskqueue_run_locked(tq);
        /*
         * This thread is on its way out, so just drop the lock temporarily
         * in order to call the shutdown callback.  This allows the callback
         * to look at the taskqueue, even just before it dies.
         */
        TQ_UNLOCK(tq);
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
        TQ_LOCK(tq);

        /*
         * Rendezvous with the thread that asked us to terminate; it sleeps
         * on tq_threads in gtaskqueue_terminate() until tq_tcount is zero.
         */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        kthread_exit();
}
  511 
  512 static void
  513 gtaskqueue_thread_enqueue(void *context)
  514 {
  515         struct gtaskqueue **tqp, *tq;
  516 
  517         tqp = context;
  518         tq = *tqp;
  519         wakeup_any(tq);
  520 }
  521 
  522 
  523 static struct gtaskqueue *
  524 gtaskqueue_create_fast(const char *name, int mflags,
  525                  taskqueue_enqueue_fn enqueue, void *context)
  526 {
  527         return _gtaskqueue_create(name, mflags, enqueue, context,
  528                         MTX_SPIN, "fast_taskqueue");
  529 }
  530 
  531 
/* One slot of a task queue group: a queue and the CPU it is associated with. */
struct taskqgroup_cpu {
        /* Grouptasks currently attached to this slot. */
        LIST_HEAD(, grouptask)  tgc_tasks;
        /* The slot's queue, created by taskqgroup_cpu_create(). */
        struct gtaskqueue       *tgc_taskq;
        /* Number of attached grouptasks; used for load balancing. */
        int     tgc_cnt;
        /* CPU id recorded for this slot by taskqgroup_cpu_create(). */
        int     tgc_cpu;
};
  538 
/* A group of task queues across which grouptasks are distributed. */
struct taskqgroup {
        /* Slots; only the first tqg_cnt entries are in use. */
        struct taskqgroup_cpu tqg_queue[MAXCPU];
        /* Protects slot membership and the task counts. */
        struct mtx      tqg_lock;
        /* Group name; used as the prefix of the service thread names. */
        char *          tqg_name;
        /* Non-zero while _taskqgroup_adjust() is in progress. */
        int             tqg_adjusting;
        /* NOTE(review): presumably the CPU stride between slots -- confirm. */
        int             tqg_stride;
        /* Number of slots of tqg_queue in use. */
        int             tqg_cnt;
};
  547 
/*
 * One-shot task used by taskqgroup_bind() to make a service thread bind
 * itself to a CPU; allocated there and freed by taskqgroup_binder().
 */
struct taskq_bind_task {
        /* The task queued on the target slot's queue. */
        struct gtask bt_task;
        /* CPU the executing thread should bind itself to. */
        int     bt_cpuid;
};
  552 
  553 static void
  554 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
  555 {
  556         struct taskqgroup_cpu *qcpu;
  557 
  558         qcpu = &qgroup->tqg_queue[idx];
  559         LIST_INIT(&qcpu->tgc_tasks);
  560         qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
  561             taskqueue_thread_enqueue, &qcpu->tgc_taskq);
  562         gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
  563             "%s_%d", qgroup->tqg_name, idx);
  564         qcpu->tgc_cpu = cpu;
  565 }
  566 
  567 static void
  568 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
  569 {
  570 
  571         gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
  572 }
  573 
  574 /*
  575  * Find the taskq with least # of tasks that doesn't currently have any
  576  * other queues from the uniq identifier.
  577  */
  578 static int
  579 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
  580 {
  581         struct grouptask *n;
  582         int i, idx, mincnt;
  583         int strict;
  584 
  585         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  586         if (qgroup->tqg_cnt == 0)
  587                 return (0);
  588         idx = -1;
  589         mincnt = INT_MAX;
  590         /*
  591          * Two passes;  First scan for a queue with the least tasks that
  592          * does not already service this uniq id.  If that fails simply find
  593          * the queue with the least total tasks;
  594          */
  595         for (strict = 1; mincnt == INT_MAX; strict = 0) {
  596                 for (i = 0; i < qgroup->tqg_cnt; i++) {
  597                         if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
  598                                 continue;
  599                         if (strict) {
  600                                 LIST_FOREACH(n,
  601                                     &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  602                                         if (n->gt_uniq == uniq)
  603                                                 break;
  604                                 if (n != NULL)
  605                                         continue;
  606                         }
  607                         mincnt = qgroup->tqg_queue[i].tgc_cnt;
  608                         idx = i;
  609                 }
  610         }
  611         if (idx == -1)
  612                 panic("taskqgroup_find: Failed to pick a qid.");
  613 
  614         return (idx);
  615 }
  616 
/*
 * smp_started is unusable since it is not set for UP kernels or even for
 * SMP kernels when there is 1 CPU.  This is usually handled by adding a
 * (mp_ncpus == 1) test, but that would be broken here since we need to
 * synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
 * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
 *
 * So maintain our own flag.  It must be set after all CPUs are started
 * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
 * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
 * SI_ORDER_ANY, but it is not clear that it is after the CPUs are
 * started.  It would be simpler for adjustment to pass a flag indicating
 * whether it is delayed.
 */

/* Set to 1 by the SYSINIT below; 0 until then. */
static int tqg_smp_started;

/* SYSINIT hook: record that the SI_SUB_SMP stage has been reached. */
static void
tqg_record_smp_started(void *arg)
{
        tqg_smp_started = 1;
}

SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
        tqg_record_smp_started, NULL);
  641 
  642 void
  643 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
  644     void *uniq, int irq, char *name)
  645 {
  646         cpuset_t mask;
  647         int qid, error;
  648 
  649         gtask->gt_uniq = uniq;
  650         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  651         gtask->gt_irq = irq;
  652         gtask->gt_cpu = -1;
  653         mtx_lock(&qgroup->tqg_lock);
  654         qid = taskqgroup_find(qgroup, uniq);
  655         qgroup->tqg_queue[qid].tgc_cnt++;
  656         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  657         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  658         if (irq != -1 && tqg_smp_started) {
  659                 gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
  660                 CPU_ZERO(&mask);
  661                 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
  662                 mtx_unlock(&qgroup->tqg_lock);
  663                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  664                 if (error)
  665                         printf("%s: setaffinity failed for %s: %d\n", __func__, gtask->gt_name, error);
  666         } else
  667                 mtx_unlock(&qgroup->tqg_lock);
  668 }
  669 
  670 static void
  671 taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
  672 {
  673         cpuset_t mask;
  674         int qid, cpu, error;
  675 
  676         mtx_lock(&qgroup->tqg_lock);
  677         qid = taskqgroup_find(qgroup, gtask->gt_uniq);
  678         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  679         if (gtask->gt_irq != -1) {
  680                 mtx_unlock(&qgroup->tqg_lock);
  681 
  682                 CPU_ZERO(&mask);
  683                 CPU_SET(cpu, &mask);
  684                 error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask);
  685                 mtx_lock(&qgroup->tqg_lock);
  686                 if (error)
  687                         printf("%s: %s setaffinity failed: %d\n", __func__, gtask->gt_name, error);
  688 
  689         }
  690         qgroup->tqg_queue[qid].tgc_cnt++;
  691 
  692         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
  693                          gt_list);
  694         MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
  695         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  696         mtx_unlock(&qgroup->tqg_lock);
  697 }
  698 
  699 int
  700 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
  701         void *uniq, int cpu, int irq, char *name)
  702 {
  703         cpuset_t mask;
  704         int i, qid, error;
  705 
  706         qid = -1;
  707         gtask->gt_uniq = uniq;
  708         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  709         gtask->gt_irq = irq;
  710         gtask->gt_cpu = cpu;
  711         mtx_lock(&qgroup->tqg_lock);
  712         if (tqg_smp_started) {
  713                 for (i = 0; i < qgroup->tqg_cnt; i++)
  714                         if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  715                                 qid = i;
  716                                 break;
  717                         }
  718                 if (qid == -1) {
  719                         mtx_unlock(&qgroup->tqg_lock);
  720                         printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  721                         return (EINVAL);
  722                 }
  723         } else
  724                 qid = 0;
  725         qgroup->tqg_queue[qid].tgc_cnt++;
  726         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  727         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  728         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  729         mtx_unlock(&qgroup->tqg_lock);
  730 
  731         CPU_ZERO(&mask);
  732         CPU_SET(cpu, &mask);
  733         if (irq != -1 && tqg_smp_started) {
  734                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  735                 if (error)
  736                         printf("%s: setaffinity failed: %d\n", __func__, error);
  737         }
  738         return (0);
  739 }
  740 
  741 static int
  742 taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
  743 {
  744         cpuset_t mask;
  745         int i, qid, irq, cpu, error;
  746 
  747         qid = -1;
  748         irq = gtask->gt_irq;
  749         cpu = gtask->gt_cpu;
  750         MPASS(tqg_smp_started);
  751         mtx_lock(&qgroup->tqg_lock);
  752         for (i = 0; i < qgroup->tqg_cnt; i++)
  753                 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  754                         qid = i;
  755                         break;
  756                 }
  757         if (qid == -1) {
  758                 mtx_unlock(&qgroup->tqg_lock);
  759                 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  760                 return (EINVAL);
  761         }
  762         qgroup->tqg_queue[qid].tgc_cnt++;
  763         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  764         MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
  765         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  766         mtx_unlock(&qgroup->tqg_lock);
  767 
  768         CPU_ZERO(&mask);
  769         CPU_SET(cpu, &mask);
  770 
  771         if (irq != -1) {
  772                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  773                 if (error)
  774                         printf("%s: setaffinity failed: %d\n", __func__, error);
  775         }
  776         return (0);
  777 }
  778 
  779 void
  780 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
  781 {
  782         int i;
  783 
  784         mtx_lock(&qgroup->tqg_lock);
  785         for (i = 0; i < qgroup->tqg_cnt; i++)
  786                 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
  787                         break;
  788         if (i == qgroup->tqg_cnt)
  789                 panic("taskqgroup_detach: task %s not in group\n", gtask->gt_name);
  790         qgroup->tqg_queue[i].tgc_cnt--;
  791         LIST_REMOVE(gtask, gt_list);
  792         mtx_unlock(&qgroup->tqg_lock);
  793         gtask->gt_taskqueue = NULL;
  794 }
  795 
  796 static void
  797 taskqgroup_binder(void *ctx)
  798 {
  799         struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
  800         cpuset_t mask;
  801         int error;
  802 
  803         CPU_ZERO(&mask);
  804         CPU_SET(gtask->bt_cpuid, &mask);
  805         error = cpuset_setthread(curthread->td_tid, &mask);
  806         thread_lock(curthread);
  807         sched_bind(curthread, gtask->bt_cpuid);
  808         thread_unlock(curthread);
  809 
  810         if (error)
  811                 printf("%s: setaffinity failed: %d\n", __func__,
  812                     error);
  813         free(gtask, M_DEVBUF);
  814 }
  815 
  816 static void
  817 taskqgroup_bind(struct taskqgroup *qgroup)
  818 {
  819         struct taskq_bind_task *gtask;
  820         int i;
  821 
  822         /*
  823          * Bind taskqueue threads to specific CPUs, if they have been assigned
  824          * one.
  825          */
  826         if (qgroup->tqg_cnt == 1)
  827                 return;
  828 
  829         for (i = 0; i < qgroup->tqg_cnt; i++) {
  830                 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK);
  831                 GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
  832                 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
  833                 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
  834                     &gtask->bt_task);
  835         }
  836 }
  837 
  838 static int
  839 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  840 {
  841         LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
  842         struct grouptask *gtask;
  843         int i, k, old_cnt, old_cpu, cpu;
  844 
  845         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  846 
  847         if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
  848                 printf("%s: failed cnt: %d stride: %d "
  849                     "mp_ncpus: %d tqg_smp_started: %d\n",
  850                     __func__, cnt, stride, mp_ncpus, tqg_smp_started);
  851                 return (EINVAL);
  852         }
  853         if (qgroup->tqg_adjusting) {
  854                 printf("%s failed: adjusting\n", __func__);
  855                 return (EBUSY);
  856         }
  857         qgroup->tqg_adjusting = 1;
  858         old_cnt = qgroup->tqg_cnt;
  859         old_cpu = 0;
  860         if (old_cnt < cnt)
  861                 old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
  862         mtx_unlock(&qgroup->tqg_lock);
  863         /*
  864          * Set up queue for tasks added before boot.
  865          */
  866         if (old_cnt == 0) {
  867                 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
  868                     grouptask, gt_list);
  869                 qgroup->tqg_queue[0].tgc_cnt = 0;
  870         }
  871 
  872         /*
  873          * If new taskq threads have been added.
  874          */
  875         cpu = old_cpu;
  876         for (i = old_cnt; i < cnt; i++) {
  877                 taskqgroup_cpu_create(qgroup, i, cpu);
  878 
  879                 for (k = 0; k < stride; k++)
  880                         cpu = CPU_NEXT(cpu);
  881         }
  882         mtx_lock(&qgroup->tqg_lock);
  883         qgroup->tqg_cnt = cnt;
  884         qgroup->tqg_stride = stride;
  885 
  886         /*
  887          * Adjust drivers to use new taskqs.
  888          */
  889         for (i = 0; i < old_cnt; i++) {
  890                 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
  891                         LIST_REMOVE(gtask, gt_list);
  892                         qgroup->tqg_queue[i].tgc_cnt--;
  893                         LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
  894                 }
  895         }
  896         mtx_unlock(&qgroup->tqg_lock);
  897 
  898         while ((gtask = LIST_FIRST(&gtask_head))) {
  899                 LIST_REMOVE(gtask, gt_list);
  900                 if (gtask->gt_cpu == -1)
  901                         taskqgroup_attach_deferred(qgroup, gtask);
  902                 else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
  903                         taskqgroup_attach_deferred(qgroup, gtask);
  904         }
  905 
  906 #ifdef INVARIANTS
  907         mtx_lock(&qgroup->tqg_lock);
  908         for (i = 0; i < qgroup->tqg_cnt; i++) {
  909                 MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
  910                 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  911                         MPASS(gtask->gt_taskqueue != NULL);
  912         }
  913         mtx_unlock(&qgroup->tqg_lock);
  914 #endif
  915         /*
  916          * If taskq thread count has been reduced.
  917          */
  918         for (i = cnt; i < old_cnt; i++)
  919                 taskqgroup_cpu_remove(qgroup, i);
  920 
  921         taskqgroup_bind(qgroup);
  922 
  923         mtx_lock(&qgroup->tqg_lock);
  924         qgroup->tqg_adjusting = 0;
  925 
  926         return (0);
  927 }
  928 
  929 int
  930 taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  931 {
  932         int error;
  933 
  934         mtx_lock(&qgroup->tqg_lock);
  935         error = _taskqgroup_adjust(qgroup, cnt, stride);
  936         mtx_unlock(&qgroup->tqg_lock);
  937 
  938         return (error);
  939 }
  940 
  941 struct taskqgroup *
  942 taskqgroup_create(char *name)
  943 {
  944         struct taskqgroup *qgroup;
  945 
  946         qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
  947         mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
  948         qgroup->tqg_name = name;
  949         LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
  950 
  951         return (qgroup);
  952 }
  953 
  954 void
  955 taskqgroup_destroy(struct taskqgroup *qgroup)
  956 {
  957 
  958 }

Cache object: 6005332184ec73c22635817fee62b222


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.