The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_gtaskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * Copyright (c) 2014 Jeff Roberson
    4  * Copyright (c) 2016 Matthew Macy
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD: releng/11.2/sys/kern/subr_gtaskqueue.c 333338 2018-05-07 21:42:22Z shurd $");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/interrupt.h>
   37 #include <sys/kernel.h>
   38 #include <sys/kthread.h>
   39 #include <sys/libkern.h>
   40 #include <sys/limits.h>
   41 #include <sys/lock.h>
   42 #include <sys/malloc.h>
   43 #include <sys/mutex.h>
   44 #include <sys/proc.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/gtaskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
/* Malloc type for the queue structures, names and thread arrays allocated in this file. */
static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
/* Forward declarations: both functions are referenced before their definitions. */
static void     gtaskqueue_thread_enqueue(void *);
static void     gtaskqueue_thread_loop(void *arg);

/*
 * NOTE(review): presumably defines the "softirq" task queue group, sized by
 * mp_ncpus with a stride of 1 -- confirm against the TASKQGROUP_DEFINE macro.
 */
TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
   56 
/*
 * Record linked onto a queue's tq_active list.  gtaskqueue_run_locked()
 * links one while it executes a task; gtaskqueue_drain_tq_active() links
 * one as a marker while it waits.
 */
struct gtaskqueue_busy {
        struct gtask    *tb_running;    /* task being run, NULL, or TB_DRAIN_WAITER */
        TAILQ_ENTRY(gtaskqueue_busy) tb_link;   /* linkage on tq_active */
};
   61 
/*
 * Sentinel stored in tb_running to mark a busy record that belongs to a
 * waiter in gtaskqueue_drain_tq_active() rather than to a running task.
 */
static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;
   63 
/*
 * A single task queue: the list of pending tasks, the threads servicing
 * it, and the mutex protecting both.
 */
struct gtaskqueue {
        STAILQ_HEAD(, gtask)    tq_queue;       /* pending tasks, run from the head */
        gtaskqueue_enqueue_fn   tq_enqueue;     /* hook called after a task is queued */
        void                    *tq_context;    /* argument passed to tq_enqueue */
        char                    *tq_name;       /* allocated name, also the mutex name */
        TAILQ_HEAD(, gtaskqueue_busy) tq_active; /* busy records and drain markers */
        struct mtx              tq_mutex;       /* spin or default mutex, see tq_spin */
        struct thread           **tq_threads;   /* array of service threads */
        int                     tq_tcount;      /* threads started and not yet exited */
        int                     tq_spin;        /* non-zero if tq_mutex is MTX_SPIN */
        int                     tq_flags;       /* TQ_FLAGS_* bits */
        int                     tq_callouts;    /* non-zero holds off gtaskqueue_terminate() */
        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];  /* indexed by callback type */
        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; /* argument for each callback */
};
   79 
/*
 * Bits kept in gtaskqueue.tq_flags.
 *   TQ_FLAGS_ACTIVE           set at creation, cleared by gtaskqueue_free()
 *                             to make the service threads exit.
 *   TQ_FLAGS_BLOCKED          set by gtaskqueue_block(); while set the
 *                             enqueue hook is not called for new tasks.
 *   TQ_FLAGS_UNLOCKED_ENQUEUE set at creation when the enqueue hook is
 *                             gtaskqueue_thread_enqueue.
 */
#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)

/* NOTE(review): not referenced in the part of the file reviewed here. */
#define DT_CALLOUT_ARMED        (1 << 0)
   85 
/*
 * Acquire the queue mutex, using the spin variant when the queue was
 * created with MTX_SPIN (tq_spin != 0) and the default variant otherwise.
 */
#define TQ_LOCK(tq)                                                     \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_lock_spin(&(tq)->tq_mutex);                 \
                else                                                    \
                        mtx_lock(&(tq)->tq_mutex);                      \
        } while (0)
/* Assert that the calling thread owns the queue mutex. */
#define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)

