FreeBSD/Linux Kernel Cross Reference
sys/net/mp_ring.c


/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};
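
/*
 * Illustrative note (not part of the original file): because the anonymous
 * struct and the 64-bit 'state' member overlay the same storage, a single
 * 64-bit load or compare-and-set reads or updates all four 16-bit fields at
 * once.  On a little-endian machine, for example, a state of
 * 0x0001000400040006 decodes as flags == BUSY, cidx == 4, pidx_tail == 4,
 * pidx_head == 6.
 */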

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};

static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}
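
/*
 * Worked example (illustrative, not part of the original file): with
 * r->size == 8, cidx == 2 and pidx_head == 6, space_available() returns
 * 7 - 6 + 2 == 3 (slots 6, 7 and 0).  One slot is always kept empty so that
 * cidx == pidx_head unambiguously means "ring empty".
 */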

static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}
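
/*
 * Worked example (illustrative, not part of the original file): with
 * r->size == 8, increment_idx(r, 6, 3) computes x == 2, wraps, and returns
 * 3 - 2 == 1, i.e. (6 + 3) mod 8.
 */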

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {
                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * In this locked variant the ring lock, held by both producers
                 * and the consumer, guarantees visibility of items associated
                 * with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {
                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        os.state = r->state;
                        do {
                                ns.state = os.state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_fcmpset_64(&r->state, &os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                os.state = r->state;
                do {
                        ns.state = os.state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_fcmpset_acq_64(&r->state, &os.state,
                    ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif
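
/*
 * Illustrative sketch (not part of the original file) of the consumer
 * callbacks that the drain loops above rely on.  drain(r, cidx, pidx) is
 * handed the slots in [cidx, pidx) (the range may wrap) and returns how many
 * items it actually consumed; returning 0 leaves the ring STALLED until
 * ifmp_ring_check_drainage() restarts it.  All names prefixed with "my_" are
 * hypothetical; the real callback prototypes are declared in net/mp_ring.h.
 */
#if 0
static u_int
my_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
{
        struct my_softc *sc = r->cookie;
        u_int n = 0;

        while (cidx != pidx) {
                if (!my_hw_has_room(sc))        /* hypothetical */
                        break;                  /* stop; if n == 0 the ring stalls */
                my_hw_send(sc, r->items[cidx]); /* hypothetical */
                if (++cidx == r->size)
                        cidx = 0;
                n++;
        }
        return (n);
}

static u_int
my_can_drain(struct ifmp_ring *r)
{

        return (my_hw_has_room(r->cookie));     /* hypothetical */
}
#endif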

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}
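
/*
 * Illustrative usage sketch (not part of the original file): creating a ring
 * at driver attach time.  'my_softc', 'my_drain', 'my_can_drain' and
 * M_MYDRIVER are hypothetical names.
 */
#if 0
static int
my_attach_ring(struct my_softc *sc)
{

        /* 1024 slots; one slot always stays unused, so 1023 items fit. */
        return (ifmp_ring_alloc(&sc->txr, 1024, sc, my_drain, my_can_drain,
            M_MYDRIVER, M_WAITOK));
}
#endif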

void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  In this locked variant the ring lock
         * guarantees that the items are visible to any thread that sees the
         * updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        if (abdicate) {
                if (os.flags == IDLE)
                        ns.flags = ABDICATED;
        } else
                ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a consumer
                 * already.
                 */
                if (os.flags != BUSY)
                        drain_ring_locked(r, ns, os.flags, budget);
        }

        mtx_unlock(&r->lock);
        return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        for (;;) {
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        os.state = r->state;
        do {
                ns.state = os.state;
                ns.pidx_tail = pidx_stop;
                if (abdicate) {
                        if (os.flags == IDLE)
                                ns.flags = ABDICATED;
                } else
                        ns.flags = BUSY;
        } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a consumer
                 * already.
                 */
                if (os.flags != BUSY)
                        drain_ring_lockless(r, ns, os.flags, budget);
        }

        return (0);
}
#endif
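
/*
 * Illustrative usage sketch (not part of the original file): enqueueing a
 * single item from a hypothetical transmit path.  A budget of 8192 bounds how
 * much the calling thread may drain before abdicating; abdicate == 0 asks the
 * caller to become the consumer if nobody else is one.  On ENOBUFS nothing
 * was stored, so the caller still owns the item.
 */
#if 0
static int
my_transmit(struct my_softc *sc, struct mbuf *m)
{
        int error;

        error = ifmp_ring_enqueue(sc->txr, (void **)&m, 1, 8192, 0);
        if (error != 0)
                m_freem(m);
        return (error);
}
#endif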

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
            os.pidx_head != os.pidx_tail ||                     // Require work to be available
            (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
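
/*
 * Illustrative usage sketch (not part of the original file): a stalled ring
 * is restarted from the path that frees up the scarce resource, e.g. a tx
 * completion handler.  'my_reclaim_descriptors' is hypothetical.
 */
#if 0
static void
my_tx_intr(struct my_softc *sc)
{

        my_reclaim_descriptors(sc);
        ifmp_ring_check_drainage(sc->txr, 8192);
}
#endif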

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}
