The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/subr_gtaskqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2000 Doug Rabson
    3  * Copyright (c) 2014 Jeff Roberson
    4  * Copyright (c) 2016 Matthew Macy
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without
    8  * modification, are permitted provided that the following conditions
    9  * are met:
   10  * 1. Redistributions of source code must retain the above copyright
   11  *    notice, this list of conditions and the following disclaimer.
   12  * 2. Redistributions in binary form must reproduce the above copyright
   13  *    notice, this list of conditions and the following disclaimer in the
   14  *    documentation and/or other materials provided with the distribution.
   15  *
   16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   26  * SUCH DAMAGE.
   27  */
   28 
   29 #include <sys/cdefs.h>
   30 __FBSDID("$FreeBSD$");
   31 
   32 #include <sys/param.h>
   33 #include <sys/systm.h>
   34 #include <sys/bus.h>
   35 #include <sys/cpuset.h>
   36 #include <sys/kernel.h>
   37 #include <sys/kthread.h>
   38 #include <sys/libkern.h>
   39 #include <sys/limits.h>
   40 #include <sys/lock.h>
   41 #include <sys/malloc.h>
   42 #include <sys/mutex.h>
   43 #include <sys/proc.h>
   44 #include <sys/epoch.h>
   45 #include <sys/sched.h>
   46 #include <sys/smp.h>
   47 #include <sys/gtaskqueue.h>
   48 #include <sys/unistd.h>
   49 #include <machine/stdarg.h>
   50 
   51 static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
   52 static void     gtaskqueue_thread_enqueue(void *);
   53 static void     gtaskqueue_thread_loop(void *arg);
   54 static int      task_is_running(struct gtaskqueue *queue, struct gtask *gtask);
   55 static void     gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask);
   56 
   57 TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
   58 
   59 struct gtaskqueue_busy {
   60         struct gtask            *tb_running;
   61         u_int                    tb_seq;
   62         LIST_ENTRY(gtaskqueue_busy) tb_link;
   63 };
   64 
   65 typedef void (*gtaskqueue_enqueue_fn)(void *context);
   66 
   67 struct gtaskqueue {
   68         STAILQ_HEAD(, gtask)    tq_queue;
   69         LIST_HEAD(, gtaskqueue_busy) tq_active;
   70         u_int                   tq_seq;
   71         int                     tq_callouts;
   72         struct mtx_padalign     tq_mutex;
   73         gtaskqueue_enqueue_fn   tq_enqueue;
   74         void                    *tq_context;
   75         char                    *tq_name;
   76         struct thread           **tq_threads;
   77         int                     tq_tcount;
   78         int                     tq_spin;
   79         int                     tq_flags;
   80         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
   81         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
   82 };
   83 
   84 #define TQ_FLAGS_ACTIVE         (1 << 0)
   85 #define TQ_FLAGS_BLOCKED        (1 << 1)
   86 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
   87 
   88 #define DT_CALLOUT_ARMED        (1 << 0)
   89 
   90 #define TQ_LOCK(tq)                                                     \
   91         do {                                                            \
   92                 if ((tq)->tq_spin)                                      \
   93                         mtx_lock_spin(&(tq)->tq_mutex);                 \
   94                 else                                                    \
   95                         mtx_lock(&(tq)->tq_mutex);                      \
   96         } while (0)
   97 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
   98 
   99 #define TQ_UNLOCK(tq)                                                   \
  100         do {                                                            \
  101                 if ((tq)->tq_spin)                                      \
  102                         mtx_unlock_spin(&(tq)->tq_mutex);               \
  103                 else                                                    \
  104                         mtx_unlock(&(tq)->tq_mutex);                    \
  105         } while (0)
  106 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
  107 
  108 #ifdef INVARIANTS
  109 static void
  110 gtask_dump(struct gtask *gtask)
  111 {
  112         printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
  113                gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
  114 }
  115 #endif
  116 
  117 static __inline int
  118 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
  119 {
  120         if (tq->tq_spin)
  121                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
  122         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
  123 }
  124 
  125 static struct gtaskqueue *
  126 _gtaskqueue_create(const char *name, int mflags,
  127                  taskqueue_enqueue_fn enqueue, void *context,
  128                  int mtxflags, const char *mtxname __unused)
  129 {
  130         struct gtaskqueue *queue;
  131         char *tq_name;
  132 
  133         tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
  134         if (!tq_name)
  135                 return (NULL);
  136 
  137         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
  138 
  139         queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
  140         if (!queue) {
  141                 free(tq_name, M_GTASKQUEUE);
  142                 return (NULL);
  143         }
  144 
  145         STAILQ_INIT(&queue->tq_queue);
  146         LIST_INIT(&queue->tq_active);
  147         queue->tq_enqueue = enqueue;
  148         queue->tq_context = context;
  149         queue->tq_name = tq_name;
  150         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
  151         queue->tq_flags |= TQ_FLAGS_ACTIVE;
  152         if (enqueue == gtaskqueue_thread_enqueue)
  153                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
  154         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
  155 
  156         return (queue);
  157 }
  158 
  159 /*
  160  * Signal a taskqueue thread to terminate.
  161  */
  162 static void
  163 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
  164 {
  165 
  166         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
  167                 wakeup(tq);
  168                 TQ_SLEEP(tq, pp, "gtq_destroy");
  169         }
  170 }
  171 
  172 static void __unused
  173 gtaskqueue_free(struct gtaskqueue *queue)
  174 {
  175 
  176         TQ_LOCK(queue);
  177         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
  178         gtaskqueue_terminate(queue->tq_threads, queue);
  179         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
  180         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
  181         mtx_destroy(&queue->tq_mutex);
  182         free(queue->tq_threads, M_GTASKQUEUE);
  183         free(queue->tq_name, M_GTASKQUEUE);
  184         free(queue, M_GTASKQUEUE);
  185 }
  186 
  187 /*
  188  * Wait for all to complete, then prevent it from being enqueued
  189  */
  190 void
  191 grouptask_block(struct grouptask *grouptask)
  192 {
  193         struct gtaskqueue *queue = grouptask->gt_taskqueue;
  194         struct gtask *gtask = &grouptask->gt_task;
  195 
  196 #ifdef INVARIANTS
  197         if (queue == NULL) {
  198                 gtask_dump(gtask);
  199                 panic("queue == NULL");
  200         }
  201 #endif
  202         TQ_LOCK(queue);
  203         gtask->ta_flags |= TASK_NOENQUEUE;
  204         gtaskqueue_drain_locked(queue, gtask);
  205         TQ_UNLOCK(queue);
  206 }
  207 
  208 void
  209 grouptask_unblock(struct grouptask *grouptask)
  210 {
  211         struct gtaskqueue *queue = grouptask->gt_taskqueue;
  212         struct gtask *gtask = &grouptask->gt_task;
  213 
  214 #ifdef INVARIANTS
  215         if (queue == NULL) {
  216                 gtask_dump(gtask);
  217                 panic("queue == NULL");
  218         }
  219 #endif
  220         TQ_LOCK(queue);
  221         gtask->ta_flags &= ~TASK_NOENQUEUE;
  222         TQ_UNLOCK(queue);
  223 }
  224 
  225 int
  226 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
  227 {
  228 #ifdef INVARIANTS
  229         if (queue == NULL) {
  230                 gtask_dump(gtask);
  231                 panic("queue == NULL");
  232         }
  233 #endif
  234         TQ_LOCK(queue);
  235         if (gtask->ta_flags & TASK_ENQUEUED) {
  236                 TQ_UNLOCK(queue);
  237                 return (0);
  238         }
  239         if (gtask->ta_flags & TASK_NOENQUEUE) {
  240                 TQ_UNLOCK(queue);
  241                 return (EAGAIN);
  242         }
  243         STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
  244         gtask->ta_flags |= TASK_ENQUEUED;
  245         TQ_UNLOCK(queue);
  246         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
  247                 queue->tq_enqueue(queue->tq_context);
  248         return (0);
  249 }
  250 
  251 static void
  252 gtaskqueue_task_nop_fn(void *context)
  253 {
  254 }
  255 
  256 /*
  257  * Block until all currently queued tasks in this taskqueue
  258  * have begun execution.  Tasks queued during execution of
  259  * this function are ignored.
  260  */
  261 static void
  262 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
  263 {
  264         struct gtask t_barrier;
  265 
  266         if (STAILQ_EMPTY(&queue->tq_queue))
  267                 return;
  268 
  269         /*
  270          * Enqueue our barrier after all current tasks, but with
  271          * the highest priority so that newly queued tasks cannot
  272          * pass it.  Because of the high priority, we can not use
  273          * taskqueue_enqueue_locked directly (which drops the lock
  274          * anyway) so just insert it at tail while we have the
  275          * queue lock.
  276          */
  277         GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
  278         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
  279         t_barrier.ta_flags |= TASK_ENQUEUED;
  280 
  281         /*
  282          * Once the barrier has executed, all previously queued tasks
  283          * have completed or are currently executing.
  284          */
  285         while (t_barrier.ta_flags & TASK_ENQUEUED)
  286                 TQ_SLEEP(queue, &t_barrier, "gtq_qdrain");
  287 }
  288 
  289 /*
  290  * Block until all currently executing tasks for this taskqueue
  291  * complete.  Tasks that begin execution during the execution
  292  * of this function are ignored.
  293  */
  294 static void
  295 gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
  296 {
  297         struct gtaskqueue_busy *tb;
  298         u_int seq;
  299 
  300         if (LIST_EMPTY(&queue->tq_active))
  301                 return;
  302 
  303         /* Block taskq_terminate().*/
  304         queue->tq_callouts++;
  305 
  306         /* Wait for any active task with sequence from the past. */
  307         seq = queue->tq_seq;
  308 restart:
  309         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  310                 if ((int)(tb->tb_seq - seq) <= 0) {
  311                         TQ_SLEEP(queue, tb->tb_running, "gtq_adrain");
  312                         goto restart;
  313                 }
  314         }
  315 
  316         /* Release taskqueue_terminate(). */
  317         queue->tq_callouts--;
  318         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  319                 wakeup_one(queue->tq_threads);
  320 }
  321 
  322 void
  323 gtaskqueue_block(struct gtaskqueue *queue)
  324 {
  325 
  326         TQ_LOCK(queue);
  327         queue->tq_flags |= TQ_FLAGS_BLOCKED;
  328         TQ_UNLOCK(queue);
  329 }
  330 
  331 void
  332 gtaskqueue_unblock(struct gtaskqueue *queue)
  333 {
  334 
  335         TQ_LOCK(queue);
  336         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
  337         if (!STAILQ_EMPTY(&queue->tq_queue))
  338                 queue->tq_enqueue(queue->tq_context);
  339         TQ_UNLOCK(queue);
  340 }
  341 
  342 static void
  343 gtaskqueue_run_locked(struct gtaskqueue *queue)
  344 {
  345         struct epoch_tracker et;
  346         struct gtaskqueue_busy tb;
  347         struct gtask *gtask;
  348         bool in_net_epoch;
  349 
  350         KASSERT(queue != NULL, ("tq is NULL"));
  351         TQ_ASSERT_LOCKED(queue);
  352         tb.tb_running = NULL;
  353         LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
  354         in_net_epoch = false;
  355 
  356         while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
  357                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
  358                 gtask->ta_flags &= ~TASK_ENQUEUED;
  359                 tb.tb_running = gtask;
  360                 tb.tb_seq = ++queue->tq_seq;
  361                 TQ_UNLOCK(queue);
  362 
  363                 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
  364                 if (!in_net_epoch && TASK_IS_NET(gtask)) {
  365                         in_net_epoch = true;
  366                         NET_EPOCH_ENTER(et);
  367                 } else if (in_net_epoch && !TASK_IS_NET(gtask)) {
  368                         NET_EPOCH_EXIT(et);
  369                         in_net_epoch = false;
  370                 }
  371                 gtask->ta_func(gtask->ta_context);
  372 
  373                 TQ_LOCK(queue);
  374                 wakeup(gtask);
  375         }
  376         if (in_net_epoch)
  377                 NET_EPOCH_EXIT(et);
  378         LIST_REMOVE(&tb, tb_link);
  379 }
  380 
  381 static int
  382 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
  383 {
  384         struct gtaskqueue_busy *tb;
  385 
  386         TQ_ASSERT_LOCKED(queue);
  387         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
  388                 if (tb->tb_running == gtask)
  389                         return (1);
  390         }
  391         return (0);
  392 }
  393 
  394 static int
  395 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
  396 {
  397 
  398         if (gtask->ta_flags & TASK_ENQUEUED)
  399                 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
  400         gtask->ta_flags &= ~TASK_ENQUEUED;
  401         return (task_is_running(queue, gtask) ? EBUSY : 0);
  402 }
  403 
  404 int
  405 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
  406 {
  407         int error;
  408 
  409         TQ_LOCK(queue);
  410         error = gtaskqueue_cancel_locked(queue, gtask);
  411         TQ_UNLOCK(queue);
  412 
  413         return (error);
  414 }
  415 
  416 static void
  417 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
  418 {
  419         while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
  420                 TQ_SLEEP(queue, gtask, "gtq_drain");
  421 }
  422 
  423 void
  424 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
  425 {
  426 
  427         if (!queue->tq_spin)
  428                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  429 
  430         TQ_LOCK(queue);
  431         gtaskqueue_drain_locked(queue, gtask);
  432         TQ_UNLOCK(queue);
  433 }
  434 
  435 void
  436 gtaskqueue_drain_all(struct gtaskqueue *queue)
  437 {
  438 
  439         if (!queue->tq_spin)
  440                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
  441 
  442         TQ_LOCK(queue);
  443         gtaskqueue_drain_tq_queue(queue);
  444         gtaskqueue_drain_tq_active(queue);
  445         TQ_UNLOCK(queue);
  446 }
  447 
  448 static int
  449 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  450     cpuset_t *mask, const char *name, va_list ap)
  451 {
  452         char ktname[MAXCOMLEN + 1];
  453         struct thread *td;
  454         struct gtaskqueue *tq;
  455         int i, error;
  456 
  457         if (count <= 0)
  458                 return (EINVAL);
  459 
  460         vsnprintf(ktname, sizeof(ktname), name, ap);
  461         tq = *tqp;
  462 
  463         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
  464             M_NOWAIT | M_ZERO);
  465         if (tq->tq_threads == NULL) {
  466                 printf("%s: no memory for %s threads\n", __func__, ktname);
  467                 return (ENOMEM);
  468         }
  469 
  470         for (i = 0; i < count; i++) {
  471                 if (count == 1)
  472                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  473                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
  474                 else
  475                         error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
  476                             &tq->tq_threads[i], RFSTOPPED, 0,
  477                             "%s_%d", ktname, i);
  478                 if (error) {
  479                         /* should be ok to continue, taskqueue_free will dtrt */
  480                         printf("%s: kthread_add(%s): error %d", __func__,
  481                             ktname, error);
  482                         tq->tq_threads[i] = NULL;               /* paranoid */
  483                 } else
  484                         tq->tq_tcount++;
  485         }
  486         for (i = 0; i < count; i++) {
  487                 if (tq->tq_threads[i] == NULL)
  488                         continue;
  489                 td = tq->tq_threads[i];
  490                 if (mask) {
  491                         error = cpuset_setthread(td->td_tid, mask);
  492                         /*
  493                          * Failing to pin is rarely an actual fatal error;
  494                          * it'll just affect performance.
  495                          */
  496                         if (error)
  497                                 printf("%s: curthread=%llu: can't pin; "
  498                                     "error=%d\n",
  499                                     __func__,
  500                                     (unsigned long long) td->td_tid,
  501                                     error);
  502                 }
  503                 thread_lock(td);
  504                 sched_prio(td, pri);
  505                 sched_add(td, SRQ_BORING);
  506         }
  507 
  508         return (0);
  509 }
  510 
  511 static int
  512 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
  513     const char *name, ...)
  514 {
  515         va_list ap;
  516         int error;
  517 
  518         va_start(ap, name);
  519         error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
  520         va_end(ap);
  521         return (error);
  522 }
  523 
  524 static inline void
  525 gtaskqueue_run_callback(struct gtaskqueue *tq,
  526     enum taskqueue_callback_type cb_type)
  527 {
  528         taskqueue_callback_fn tq_callback;
  529 
  530         TQ_ASSERT_UNLOCKED(tq);
  531         tq_callback = tq->tq_callbacks[cb_type];
  532         if (tq_callback != NULL)
  533                 tq_callback(tq->tq_cb_contexts[cb_type]);
  534 }
  535 
  536 static void
  537 gtaskqueue_thread_loop(void *arg)
  538 {
  539         struct gtaskqueue **tqp, *tq;
  540 
  541         tqp = arg;
  542         tq = *tqp;
  543         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
  544         TQ_LOCK(tq);
  545         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
  546                 /* XXX ? */
  547                 gtaskqueue_run_locked(tq);
  548                 /*
  549                  * Because taskqueue_run() can drop tq_mutex, we need to
  550                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
  551                  * meantime, which means we missed a wakeup.
  552                  */
  553                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
  554                         break;
  555                 TQ_SLEEP(tq, tq, "-");
  556         }
  557         gtaskqueue_run_locked(tq);
  558         /*
  559          * This thread is on its way out, so just drop the lock temporarily
  560          * in order to call the shutdown callback.  This allows the callback
  561          * to look at the taskqueue, even just before it dies.
  562          */
  563         TQ_UNLOCK(tq);
  564         gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
  565         TQ_LOCK(tq);
  566 
  567         /* rendezvous with thread that asked us to terminate */
  568         tq->tq_tcount--;
  569         wakeup_one(tq->tq_threads);
  570         TQ_UNLOCK(tq);
  571         kthread_exit();
  572 }
  573 
  574 static void
  575 gtaskqueue_thread_enqueue(void *context)
  576 {
  577         struct gtaskqueue **tqp, *tq;
  578 
  579         tqp = context;
  580         tq = *tqp;
  581         wakeup_any(tq);
  582 }
  583 
  584 static struct gtaskqueue *
  585 gtaskqueue_create_fast(const char *name, int mflags,
  586                  taskqueue_enqueue_fn enqueue, void *context)
  587 {
  588         return _gtaskqueue_create(name, mflags, enqueue, context,
  589                         MTX_SPIN, "fast_taskqueue");
  590 }
  591 
  592 struct taskqgroup_cpu {
  593         LIST_HEAD(, grouptask) tgc_tasks;
  594         struct gtaskqueue *tgc_taskq;
  595         int             tgc_cnt;
  596         int             tgc_cpu;
  597 };
  598 
  599 struct taskqgroup {
  600         struct taskqgroup_cpu tqg_queue[MAXCPU];
  601         struct mtx      tqg_lock;
  602         const char *    tqg_name;
  603         int             tqg_cnt;
  604 };
  605 
  606 struct taskq_bind_task {
  607         struct gtask bt_task;
  608         int     bt_cpuid;
  609 };
  610 
  611 static void
  612 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
  613 {
  614         struct taskqgroup_cpu *qcpu;
  615 
  616         qcpu = &qgroup->tqg_queue[idx];
  617         LIST_INIT(&qcpu->tgc_tasks);
  618         qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
  619             taskqueue_thread_enqueue, &qcpu->tgc_taskq);
  620         gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
  621             "%s_%d", qgroup->tqg_name, idx);
  622         qcpu->tgc_cpu = cpu;
  623 }
  624 
  625 /*
  626  * Find the taskq with least # of tasks that doesn't currently have any
  627  * other queues from the uniq identifier.
  628  */
  629 static int
  630 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
  631 {
  632         struct grouptask *n;
  633         int i, idx, mincnt;
  634         int strict;
  635 
  636         mtx_assert(&qgroup->tqg_lock, MA_OWNED);
  637         KASSERT(qgroup->tqg_cnt != 0,
  638             ("qgroup %s has no queues", qgroup->tqg_name));
  639 
  640         /*
  641          * Two passes: first scan for a queue with the least tasks that
  642          * does not already service this uniq id.  If that fails simply find
  643          * the queue with the least total tasks.
  644          */
  645         for (idx = -1, mincnt = INT_MAX, strict = 1; mincnt == INT_MAX;
  646             strict = 0) {
  647                 for (i = 0; i < qgroup->tqg_cnt; i++) {
  648                         if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
  649                                 continue;
  650                         if (strict) {
  651                                 LIST_FOREACH(n, &qgroup->tqg_queue[i].tgc_tasks,
  652                                     gt_list)
  653                                         if (n->gt_uniq == uniq)
  654                                                 break;
  655                                 if (n != NULL)
  656                                         continue;
  657                         }
  658                         mincnt = qgroup->tqg_queue[i].tgc_cnt;
  659                         idx = i;
  660                 }
  661         }
  662         if (idx == -1)
  663                 panic("%s: failed to pick a qid.", __func__);
  664 
  665         return (idx);
  666 }
  667 
  668 void
  669 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
  670     void *uniq, device_t dev, struct resource *irq, const char *name)
  671 {
  672         int cpu, qid, error;
  673 
  674         KASSERT(qgroup->tqg_cnt > 0,
  675             ("qgroup %s has no queues", qgroup->tqg_name));
  676 
  677         gtask->gt_uniq = uniq;
  678         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  679         gtask->gt_dev = dev;
  680         gtask->gt_irq = irq;
  681         gtask->gt_cpu = -1;
  682         mtx_lock(&qgroup->tqg_lock);
  683         qid = taskqgroup_find(qgroup, uniq);
  684         qgroup->tqg_queue[qid].tgc_cnt++;
  685         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  686         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  687         if (dev != NULL && irq != NULL) {
  688                 cpu = qgroup->tqg_queue[qid].tgc_cpu;
  689                 gtask->gt_cpu = cpu;
  690                 mtx_unlock(&qgroup->tqg_lock);
  691                 error = bus_bind_intr(dev, irq, cpu);
  692                 if (error)
  693                         printf("%s: binding interrupt failed for %s: %d\n",
  694                             __func__, gtask->gt_name, error);
  695         } else
  696                 mtx_unlock(&qgroup->tqg_lock);
  697 }
  698 
  699 int
  700 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
  701     void *uniq, int cpu, device_t dev, struct resource *irq, const char *name)
  702 {
  703         int i, qid, error;
  704 
  705         gtask->gt_uniq = uniq;
  706         snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
  707         gtask->gt_dev = dev;
  708         gtask->gt_irq = irq;
  709         gtask->gt_cpu = cpu;
  710         mtx_lock(&qgroup->tqg_lock);
  711         for (i = 0, qid = -1; i < qgroup->tqg_cnt; i++)
  712                 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
  713                         qid = i;
  714                         break;
  715                 }
  716         if (qid == -1) {
  717                 mtx_unlock(&qgroup->tqg_lock);
  718                 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
  719                 return (EINVAL);
  720         }
  721         qgroup->tqg_queue[qid].tgc_cnt++;
  722         LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
  723         gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
  724         cpu = qgroup->tqg_queue[qid].tgc_cpu;
  725         mtx_unlock(&qgroup->tqg_lock);
  726 
  727         if (dev != NULL && irq != NULL) {
  728                 error = bus_bind_intr(dev, irq, cpu);
  729                 if (error)
  730                         printf("%s: binding interrupt failed for %s: %d\n",
  731                             __func__, gtask->gt_name, error);
  732         }
  733         return (0);
  734 }
  735 
  736 void
  737 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
  738 {
  739         int i;
  740 
  741         grouptask_block(gtask);
  742         mtx_lock(&qgroup->tqg_lock);
  743         for (i = 0; i < qgroup->tqg_cnt; i++)
  744                 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
  745                         break;
  746         if (i == qgroup->tqg_cnt)
  747                 panic("%s: task %s not in group", __func__, gtask->gt_name);
  748         qgroup->tqg_queue[i].tgc_cnt--;
  749         LIST_REMOVE(gtask, gt_list);
  750         mtx_unlock(&qgroup->tqg_lock);
  751         gtask->gt_taskqueue = NULL;
  752         gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
  753 }
  754 
  755 static void
  756 taskqgroup_binder(void *ctx)
  757 {
  758         struct taskq_bind_task *gtask;
  759         cpuset_t mask;
  760         int error;
  761 
  762         gtask = ctx;
  763         CPU_ZERO(&mask);
  764         CPU_SET(gtask->bt_cpuid, &mask);
  765         error = cpuset_setthread(curthread->td_tid, &mask);
  766         thread_lock(curthread);
  767         sched_bind(curthread, gtask->bt_cpuid);
  768         thread_unlock(curthread);
  769 
  770         if (error)
  771                 printf("%s: binding curthread failed: %d\n", __func__, error);
  772         free(gtask, M_DEVBUF);
  773 }
  774 
  775 void
  776 taskqgroup_bind(struct taskqgroup *qgroup)
  777 {
  778         struct taskq_bind_task *gtask;
  779         int i;
  780 
  781         /*
  782          * Bind taskqueue threads to specific CPUs, if they have been assigned
  783          * one.
  784          */
  785         if (qgroup->tqg_cnt == 1)
  786                 return;
  787 
  788         for (i = 0; i < qgroup->tqg_cnt; i++) {
  789                 gtask = malloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
  790                 GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
  791                 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
  792                 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
  793                     &gtask->bt_task);
  794         }
  795 }
  796 
  797 struct taskqgroup *
  798 taskqgroup_create(const char *name, int cnt, int stride)
  799 {
  800         struct taskqgroup *qgroup;
  801         int cpu, i, j;
  802 
  803         qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
  804         mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
  805         qgroup->tqg_name = name;
  806         qgroup->tqg_cnt = cnt;
  807 
  808         for (cpu = i = 0; i < cnt; i++) {
  809                 taskqgroup_cpu_create(qgroup, i, cpu);
  810                 for (j = 0; j < stride; j++)
  811                         cpu = CPU_NEXT(cpu);
  812         }
  813         return (qgroup);
  814 }
  815 
  816 void
  817 taskqgroup_destroy(struct taskqgroup *qgroup)
  818 {
  819 }
  820 
  821 void
  822 taskqgroup_drain_all(struct taskqgroup *tqg)
  823 {
  824         struct gtaskqueue *q;
  825 
  826         for (int i = 0; i < mp_ncpus; i++) {
  827                 q = tqg->tqg_queue[i].tgc_taskq;
  828                 if (q == NULL)
  829                         continue;
  830                 gtaskqueue_drain_all(q);
  831         }
  832 }

Cache object: 808aa0907d05719bf7beb41f5bd40ef7


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.