/* Release the queue mutex with the unlock call matching TQ_LOCK(). */
#define TQ_UNLOCK(tq)                                                   \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_unlock_spin(&(tq)->tq_mutex);               \
                else                                                    \
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
/* Assert that the calling thread does not own the queue mutex. */
#define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  103 
  104 #ifdef INVARIANTS
  105 static void
  106 gtask_dump(struct gtask *gtask)
  107 {
  108         printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
  109                gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
  110 }
  111 #endif
  112 
  113 static __inline int
  114 TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
  115     int t)
  116 {
  117         if (tq->tq_spin)
  118                 return (msleep_spin(p, m, wm, t));
  119         return (msleep(p, m, pri, wm, t));
  120 }
  121 
  122 static struct gtaskqueue *
  123 _gtaskqueue_create(const char *name, int mflags,
  124                  taskqueue_enqueue_fn enqueue, void *context,
  125                  int mtxflags, const char *mtxname __unused)
  126 {
  127         struct gtaskqueue *queue;
  128         char *tq_name;
  129 
  130         tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
  131         if (!tq_name)
  132                 return (NULL);
  133 
  134         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  135 
  136         queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
  137         if (!queue) {
  138                 free(tq_name, M_GTASKQUEUE);
  139                 return (NULL);
  140         }
  141 
  142         STAILQ_INIT(&queue->tq_queue);
  143         TAILQ_INIT(&queue->tq_active);
  144         queue->tq_enqueue = enqueue;
  145         queue->tq_context = context;
  146         queue->tq_name = tq_name;
  147         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  148         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  149         if (enqueue == gtaskqueue_thread_enqueue)
  150                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  151         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  152 
  153         return (queue);
  154 }
  155 
  156 
  157 /*
  158  * Signal a taskqueue thread to terminate.
  159  */
  160 static void
  161 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
  162 {
  163 
  164         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  165                 wakeup(tq);
  166                 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
  167         }
  168 }
  169 
/*
 * Tear down a task queue: mark it inactive, wait for its threads to exit,
 * then destroy the mutex and release the thread array, the name and the
 * queue structure itself.
 */
static void
gtaskqueue_free(struct gtaskqueue *queue)
{

        TQ_LOCK(queue);
        /* Clearing ACTIVE makes gtaskqueue_thread_loop() leave its loop. */
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        /* Returns once tq_tcount and tq_callouts have both dropped to zero. */
        gtaskqueue_terminate(queue->tq_threads, queue);
        KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
        KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
        /* Note: there is no TQ_UNLOCK(); the mutex is destroyed while held. */
        mtx_destroy(&queue->tq_mutex);
        free(queue->tq_threads, M_GTASKQUEUE);
        free(queue->tq_name, M_GTASKQUEUE);
        free(queue, M_GTASKQUEUE);
}
  184 
/*
 * Queue gtask on the tail of the pending list unless it is already marked
 * TASK_ENQUEUED, then call the queue's enqueue hook if the queue is not
 * blocked.  Always returns 0.
 */
int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
#ifdef INVARIANTS
        /* Dump the offending task before panicking on a missing queue. */
        if (queue == NULL) {
                gtask_dump(gtask);
                panic("queue == NULL");
        }
#endif
        TQ_LOCK(queue);
        /* Already pending: nothing to do. */
        if (gtask->ta_flags & TASK_ENQUEUED) {
                TQ_UNLOCK(queue);
                return (0);
        }
        STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
        gtask->ta_flags |= TASK_ENQUEUED;
        TQ_UNLOCK(queue);
        /*
         * The hook runs without the queue mutex held; tq_flags is read here
         * after the unlock.
         */
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
                queue->tq_enqueue(queue->tq_context);
        return (0);
}
  206 
/*
 * Task handler that does nothing.  Used as the handler of the barrier task
 * in gtaskqueue_drain_tq_queue().
 */
static void
gtaskqueue_task_nop_fn(void *context)
{
}
  211 
/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 *
 * Called with the queue mutex held (see gtaskqueue_drain_all()); the
 * mutex is the one passed to TQ_SLEEP() below.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
        struct gtask t_barrier;

        /* Nothing pending, so there is nothing to wait for. */
        if (STAILQ_EMPTY(&queue->tq_queue))
                return;

        /*
         * Enqueue our barrier after all current tasks, but with
         * the highest priority so that newly queued tasks cannot
         * pass it.  Because of the high priority, we can not use
         * taskqueue_enqueue_locked directly (which drops the lock
         * anyway) so just insert it at tail while we have the
         * queue lock.
         */
        GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
        STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
        t_barrier.ta_flags |= TASK_ENQUEUED;

        /*
         * Once the barrier has executed, all previously queued tasks
         * have completed or are currently executing.
         *
         * gtaskqueue_run_locked() clears TASK_ENQUEUED when it dequeues the
         * barrier and does wakeup() on the task address after running it.
         * The barrier lives on this stack frame, so we must not return
         * before that has happened.
         */
        while (t_barrier.ta_flags & TASK_ENQUEUED)
                TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}
  244 
