The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_gtaskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * Copyright (c) 2014 Jeff Roberson
    4  * Copyright (c) 2016 Matthew Macy
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD: releng/11.1/sys/kern/subr_gtaskqueue.c 305250 2016-09-01 22:27:47Z nwhitehorn $");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/gtaskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
   51 static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues");
   52 static void     gtaskqueue_thread_enqueue(void *);
   53 static void     gtaskqueue_thread_loop(void *arg);
   54 
   55 
   56 struct gtaskqueue_busy {
   57         struct gtask    *tb_running;
   58         TAILQ_ENTRY(gtaskqueue_busy) tb_link;
   59 };
   60 
   61 static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;
   62 
   63 struct gtaskqueue {
   64         STAILQ_HEAD(, gtask)    tq_queue;
   65         gtaskqueue_enqueue_fn   tq_enqueue;
   66         void                    *tq_context;
   67         char                    *tq_name;
   68         TAILQ_HEAD(, gtaskqueue_busy) tq_active;
   69         struct mtx              tq_mutex;
   70         struct thread           **tq_threads;
   71         int                     tq_tcount;
   72         int                     tq_spin;
   73         int                     tq_flags;
   74         int                     tq_callouts;
   75         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   76         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   77 };
   78 
   79 #define TQ_FLAGS_ACTIVE         (1 << 0)
   80 #define TQ_FLAGS_BLOCKED        (1 << 1)
   81 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   82 
   83 #define DT_CALLOUT_ARMED        (1 << 0)
   84 
   85 #define TQ_LOCK(tq)                                                     \
   86         do {                                                            \
   87                 if ((tq)->tq_spin)                                      \
   88                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   89                 else                                                    \
   90                         mtx_lock(&(tq)->tq_mutex);                      \
   91         } while (0)
   92 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   93 
   94 #define TQ_UNLOCK(tq)                                                   \
   95         do {                                                            \
   96                 if ((tq)->tq_spin)                                      \
   97                         mtx_unlock_spin(&(tq)->tq_mutex);               \
   98                 else                                                    \
   99                         mtx_unlock(&(tq)->tq_mutex);                    \
  100         } while (0)
  101 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  102 
  103 static __inline int
  104 TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  105     int t)
  106 {
  107         if (tq->tq_spin)
  108                 return (msleep_spin(p, m, wm, t));
  109         return (msleep(p, m, pri, wm, t));
  110 }
  111 
  112 static struct gtaskqueue *
  113 _gtaskqueue_create(const char *name, int mflags,
  114                  taskqueue_enqueue_fn enqueue, void *context,
  115                  int mtxflags, const char *mtxname __unused)
  116 {
  117         struct gtaskqueue *queue;
  118         char *tq_name;
  119 
  120         tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
  121         if (!tq_name)
  122                 return (NULL);
  123 
  124         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  125 
  126         queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
  127         if (!queue)
  128                 return (NULL);
  129 
  130         STAILQ_INIT(&queue->tq_queue);
  131         TAILQ_INIT(&queue->tq_active);
  132         queue->tq_enqueue = enqueue;
  133         queue->tq_context = context;
  134         queue->tq_name = tq_name;
  135         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  136         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  137         if (enqueue == gtaskqueue_thread_enqueue)
  138                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  139         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  140 
  141         return (queue);
  142 }
  143 
  144 
  145 /*
  146  * Signal a taskqueue thread to terminate.
  147  */
  148 static void
  149 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
  150 {
  151 
  152         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  153                 wakeup(tq);
  154                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  155         }
  156 }
  157 
  158 static void
  159 gtaskqueue_free(struct gtaskqueue *queue)
  160 {
  161 
  162         TQ_LOCK(queue);
  163         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  164         gtaskqueue_terminate(queue->tq_threads, queue);
  165         KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
  166         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  167         mtx_destroy(&queue->tq_mutex);
  168         free(queue->tq_threads, M_GTASKQUEUE);
  169         free(queue->tq_name, M_GTASKQUEUE);
  170         free(queue, M_GTASKQUEUE);
  171 }
  172 
  173 int
  174 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
  175 {
  176         TQ_LOCK(queue);
  177         if (gtask->ta_flags & TASK_ENQUEUED) {
  178                 TQ_UNLOCK(queue);
  179                 return (0);
  180         }
  181         STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
  182         gtask->ta_flags |= TASK_ENQUEUED;
  183         TQ_UNLOCK(queue);
  184         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  185                 queue->tq_enqueue(queue->tq_context);
  186         return (0);
  187 }
  188 
  189 static void
  190 gtaskqueue_task_nop_fn(void *context)
  191 {
  192 }
  193 
  194 /*
  195  * Block until all currently queued tasks in this taskqueue
  196  * have begun execution.  Tasks queued during execution of
  197  * this function are ignored.
  198  */
  199 static void
  200 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
  201 {
  202         struct gtask t_barrier;
  203 
  204         if (STAILQ_EMPTY(&queue->tq_queue))
  205                 return;
  206 
  207         /*
  208          * Enqueue our barrier after all current tasks, but with
  209          * the highest priority so that newly queued tasks cannot
  210          * pass it.  Because of the high priority, we can not use
  211          * taskqueue_enqueue_locked directly (which drops the lock
  212          * anyway) so just insert it at tail while we have the
  213          * queue lock.
  214          */
  215         GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
  216         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  217         t_barrier.ta_flags |= TASK_ENQUEUED;
  218 
  219         /*
  220          * Once the barrier has executed, all previously queued tasks
  221          * have completed or are currently executing.
  222          */
  223         while (t_barrier.ta_flags & TASK_ENQUEUED)
  224                 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
  225 }
  226 
  227 /*
  228  * Block until all currently executing tasks for this taskqueue
  229  * complete.  Tasks that begin execution during the execution
  230  * of this function are ignored.
  231  */
  232 static void
  233 gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
  234 {
  235         struct gtaskqueue_busy tb_marker, *tb_first;
  236 
  237         if (TAILQ_EMPTY(&queue->tq_active))
  238                 return;
  239 
  240         /* Block taskq_terminate().*/
  241         queue->tq_callouts++;
  242 
  243         /*
  244          * Wait for all currently executing taskqueue threads
  245          * to go idle.
  246          */
  247         tb_marker.tb_running = TB_DRAIN_WAITER;
  248         TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
  249         while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
  250                 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
  251         TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
  252 
  253         /*
  254          * Wakeup any other drain waiter that happened to queue up
  255          * without any intervening active thread.
  256          */
  257         tb_first = TAILQ_FIRST(&queue->tq_active);
  258         if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
  259                 wakeup(tb_first);
  260 
  261         /* Release taskqueue_terminate(). */
  262         queue->tq_callouts--;
  263         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  264                 wakeup_one(queue->tq_threads);
  265 }
  266 
  267 void
  268 gtaskqueue_block(struct gtaskqueue *queue)
  269 {
  270 
  271         TQ_LOCK(queue);
  272         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  273         TQ_UNLOCK(queue);
  274 }
  275 
  276 void
  277 gtaskqueue_unblock(struct gtaskqueue *queue)
  278 {
  279 
  280         TQ_LOCK(queue);
  281         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  282         if (!STAILQ_EMPTY(&queue->tq_queue))
  283                 queue->tq_enqueue(queue->tq_context);
  284         TQ_UNLOCK(queue);
  285 }
  286 
  287 static void
  288 gtaskqueue_run_locked(struct gtaskqueue *queue)
  289 {
  290         struct gtaskqueue_busy tb;
  291         struct gtaskqueue_busy *tb_first;
  292         struct gtask *gtask;
  293 
  294         KASSERT(queue != NULL, ("tq is NULL"));
  295         TQ_ASSERT_LOCKED(queue);
  296         tb.tb_running = NULL;
  297 
  298         while (STAILQ_FIRST(&queue->tq_queue)) {
  299                 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
  300 
  301                 /*
  302                  * Carefully remove the first task from the queue and
  303                  * clear its TASK_ENQUEUED flag
  304                  */
  305                 gtask = STAILQ_FIRST(&queue->tq_queue);
  306                 KASSERT(gtask != NULL, ("task is NULL"));
  307                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  308                 gtask->ta_flags &= ~TASK_ENQUEUED;
  309                 tb.tb_running = gtask;
  310                 TQ_UNLOCK(queue);
  311 
  312                 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
  313                 gtask->ta_func(gtask->ta_context);
  314 
  315                 TQ_LOCK(queue);
  316                 tb.tb_running = NULL;
  317                 wakeup(gtask);
  318 
  319                 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
  320                 tb_first = TAILQ_FIRST(&queue->tq_active);
  321                 if (tb_first != NULL &&
  322                     tb_first->tb_running == TB_DRAIN_WAITER)
  323                         wakeup(tb_first);
  324         }
  325 }
  326 
  327 static int
  328 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
  329 {
  330         struct gtaskqueue_busy *tb;
  331 
  332         TQ_ASSERT_LOCKED(queue);
  333         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  334                 if (tb->tb_running == gtask)
  335                         return (1);
  336         }
  337         return (0);
  338 }
  339 
  340 static int
  341 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
  342 {
  343 
  344         if (gtask->ta_flags & TASK_ENQUEUED)
  345                 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
  346         gtask->ta_flags &= ~TASK_ENQUEUED;
  347         return (task_is_running(queue, gtask) ? EBUSY : 0);
  348 }
  349 
  350 int
  351 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
  352 {
  353         int error;
  354 
  355         TQ_LOCK(queue);
  356         error = gtaskqueue_cancel_locked(queue, gtask);
  357         TQ_UNLOCK(queue);
  358 
  359         return (error);
  360 }
  361 
  362 void
  363 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
  364 {
  365 
  366         if (!queue->tq_spin)
  367                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  368 
  369         TQ_LOCK(queue);
  370         while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
  371                 TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
  372         TQ_UNLOCK(queue);
  373 }
  374 
  375 void
  376 gtaskqueue_drain_all(struct gtaskqueue *queue)
  377 {
  378 
  379         if (!queue->tq_spin)
  380                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  381 
  382         TQ_LOCK(queue);
  383         gtaskqueue_drain_tq_queue(queue);
  384         gtaskqueue_drain_tq_active(queue);
  385         TQ_UNLOCK(queue);
  386 }
  387 
  388 static int
  389 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  390     cpuset_t *mask, const char *name, va_list ap)
  391 {
  392         char ktname[MAXCOMLEN + 1];
  393         struct thread *td;
  394         struct gtaskqueue *tq;
  395         int i, error;
  396 
  397         if (count <= 0)
  398                 return (EINVAL);
  399 
  400         vsnprintf(ktname, sizeof(ktname), name, ap);
  401         tq = *tqp;
  402 
  403         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
  404             M_NOWAIT | M_ZERO);
  405         if (tq->tq_threads == NULL) {
  406                 printf("%s: no memory for %s threads\n", __func__, ktname);
  407                 return (ENOMEM);
  408         }
  409 
  410         for (i = 0; i < count; i++) {
  411                 if (count == 1)
  412                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  413                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  414                 else
  415                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  416                             &tq->tq_threads[i], RFSTOPPED, 0,
  417                             "%s_%d", ktname, i);
  418                 if (error) {
  419                         /* should be ok to continue, taskqueue_free will dtrt */
  420                         printf("%s: kthread_add(%s): error %d", __func__,
  421                             ktname, error);
  422                         tq->tq_threads[i] = NULL;               /* paranoid */
  423                 } else
  424                         tq->tq_tcount++;
  425         }
  426         for (i = 0; i < count; i++) {
  427                 if (tq->tq_threads[i] == NULL)
  428                         continue;
  429                 td = tq->tq_threads[i];
  430                 if (mask) {
  431                         error = cpuset_setthread(td->td_tid, mask);
  432                         /*
  433                          * Failing to pin is rarely an actual fatal error;
  434                          * it'll just affect performance.
  435                          */
  436                         if (error)
  437                                 printf("%s: curthread=%llu: can't pin; "
  438                                     "error=%d\n",
  439                                     __func__,
  440                                     (unsigned long long) td->td_tid,
  441                                     error);
  442                 }
  443                 thread_lock(td);
  444                 sched_prio(td, pri);
  445                 sched_add(td, SRQ_BORING);
  446                 thread_unlock(td);
  447         }
  448 
  449         return (0);
  450 }
  451 
  452 static int
  453 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  454     const char *name, ...)
  455 {
  456         va_list ap;
  457         int error;
  458 
  459         va_start(ap, name);
  460         error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  461         va_end(ap);
  462         return (error);
  463 }
  464 
  465 static inline void
  466 gtaskqueue_run_callback(struct gtaskqueue *tq,
  467     enum taskqueue_callback_type cb_type)
  468 {
  469         taskqueue_callback_fn tq_callback;
  470 
  471         TQ_ASSERT_UNLOCKED(tq);
  472         tq_callback = tq->tq_callbacks[cb_type];
  473         if (tq_callback != NULL)
  474                 tq_callback(tq->tq_cb_contexts[cb_type]);
  475 }
  476 
  477 static void
  478 gtaskqueue_thread_loop(void *arg)
  479 {
  480         struct gtaskqueue **tqp, *tq;
  481 
  482         tqp = arg;
  483         tq = *tqp;
  484         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  485         TQ_LOCK(tq);
  486         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  487                 /* XXX ? */
  488                 gtaskqueue_run_locked(tq);
  489                 /*
  490                  * Because taskqueue_run() can drop tq_mutex, we need to
  491                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  492                  * meantime, which means we missed a wakeup.
  493                  */
  494                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  495                         break;
  496                 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
  497         }
  498         gtaskqueue_run_locked(tq);
  499         /*
  500          * This thread is on its way out, so just drop the lock temporarily
  501          * in order to call the shutdown callback.  This allows the callback
  502          * to look at the taskqueue, even just before it dies.
  503          */
  504         TQ_UNLOCK(tq);
  505         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  506         TQ_LOCK(tq);
  507 
  508         /* rendezvous with thread that asked us to terminate */
  509         tq->tq_tcount--;
  510         wakeup_one(tq->tq_threads);
  511         TQ_UNLOCK(tq);
  512         kthread_exit();
  513 }
  514 
  515 static void
  516 gtaskqueue_thread_enqueue(void *context)
  517 {
  518         struct gtaskqueue **tqp, *tq;
  519 
  520         tqp = context;
  521         tq = *tqp;
  522         wakeup_one(tq);
  523 }
  524 
  525 
  526 static struct gtaskqueue *
  527 gtaskqueue_create_fast(const char *name, int mflags,
  528                  taskqueue_enqueue_fn enqueue, void *context)
  529 {
  530         return _gtaskqueue_create(name, mflags, enqueue, context,
  531                         MTX_SPIN, "fast_taskqueue");
  532 }
  533 
  534 
  535 struct taskqgroup_cpu {
  536         LIST_HEAD(, grouptask)  tgc_tasks;
  537         struct gtaskqueue       *tgc_taskq;
  538         int     tgc_cnt;
  539         int     tgc_cpu;
  540 };
  541 
  542 struct taskqgroup {
  543         struct taskqgroup_cpu tqg_queue[MAXCPU];
  544         struct mtx      tqg_lock;
  545         char *          tqg_name;
  546         int             tqg_adjusting;
  547         int             tqg_stride;
  548         int             tqg_cnt;
  549 };
  550 
  551 struct taskq_bind_task {
  552         struct gtask bt_task;
  553         int     bt_cpuid;
  554 };
  555 
  556 static void
  557 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
  558 {
  559         struct taskqgroup_cpu *qcpu;
  560 
  561         qcpu = &qgroup->tqg_queue[idx];
  562         LIST_INIT(&qcpu->tgc_tasks);
  563         qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
  564             taskqueue_thread_enqueue, &qcpu->tgc_taskq);
  565         gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
  566             "%s_%d", qgroup->tqg_name, idx);
  567         qcpu->tgc_cpu = idx * qgroup->tqg_stride;
  568 }
  569 
  570 static void
  571 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
  572 {
  573 
  574         gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
  575 }
  576 
  577 /*
  578  * Find the taskq with least # of tasks that doesn't currently have any
  579  * other queues from the uniq identifier.
  580  */
  581 static int
  582 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
  583 {
  584         struct grouptask *n;
  585         int i, idx, mincnt;
  586         int strict;
  587 
  588         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  589         if (qgroup->tqg_cnt == 0)
  590                 return (0);
  591         idx = -1;
  592         mincnt = INT_MAX;
  593         /*
  594          * Two passes;  First scan for a queue with the least tasks that
  595          * does not already service this uniq id.  If that fails simply find
  596          * the queue with the least total tasks;
  597          */
  598         for (strict = 1; mincnt == INT_MAX; strict = 0) {
  599                 for (i = 0; i < qgroup->tqg_cnt; i++) {
  600                         if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
  601                                 continue;
  602                         if (strict) {
  603                                 LIST_FOREACH(n,
  604                                     &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  605                                         if (n->gt_uniq == uniq)
  606                                                 break;
  607                                 if (n != NULL)
  608                                         continue;
  609                         }
  610                         mincnt = qgroup->tqg_queue[i].tgc_cnt;
  611                         idx = i;
  612                 }
  613         }
  614         if (idx == -1)
  615                 panic("taskqgroup_find: Failed to pick a qid.");
  616 
  617         return (idx);
  618 }
  619 
  620 void
  621 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
  622     void *uniq, int irq, char *name)
  623 {
  624         cpuset_t mask;
  625         int qid;
  626 
  627         gtask->gt_uniq = uniq;
  628         gtask->gt_name = name;
  629         gtask->gt_irq = irq;
  630         gtask->gt_cpu = -1;
  631         mtx_lock(&qgroup->tqg_lock);
  632         qid = taskqgroup_find(qgroup, uniq);
  633         qgroup->tqg_queue[qid].tgc_cnt++;
  634         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  635         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  636         if (irq != -1 && smp_started) {
  637                 CPU_ZERO(&mask);
  638                 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
  639                 mtx_unlock(&qgroup->tqg_lock);
  640                 intr_setaffinity(irq, &mask);
  641         } else
  642                 mtx_unlock(&qgroup->tqg_lock);
  643 }
  644 
  645 int
  646 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
  647         void *uniq, int cpu, int irq, char *name)
  648 {
  649         cpuset_t mask;
  650         int i, qid;
  651 
  652         qid = -1;
  653         gtask->gt_uniq = uniq;
  654         gtask->gt_name = name;
  655         gtask->gt_irq = irq;
  656         gtask->gt_cpu = cpu;
  657         mtx_lock(&qgroup->tqg_lock);
  658         if (smp_started) {
  659                 for (i = 0; i < qgroup->tqg_cnt; i++)
  660                         if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  661                                 qid = i;
  662                                 break;
  663                         }
  664                 if (qid == -1) {
  665                         mtx_unlock(&qgroup->tqg_lock);
  666                         return (EINVAL);
  667                 }
  668         } else
  669                 qid = 0;
  670         qgroup->tqg_queue[qid].tgc_cnt++;
  671         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  672         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  673         if (irq != -1 && smp_started) {
  674                 CPU_ZERO(&mask);
  675                 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
  676                 mtx_unlock(&qgroup->tqg_lock);
  677                 intr_setaffinity(irq, &mask);
  678         } else
  679                 mtx_unlock(&qgroup->tqg_lock);
  680         return (0);
  681 }
  682 
  683 void
  684 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
  685 {
  686         int i;
  687 
  688         mtx_lock(&qgroup->tqg_lock);
  689         for (i = 0; i < qgroup->tqg_cnt; i++)
  690                 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
  691                         break;
  692         if (i == qgroup->tqg_cnt)
  693                 panic("taskqgroup_detach: task not in group\n");
  694         qgroup->tqg_queue[i].tgc_cnt--;
  695         LIST_REMOVE(gtask, gt_list);
  696         mtx_unlock(&qgroup->tqg_lock);
  697         gtask->gt_taskqueue = NULL;
  698 }
  699 
  700 static void
  701 taskqgroup_binder(void *ctx)
  702 {
  703         struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
  704         cpuset_t mask;
  705         int error;
  706 
  707         CPU_ZERO(&mask);
  708         CPU_SET(gtask->bt_cpuid, &mask);
  709         error = cpuset_setthread(curthread->td_tid, &mask);
  710         thread_lock(curthread);
  711         sched_bind(curthread, gtask->bt_cpuid);
  712         thread_unlock(curthread);
  713 
  714         if (error)
  715                 printf("taskqgroup_binder: setaffinity failed: %d\n",
  716                     error);
  717         free(gtask, M_DEVBUF);
  718 }
  719 
  720 static void
  721 taskqgroup_bind(struct taskqgroup *qgroup)
  722 {
  723         struct taskq_bind_task *gtask;
  724         int i;
  725 
  726         /*
  727          * Bind taskqueue threads to specific CPUs, if they have been assigned
  728          * one.
  729          */
  730         for (i = 0; i < qgroup->tqg_cnt; i++) {
  731                 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT);
  732                 GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
  733                 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
  734                 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
  735                     &gtask->bt_task);
  736         }
  737 }
  738 
  739 static int
  740 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  741 {
  742         LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
  743         cpuset_t mask;
  744         struct grouptask *gtask;
  745         int i, k, old_cnt, qid, cpu;
  746 
  747         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  748 
  749         if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
  750                 printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
  751                            cnt, stride, mp_ncpus, smp_started);
  752                 return (EINVAL);
  753         }
  754         if (qgroup->tqg_adjusting) {
  755                 printf("taskqgroup_adjust failed: adjusting\n");
  756                 return (EBUSY);
  757         }
  758         qgroup->tqg_adjusting = 1;
  759         old_cnt = qgroup->tqg_cnt;
  760         mtx_unlock(&qgroup->tqg_lock);
  761         /*
  762          * Set up queue for tasks added before boot.
  763          */
  764         if (old_cnt == 0) {
  765                 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
  766                     grouptask, gt_list);
  767                 qgroup->tqg_queue[0].tgc_cnt = 0;
  768         }
  769 
  770         /*
  771          * If new taskq threads have been added.
  772          */
  773         for (i = old_cnt; i < cnt; i++)
  774                 taskqgroup_cpu_create(qgroup, i);
  775         mtx_lock(&qgroup->tqg_lock);
  776         qgroup->tqg_cnt = cnt;
  777         qgroup->tqg_stride = stride;
  778 
  779         /*
  780          * Adjust drivers to use new taskqs.
  781          */
  782         for (i = 0; i < old_cnt; i++) {
  783                 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
  784                         LIST_REMOVE(gtask, gt_list);
  785                         qgroup->tqg_queue[i].tgc_cnt--;
  786                         LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
  787                 }
  788         }
  789 
  790         while ((gtask = LIST_FIRST(&gtask_head))) {
  791                 LIST_REMOVE(gtask, gt_list);
  792                 if (gtask->gt_cpu == -1)
  793                         qid = taskqgroup_find(qgroup, gtask->gt_uniq);
  794                 else {
  795                         for (i = 0; i < qgroup->tqg_cnt; i++)
  796                                 if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
  797                                         qid = i;
  798                                         break;
  799                                 }
  800                 }
  801                 qgroup->tqg_queue[qid].tgc_cnt++;
  802                 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
  803                     gt_list);
  804                 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  805         }
  806         /*
  807          * Set new CPU and IRQ affinity
  808          */
  809         cpu = CPU_FIRST();
  810         for (i = 0; i < cnt; i++) {
  811                 qgroup->tqg_queue[i].tgc_cpu = cpu;
  812                 for (k = 0; k < qgroup->tqg_stride; k++)
  813                         cpu = CPU_NEXT(cpu);
  814                 CPU_ZERO(&mask);
  815                 CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
  816                 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
  817                         if (gtask->gt_irq == -1)
  818                                 continue;
  819                         intr_setaffinity(gtask->gt_irq, &mask);
  820                 }
  821         }
  822         mtx_unlock(&qgroup->tqg_lock);
  823 
  824         /*
  825          * If taskq thread count has been reduced.
  826          */
  827         for (i = cnt; i < old_cnt; i++)
  828                 taskqgroup_cpu_remove(qgroup, i);
  829 
  830         mtx_lock(&qgroup->tqg_lock);
  831         qgroup->tqg_adjusting = 0;
  832 
  833         taskqgroup_bind(qgroup);
  834 
  835         return (0);
  836 }
  837 
  838 int
  839 taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
  840 {
  841         int error;
  842 
  843         mtx_lock(&qgroup->tqg_lock);
  844         error = _taskqgroup_adjust(qgroup, cpu, stride);
  845         mtx_unlock(&qgroup->tqg_lock);
  846 
  847         return (error);
  848 }
  849 
  850 struct taskqgroup *
  851 taskqgroup_create(char *name)
  852 {
  853         struct taskqgroup *qgroup;
  854 
  855         qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
  856         mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
  857         qgroup->tqg_name = name;
  858         LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
  859 
  860         return (qgroup);
  861 }
  862 
  863 void
  864 taskqgroup_destroy(struct taskqgroup *qgroup)
  865 {
  866 
  867 }

Cache object: 7313d74084a2cd3aaf9a9db33e4f468f


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.