FreeBSD/Linux Kernel Cross Reference
sys/net/mp_ring.c


/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/11.1/sys/net/mp_ring.c 300215 2016-05-19 16:28:05Z pfg $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

#if defined(__powerpc__) || defined(__mips__)
#define NO_64BIT_ATOMICS
#endif

#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>

union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};
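
/*
 * All four 16-bit fields above alias the single 64-bit 'state', so a
 * consistent snapshot of the whole ring can be taken with one 64-bit
 * read and a transition can be made with one 64-bit cmpset.  For
 * example (illustrative values only): pidx_head = pidx_tail = 3,
 * cidx = 0, flags = BUSY describes a ring with three published,
 * not-yet-consumed items that a consumer is actively draining.
 */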

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};
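
/*
 * State transitions (a summary derived from the code below): a producer
 * publishing new items always moves the ring to BUSY; the consumer
 * moves BUSY -> IDLE when it has drained everything published,
 * BUSY -> STALLED when r->drain() makes no progress, and
 * BUSY -> ABDICATED when its budget is exhausted while another producer
 * is still mid-enqueue.  ifmp_ring_check_drainage() moves a ring from
 * STALLED back to BUSY once draining is possible again.
 */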

static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}
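
/*
 * Worked example (illustrative, not from the original file): with
 * size = 8 (so x = 7), cidx = 2, and pidx_head = 6, the free region
 * wraps around the end of the ring: x - pidx_head + cidx =
 * 7 - 6 + 2 = 3.  Slots 6, 7, and 0 are usable; the slot just before
 * cidx is always left empty so that a full ring is distinguishable
 * from an empty one, which is why at most size - 1 items ever fit.
 */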

static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}
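
/*
 * Example (illustrative): with size = 8, idx = 6, and n = 3, x = 2,
 * so the result is n - x = 1; the index advances 6 -> 7 -> 0 -> 1.
 * The wrap is computed against r->size directly, so ring sizes that
 * are not powers of 2 work as well.
 */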

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}
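
/*
 * In other words (a summary, not in the original file): everything
 * published has been consumed -> IDLE; the caller wants to abdicate
 * (its budget is spent) and another producer has reserved but not yet
 * published items -> ABDICATED; otherwise keep the consumer running
 * -> BUSY.
 */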

#ifdef NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * In this locked variant it is the ring mutex, not an
                 * acquire style atomic, that guarantees visibility of
                 * items associated with any pidx change that we notice
                 * here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        do {
                                os.state = ns.state = r->state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_cmpset_64(&r->state, os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                do {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}
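
/*
 * Example usage (an illustrative sketch, not part of the original file;
 * "my_drain", "my_can_drain", "sc", "hw_tx", and "hw_tx_slot_free" are
 * hypothetical driver names, and the callback signatures are assumed to
 * follow the typedefs in net/mp_ring.h).  The drain callback consumes
 * r->items[cidx .. pidx) and returns how many items it handled;
 * returning 0 stalls the ring until the driver later calls
 * ifmp_ring_check_drainage():
 *
 *      static u_int
 *      my_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
 *      {
 *              struct my_softc *sc = r->cookie;
 *              u_int n = 0;
 *
 *              while (cidx != pidx && hw_tx_slot_free(sc)) {
 *                      hw_tx(sc, r->items[cidx]);
 *                      if (++cidx == r->size)
 *                              cidx = 0;
 *                      n++;
 *              }
 *              return (n);
 *      }
 *
 *      error = ifmp_ring_alloc(&sc->txr, 1024, sc, my_drain,
 *          my_can_drain, M_DEVBUF, M_WAITOK);
 */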

void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                /*
                 * Drop the lock before checking drainage: returning with it
                 * held would leak the lock, and ifmp_ring_check_drainage()
                 * acquires it again in this locked variant.
                 */
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  In this locked variant it is the
         * mutex, rather than a release style atomic, that makes the items
         * visible to any thread that sees the updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        /*
         * Turn into a consumer if some other thread isn't active as a consumer
         * already.
         */
        if (os.flags != BUSY)
                drain_ring_locked(r, ns, os.flags, budget);

        mtx_unlock(&r->lock);
        return (0);
}

#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        for (;;) {
                os.state = r->state;
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_cmpset_64(&r->state, os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        do {
                os.state = ns.state = r->state;
                ns.pidx_tail = pidx_stop;
                ns.flags = BUSY;
        } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        /*
         * Turn into a consumer if some other thread isn't active as a consumer
         * already.
         */
        if (os.flags != BUSY)
                drain_ring_lockless(r, ns, os.flags, budget);

        return (0);
}
#endif
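
/*
 * Producer-side usage (an illustrative sketch; "txr" and "m" are
 * hypothetical): any number of threads may call ifmp_ring_enqueue()
 * concurrently.
 *
 *      void *items[1] = { m };
 *      error = ifmp_ring_enqueue(txr, items, 1, 4096);
 *
 * The thread whose enqueue finds the ring not already BUSY becomes the
 * consumer as a side effect and drains up to 'budget' items before
 * returning (or abdicating to a later producer).
 */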

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0)
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
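
/*
 * Note (illustrative): a driver typically calls the function above from
 * its reclaim path -- e.g. a tx completion interrupt -- once hardware
 * resources are available again, so a STALLED ring is restarted by the
 * thread that freed the resources rather than by the next enqueue.
 */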

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}