/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 *
 * Called with the queue mutex held (see gtaskqueue_drain_all()).
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy tb_marker, *tb_first;

        /* No thread is running a task right now. */
        if (TAILQ_EMPTY(&queue->tq_active))
                return;

        /*
         * Block gtaskqueue_terminate(): it keeps waiting while tq_callouts
         * is non-zero.
         */
        queue->tq_callouts++;

        /*
         * Wait for all currently executing taskqueue threads
         * to go idle.  The marker is appended behind their busy records;
         * once it reaches the head of the list every record that was
         * ahead of it has been removed.
         */
        tb_marker.tb_running = TB_DRAIN_WAITER;
        TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
        while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
                TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
        TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

        /*
         * Wakeup any other drain waiter that happened to queue up
         * without any intervening active thread.
         */
        tb_first = TAILQ_FIRST(&queue->tq_active);
        if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
                wakeup(tb_first);

        /*
         * Release gtaskqueue_terminate().  It sleeps on tq_threads, so wake
         * that channel if the queue has been marked inactive meanwhile.
         */
        queue->tq_callouts--;
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                wakeup_one(queue->tq_threads);
}
  284 
  285 void
  286 gtaskqueue_block(struct gtaskqueue *queue)
  287 {
  288 
  289         TQ_LOCK(queue);
  290         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  291         TQ_UNLOCK(queue);
  292 }
  293 
  294 void
  295 gtaskqueue_unblock(struct gtaskqueue *queue)
  296 {
  297 
  298         TQ_LOCK(queue);
  299         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  300         if (!STAILQ_EMPTY(&queue->tq_queue))
  301                 queue->tq_enqueue(queue->tq_context);
  302         TQ_UNLOCK(queue);
  303 }
  304 
