FreeBSD/Linux Kernel Cross Reference
sys/net/mp_ring.c

/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

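/*
 * The ring's entire state -- producer head and tail indexes, consumer index,
 * and flags -- is packed into a single 64-bit word so that the whole state
 * can be read and updated in one operation.
 */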
union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};

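/*
 * Returns the number of free slots available for new items.  One slot is
 * always kept empty to distinguish a full ring from an empty one.
 */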
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}

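/* Advance idx by n slots, wrapping around the end of the ring if necessary. */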
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
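/*
 * Caller holds the ring lock and passes in a state with a guarantee that
 * there is work to do and that all items up to pidx_tail are visible.
 */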
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * In this locked variant it is the ring lock, not an acquire
                 * atomic, that guarantees visibility of items associated with
                 * any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        os.state = r->state;
                        do {
                                ns.state = os.state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_fcmpset_64(&r->state, &os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                os.state = r->state;
                do {
                        ns.state = os.state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_fcmpset_acq_64(&r->state, &os.state,
                    ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}
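
/*
 * Example usage (an illustrative sketch only; "txq", "my_drain", and
 * "my_can_drain" are hypothetical driver-side names, not part of this file):
 *
 *      error = ifmp_ring_alloc(&txq->ring, 1024, txq, my_drain,
 *          my_can_drain, M_DEVBUF, M_WAITOK);
 */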

void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  In this locked variant the ring lock,
         * rather than a release atomic, guarantees that the items are visible
         * to any thread that sees the updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        if (abdicate) {
                if (os.flags == IDLE)
                        ns.flags = ABDICATED;
        } else
                ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a consumer
                 * already.
                 */
                if (os.flags != BUSY)
                        drain_ring_locked(r, ns, os.flags, budget);
        }

        mtx_unlock(&r->lock);
        return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        for (;;) {
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        os.state = r->state;
        do {
                ns.state = os.state;
                ns.pidx_tail = pidx_stop;
                if (abdicate) {
                        if (os.flags == IDLE)
                                ns.flags = ABDICATED;
                } else
                        ns.flags = BUSY;
        } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a consumer
                 * already.
                 */
                if (os.flags != BUSY)
                        drain_ring_lockless(r, ns, os.flags, budget);
        }

        return (0);
}
#endif

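/*
 * Attempt to take over as consumer of a ring whose previous consumer stalled
 * or abdicated.  This is a no-op unless the ring is in one of those two
 * states, no producer is in the middle of an enqueue, and (for a stalled
 * ring) draining can make progress again.
 */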
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
            os.pidx_head != os.pidx_tail ||                     // Require work to be available
            (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

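/* Reset all of the ring's statistics counters to zero. */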
void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

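/* Returns 1 if the ring is completely empty and its consumer is idle. */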
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

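/*
 * Returns 1 if there are no pending producer reservations and the consumer
 * is stalled.
 */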
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}