/*
 * Run tasks from the head of the pending list until the list is empty.
 * Must be called with the queue mutex held; the mutex is released while
 * each handler runs and is held again when this function returns.
 */
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy tb;
        struct gtaskqueue_busy *tb_first;
        struct gtask *gtask;

        KASSERT(queue != NULL, ("tq is NULL"));
        TQ_ASSERT_LOCKED(queue);
        tb.tb_running = NULL;

        while (STAILQ_FIRST(&queue->tq_queue)) {
                /* Publish our busy record so cancel/drain can see us. */
                TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

                /*
                 * Carefully remove the first task from the queue and
                 * clear its TASK_ENQUEUED flag
                 */
                gtask = STAILQ_FIRST(&queue->tq_queue);
                KASSERT(gtask != NULL, ("task is NULL"));
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                gtask->ta_flags &= ~TASK_ENQUEUED;
                tb.tb_running = gtask;
                TQ_UNLOCK(queue);

                /* The handler runs without the queue mutex. */
                KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
                gtask->ta_func(gtask->ta_context);

                TQ_LOCK(queue);
                tb.tb_running = NULL;
                /* Wake threads sleeping on this task (drain, barrier). */
                wakeup(gtask);

                /*
                 * Drop our busy record; if a drain marker is now at the
                 * head of the active list, wake its owner.
                 */
                TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
                tb_first = TAILQ_FIRST(&queue->tq_active);
                if (tb_first != NULL &&
                    tb_first->tb_running == TB_DRAIN_WAITER)
                        wakeup(tb_first);
        }
}
  344 
  345 static int
  346 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
  347 {
  348         struct gtaskqueue_busy *tb;
  349 
  350         TQ_ASSERT_LOCKED(queue);
  351         TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
  352                 if (tb->tb_running == gtask)
  353                         return (1);
  354         }
  355         return (0);
  356 }
  357 
  358 static int
  359 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
  360 {
  361 
  362         if (gtask->ta_flags & TASK_ENQUEUED)
  363                 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
  364         gtask->ta_flags &= ~TASK_ENQUEUED;
  365         return (task_is_running(queue, gtask) ? EBUSY : 0);
  366 }
  367 
  368 int
  369 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
  370 {
  371         int error;
  372 
  373         TQ_LOCK(queue);
  374         error = gtaskqueue_cancel_locked(queue, gtask);
  375         TQ_UNLOCK(queue);
  376 
  377         return (error);
  378 }
  379 
  380 void
  381 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
  382 {
  383 
  384         if (!queue->tq_spin)
  385                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  386 
  387         TQ_LOCK(queue);
  388         while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
  389                 TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
  390         TQ_UNLOCK(queue);
  391 }
  392 
  393 void
  394 gtaskqueue_drain_all(struct gtaskqueue *queue)
  395 {
  396 
  397         if (!queue->tq_spin)
  398                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  399 
  400         TQ_LOCK(queue);
  401         gtaskqueue_drain_tq_queue(queue);
  402         gtaskqueue_drain_tq_active(queue);
  403         TQ_UNLOCK(queue);
  404 }
  405 
  406 static int
  407 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  408     cpuset_t *mask, const char *name, va_list ap)
  409 {
  410         char ktname[MAXCOMLEN + 1];
  411         struct thread *td;
  412         struct gtaskqueue *tq;
  413         int i, error;
  414 
  415         if (count <= 0)
  416                 return (EINVAL);
  417 
  418         vsnprintf(ktname, sizeof(ktname), name, ap);
  419         tq = *tqp;
  420 
  421         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
  422             M_NOWAIT | M_ZERO);
  423         if (tq->tq_threads == NULL) {
  424                 printf("%s: no memory for %s threads\n", __func__, ktname);
  425                 return (ENOMEM);
  426         }
  427 
  428         for (i = 0; i < count; i++) {
  429                 if (count == 1)
  430                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  431                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  432                 else
  433                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  434                             &tq->tq_threads[i], RFSTOPPED, 0,
  435                             "%s_%d", ktname, i);
  436                 if (error) {
  437                         /* should be ok to continue, taskqueue_free will dtrt */
  438                         printf("%s: kthread_add(%s): error %d", __func__,
  439                             ktname, error);
  440                         tq->tq_threads[i] = NULL;               /* paranoid */
  441                 } else
  442                         tq->tq_tcount++;
  443         }
  444         for (i = 0; i < count; i++) {
  445                 if (tq->tq_threads[i] == NULL)
  446                         continue;
  447                 td = tq->tq_threads[i];
  448                 if (mask) {
  449                         error = cpuset_setthread(td->td_tid, mask);
  450                         /*
  451                          * Failing to pin is rarely an actual fatal error;
  452                          * it'll just affect performance.
  453                          */
  454                         if (error)
  455                                 printf("%s: curthread=%llu: can't pin; "
  456                                     "error=%d\n",
  457                                     __func__,
  458                                     (unsigned long long) td->td_tid,
  459                                     error);
  460                 }
  461                 thread_lock(td);
  462                 sched_prio(td, pri);
  463                 sched_add(td, SRQ_BORING);
  464                 thread_unlock(td);
  465         }
  466 
  467         return (0);
  468 }
  469 
  470 static int
  471 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  472     const char *name, ...)
  473 {
  474         va_list ap;
  475         int error;
  476 
  477         va_start(ap, name);
  478         error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  479         va_end(ap);
  480         return (error);
  481 }
  482 
  483 static inline void
  484 gtaskqueue_run_callback(struct gtaskqueue *tq,
  485     enum taskqueue_callback_type cb_type)
  486 {
  487         taskqueue_callback_fn tq_callback;
  488 
  489         TQ_ASSERT_UNLOCKED(tq);
  490         tq_callback = tq->tq_callbacks[cb_type];
  491         if (tq_callback != NULL)
  492                 tq_callback(tq->tq_cb_contexts[cb_type]);
  493 }
  494 
/*
 * Body of every queue service thread.  arg is a pointer to the queue
 * pointer.  Runs the INIT callback, then alternates between running the
 * pending tasks and sleeping on the queue until TQ_FLAGS_ACTIVE is
 * cleared; after that it runs the remaining tasks, calls the SHUTDOWN
 * callback, drops its count in tq_tcount and exits.
 */
static void
gtaskqueue_thread_loop(void *arg)
{
        struct gtaskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                /* XXX ? */
                gtaskqueue_run_locked(tq);
                /*
                 * Because gtaskqueue_run_locked() can drop tq_mutex, we
                 * need to check if the TQ_FLAGS_ACTIVE flag wasn't removed
                 * in the meantime, which means we missed a wakeup.
                 */
                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                        break;
                TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
        }
        /* Run whatever is still pending before shutting down. */
        gtaskqueue_run_locked(tq);
        /*
         * This thread is on its way out, so just drop the lock temporarily
         * in order to call the shutdown callback.  This allows the callback
         * to look at the taskqueue, even just before it dies.
         */
        TQ_UNLOCK(tq);
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
        TQ_LOCK(tq);

        /*
         * rendezvous with thread that asked us to terminate:
         * gtaskqueue_terminate() sleeps on tq_threads until tq_tcount
         * reaches zero.
         */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        kthread_exit();
}
  532 
  533 static void
  534 gtaskqueue_thread_enqueue(void *context)
  535 {
  536         struct gtaskqueue **tqp, *tq;
  537 
  538         tqp = context;
  539         tq = *tqp;
  540         wakeup_one(tq);
  541 }
  542 
  543 
  544 static struct gtaskqueue *
  545 gtaskqueue_create_fast(const char *name, int mflags,
  546                  taskqueue_enqueue_fn enqueue, void *context)
  547 {
  548         return _gtaskqueue_create(name, mflags, enqueue, context,
  549                         MTX_SPIN, "fast_taskqueue");
  550 }
  551 
  552 
/* One slot of a task queue group: a queue plus the tasks attached to it. */
struct taskqgroup_cpu {
        LIST_HEAD(, grouptask)  tgc_tasks;      /* grouptasks attached to this slot */
        struct gtaskqueue       *tgc_taskq;     /* queue that runs those tasks */
        int     tgc_cnt;                        /* number of attached grouptasks */
        int     tgc_cpu;                        /* CPU this slot is associated with */
};
  559 
/* A named group of per-CPU task queue slots. */
struct taskqgroup {
        struct taskqgroup_cpu tqg_queue[MAXCPU]; /* slots; the first tqg_cnt are in use */
        struct mtx      tqg_lock;               /* held around slot and task list updates */
        char *          tqg_name;               /* prefix of the service thread names */
        int             tqg_adjusting;          /* NOTE(review): not used in the code reviewed here */
        int             tqg_stride;             /* NOTE(review): not used in the code reviewed here */
        int             tqg_cnt;                /* number of slots in use */
};
  568 
/*
 * Heap-allocated request handled by taskqgroup_binder(), which binds the
 * thread running it to bt_cpuid and then frees the request.
 */
struct taskq_bind_task {
        struct gtask bt_task;   /* task whose handler is taskqgroup_binder() */
        int     bt_cpuid;       /* CPU to bind the executing thread to */
};
  573 
  574 static void
  575 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
  576 {
  577         struct taskqgroup_cpu *qcpu;
  578 
  579         qcpu = &qgroup->tqg_queue[idx];
  580         LIST_INIT(&qcpu->tgc_tasks);
  581         qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
  582             taskqueue_thread_enqueue, &qcpu->tgc_taskq);
  583         gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
  584             "%s_%d", qgroup->tqg_name, idx);
  585         qcpu->tgc_cpu = cpu;
  586 }
  587 
  588 static void
  589 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
  590 {
  591 
  592         gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
  593 }
  594 
  595 /*
  596  * Find the taskq with least # of tasks that doesn't currently have any
  597  * other queues from the uniq identifier.
  598  */
  599 static int
  600 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
  601 {
  602         struct grouptask *n;
  603         int i, idx, mincnt;
  604         int strict;
  605 
  606         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  607         if (qgroup->tqg_cnt == 0)
  608                 return (0);
  609         idx = -1;
  610         mincnt = INT_MAX;
  611         /*
  612          * Two passes;  First scan for a queue with the least tasks that
  613          * does not already service this uniq id.  If that fails simply find
  614          * the queue with the least total tasks;
  615          */
  616         for (strict = 1; mincnt == INT_MAX; strict = 0) {
  617                 for (i = 0; i < qgroup->tqg_cnt; i++) {
  618                         if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
  619                                 continue;
  620                         if (strict) {
  621                                 LIST_FOREACH(n,
  622                                     &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  623                                         if (n->gt_uniq == uniq)
  624                                                 break;
  625                                 if (n != NULL)
  626                                         continue;
  627                         }
  628                         mincnt = qgroup->tqg_queue[i].tgc_cnt;
  629                         idx = i;
  630                 }
  631         }
  632         if (idx == -1)
  633                 panic("taskqgroup_find: Failed to pick a qid.");
  634 
  635         return (idx);
  636 }
  637 
  638 /*
  639  * smp_started is unusable since it is not set for UP kernels or even for
  640  * SMP kernels when there is 1 CPU.  This is usually handled by adding a
  641  * (mp_ncpus == 1) test, but that would be broken here since we need to
  642  * synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
  643  * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
  644  *
  645  * So maintain our own flag.  It must be set after all CPUs are started
  646  * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
  647  * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
  648  * SI_ORDER_ANY and unclearly after the CPUs are started.  It would be
  649  * simpler for adjustment to pass a flag indicating if it is delayed.
  650  */ 
  651 
/* Non-zero once tqg_record_smp_started() has run. */
static int tqg_smp_started;

/* SYSINIT hook: record that the SI_SUB_SMP stage below has been reached. */
static void
tqg_record_smp_started(void *arg)
{
        tqg_smp_started = 1;
}

/* Run the hook at SI_SUB_SMP, SI_ORDER_FOURTH. */
SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
        tqg_record_smp_started, NULL);
  662 
/*
 * Attach gtask to the slot of qgroup chosen by taskqgroup_find().
 *
 * uniq, irq and name ("grouptask" when NULL) are recorded in gtask.  When
 * irq is not -1 and tqg_smp_started is set, the interrupt is additionally
 * given affinity to the chosen slot's CPU; a failure to do so is only
 * reported on the console.
 */
void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
        cpuset_t mask;
        int qid, error;

        gtask->gt_uniq = uniq;
        snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
        gtask->gt_irq = irq;
        /* No CPU assigned unless the affinity branch below is taken. */
        gtask->gt_cpu = -1;
        mtx_lock(&qgroup->tqg_lock);
        qid = taskqgroup_find(qgroup, uniq);
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        if (irq != -1 && tqg_smp_started) {
                gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
                CPU_ZERO(&mask);
                CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
                /* The group lock is released before intr_setaffinity(). */
                mtx_unlock(&qgroup->tqg_lock);
                error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
                if (error)
                        printf("%s: setaffinity failed for %s: %d\n", __func__, gtask->gt_name, error);
        } else
                mtx_unlock(&qgroup->tqg_lock);
}
  690 
/*
 * Attach a grouptask whose gt_uniq, gt_irq and gt_name were filled in
 * earlier.  A slot is chosen with taskqgroup_find(); if the task has an
 * interrupt (gt_irq != -1) its affinity is set to the slot's CPU first,
 * then the task is linked into the slot.  Unlike taskqgroup_attach() this
 * does not consult tqg_smp_started and does not update gt_cpu.
 */
static void
taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
        cpuset_t mask;
        int qid, cpu, error;

        mtx_lock(&qgroup->tqg_lock);
        qid = taskqgroup_find(qgroup, gtask->gt_uniq);
        cpu = qgroup->tqg_queue[qid].tgc_cpu;
        if (gtask->gt_irq != -1) {
                /*
                 * The group lock is dropped around intr_setaffinity() and
                 * retaken afterwards; qid and cpu were read before the drop.
                 */
                mtx_unlock(&qgroup->tqg_lock);

                CPU_ZERO(&mask);
                CPU_SET(cpu, &mask);
                error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask);
                mtx_lock(&qgroup->tqg_lock);
                if (error)
                        printf("%s: %s setaffinity failed: %d\n", __func__, gtask->gt_name, error);

        }
        qgroup->tqg_queue[qid].tgc_cnt++;

        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
                         gt_list);
        MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        mtx_unlock(&qgroup->tqg_lock);
}
  719 
  720 int
  721 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
  722         void *uniq, int cpu, int irq, char *name)
  723 {
  724         cpuset_t mask;
  725         int i, qid, error;
  726 
  727         qid = -1;
  728         gtask->gt_uniq = uniq;
  729         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  730         gtask->gt_irq = irq;
  731         gtask->gt_cpu = cpu;
  732         mtx_lock(&qgroup->tqg_lock);
  733         if (tqg_smp_started) {
  734                 for (i = 0; i < qgroup->tqg_cnt; i++)
  735                         if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  736                                 qid = i;
  737                                 break;
  738                         }
  739                 if (qid == -1) {
  740                         mtx_unlock(&qgroup->tqg_lock);
  741                         printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  742                         return (EINVAL);
  743                 }
  744         } else
  745                 qid = 0;
  746         qgroup->tqg_queue[qid].tgc_cnt++;
  747         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  748         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  749         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  750         mtx_unlock(&qgroup->tqg_lock);
  751 
  752         CPU_ZERO(&mask);
  753         CPU_SET(cpu, &mask);
  754         if (irq != -1 && tqg_smp_started) {
  755                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  756                 if (error)
  757                         printf("%s: setaffinity failed: %d\n", __func__, error);
  758         }
  759         return (0);
  760 }
  761 
  762 static int
  763 taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
  764 {
  765         cpuset_t mask;
  766         int i, qid, irq, cpu, error;
  767 
  768         qid = -1;
  769         irq = gtask->gt_irq;
  770         cpu = gtask->gt_cpu;
  771         MPASS(tqg_smp_started);
  772         mtx_lock(&qgroup->tqg_lock);
  773         for (i = 0; i < qgroup->tqg_cnt; i++)
  774                 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  775                         qid = i;
  776                         break;
  777                 }
  778         if (qid == -1) {
  779                 mtx_unlock(&qgroup->tqg_lock);
  780                 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  781                 return (EINVAL);
  782         }
  783         qgroup->tqg_queue[qid].tgc_cnt++;
  784         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  785         MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
  786         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  787         mtx_unlock(&qgroup->tqg_lock);
  788 
  789         CPU_ZERO(&mask);
  790         CPU_SET(cpu, &mask);
  791 
  792         if (irq != -1) {
  793                 error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
  794                 if (error)
  795                         printf("%s: setaffinity failed: %d\n", __func__, error);
  796         }
  797         return (0);
  798 }
  799 
  800 void
  801 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
  802 {
  803         int i;
  804 
  805         mtx_lock(&qgroup->tqg_lock);
  806         for (i = 0; i < qgroup->tqg_cnt; i++)
  807                 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
  808                         break;
  809         if (i == qgroup->tqg_cnt)
  810                 panic("taskqgroup_detach: task %s not in group\n", gtask->gt_name);
  811         qgroup->tqg_queue[i].tgc_cnt--;
  812         LIST_REMOVE(gtask, gt_list);
  813         mtx_unlock(&qgroup->tqg_lock);
  814         gtask->gt_taskqueue = NULL;
  815 }
  816 
  817 static void
  818 taskqgroup_binder(void *ctx)
  819 {
  820         struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
  821         cpuset_t mask;
  822         int error;
  823 
  824         CPU_ZERO(&mask);
  825         CPU_SET(gtask->bt_cpuid, &mask);
  826         error = cpuset_setthread(curthread->td_tid, &mask);
  827         thread_lock(curthread);
  828         sched_bind(curthread, gtask->bt_cpuid);
  829         thread_unlock(curthread);
  830 
  831         if (error)
  832                 printf("%s: setaffinity failed: %d\n", __func__,
  833                     error);
  834         free(gtask, M_DEVBUF);
  835 }
  836 
  837 static void
  838 taskqgroup_bind(struct taskqgroup *qgroup)
  839 {
  840         struct taskq_bind_task *gtask;
  841         int i;
  842 
  843         /*
  844          * Bind taskqueue threads to specific CPUs, if they have been assigned
  845          * one.
  846          */
  847         if (qgroup->tqg_cnt == 1)
  848                 return;
  849 
  850         for (i = 0; i < qgroup->tqg_cnt; i++) {
  851                 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK);
  852                 GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
  853                 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
  854                 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
  855                     &gtask->bt_task);
  856         }
  857 }
  858 
  859 static int
  860 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  861 {
  862         LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
  863         struct grouptask *gtask;
  864         int i, k, old_cnt, old_cpu, cpu;
  865 
  866         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  867 
  868         if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
  869                 printf("%s: failed cnt: %d stride: %d "
  870                     "mp_ncpus: %d tqg_smp_started: %d\n",
  871                     __func__, cnt, stride, mp_ncpus, tqg_smp_started);
  872                 return (EINVAL);
  873         }
  874         if (qgroup->tqg_adjusting) {
  875                 printf("%s failed: adjusting\n", __func__);
  876                 return (EBUSY);
  877         }
  878         qgroup->tqg_adjusting = 1;
  879         old_cnt = qgroup->tqg_cnt;
  880         old_cpu = 0;
  881         if (old_cnt < cnt)
  882                 old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
  883         mtx_unlock(&qgroup->tqg_lock);
  884         /*
  885          * Set up queue for tasks added before boot.
  886          */
  887         if (old_cnt == 0) {
  888                 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
  889                     grouptask, gt_list);
  890                 qgroup->tqg_queue[0].tgc_cnt = 0;
  891         }
  892 
  893         /*
  894          * If new taskq threads have been added.
  895          */
  896         cpu = old_cpu;
  897         for (i = old_cnt; i < cnt; i++) {
  898                 taskqgroup_cpu_create(qgroup, i, cpu);
  899 
  900                 for (k = 0; k < stride; k++)
  901                         cpu = CPU_NEXT(cpu);
  902         }
  903         mtx_lock(&qgroup->tqg_lock);
  904         qgroup->tqg_cnt = cnt;
  905         qgroup->tqg_stride = stride;
  906 
  907         /*
  908          * Adjust drivers to use new taskqs.
  909          */
  910         for (i = 0; i < old_cnt; i++) {
  911                 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
  912                         LIST_REMOVE(gtask, gt_list);
  913                         qgroup->tqg_queue[i].tgc_cnt--;
  914                         LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
  915                 }
  916         }
  917         mtx_unlock(&qgroup->tqg_lock);
  918 
  919         while ((gtask = LIST_FIRST(&gtask_head))) {
  920                 LIST_REMOVE(gtask, gt_list);
  921                 if (gtask->gt_cpu == -1)
  922                         taskqgroup_attach_deferred(qgroup, gtask);
  923                 else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
  924                         taskqgroup_attach_deferred(qgroup, gtask);
  925         }
  926 
  927 #ifdef INVARIANTS
  928         mtx_lock(&qgroup->tqg_lock);
  929         for (i = 0; i < qgroup->tqg_cnt; i++) {
  930                 MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
  931                 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
  932                         MPASS(gtask->gt_taskqueue != NULL);
  933         }
  934         mtx_unlock(&qgroup->tqg_lock);
  935 #endif
  936         /*
  937          * If taskq thread count has been reduced.
  938          */
  939         for (i = cnt; i < old_cnt; i++)
  940                 taskqgroup_cpu_remove(qgroup, i);
  941 
  942         taskqgroup_bind(qgroup);
  943 
  944         mtx_lock(&qgroup->tqg_lock);
  945         qgroup->tqg_adjusting = 0;
  946 
  947         return (0);
  948 }
  949 
  950 int
  951 taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
  952 {
  953         int error;
  954 
  955         mtx_lock(&qgroup->tqg_lock);
  956         error = _taskqgroup_adjust(qgroup, cnt, stride);
  957         mtx_unlock(&qgroup->tqg_lock);
  958 
  959         return (error);
  960 }
  961 
  962 struct taskqgroup *
  963 taskqgroup_create(char *name)
  964 {
  965         struct taskqgroup *qgroup;
  966 
  967         qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
  968         mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
  969         qgroup->tqg_name = name;
  970         LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
  971 
  972         return (qgroup);
  973 }
  974 
  975 void
  976 taskqgroup_destroy(struct taskqgroup *qgroup)
  977 {
  978 
  979 }

Cache object: 21689523f97a0948291903bc69672301


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.