
FreeBSD/Linux Kernel Cross Reference
sys/net/mp_ring.c


/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
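/*
 * Editorial note: i386 does not provide separate acquire/release variants of
 * the 64-bit cmpset, so fall back to the plain version, which is at least as
 * strong.
 */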
#if defined(__i386__)
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>
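/*
 * Editorial note: the entire ring state -- the producer head (reservations),
 * the producer tail (published items), the consumer index, and the consumer
 * flags -- is packed into a single 64-bit word so it can be read and updated
 * with one atomic compare-and-set.
 */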
union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};

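/*
 * Editorial note: space_available() returns the number of free slots, always
 * leaving one slot empty so a full ring can be told apart from an empty one.
 * E.g., with size 8, pidx_head 6, and cidx 2, slots 6, 7, and 0 are free, so
 * it returns 3.
 */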
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}

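/*
 * Editorial note: increment_idx() advances idx by n, wrapping around at the
 * ring size.  E.g., with size 8, idx 6, and n 3 it returns 1.
 */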
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The ring lock, held for the entire drain in this variant,
                 * guarantees visibility of items associated with any pidx
                 * change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {

                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        do {
                                os.state = ns.state = r->state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_cmpset_64(&r->state, os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                do {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif
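/*
 * Editorial note on batching in the drain loops above: the consumer publishes
 * its private cidx only when it has caught up with pidx, when the private
 * cidx has run 64 or more items ahead of the published one, or when the
 * budget is exhausted, which limits how often the shared state word is
 * written.
 */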

int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}
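/*
 * Example (an editorial sketch, not part of the original file): a driver with
 * a hypothetical softc 'sc' and hypothetical callbacks my_drain/my_can_drain
 * might set up a 1024-entry ring like this:
 *
 *      error = ifmp_ring_alloc(&sc->txr, 1024, sc, my_drain, my_can_drain,
 *          M_DEVBUF, M_WAITOK);
 */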

void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
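/*
 * Editorial note on the protocol in both variants below: a producer first
 * reserves the range [pidx_start, pidx_stop) by advancing pidx_head, waits
 * for any earlier producers to publish their items, copies its own items
 * into the reserved slots, and finally publishes them by advancing
 * pidx_tail.  In the lockless variant the two index updates are
 * compare-and-set loops (the tail update with release semantics); in the
 * locked variant the ring mutex serializes the whole operation.
 */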
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The ring lock (rather than a release
         * style atomic, in this variant) guarantees that the items are
         * visible to any thread that sees the updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        /*
         * Turn into a consumer if some other thread isn't active as a consumer
         * already.
         */
        if (os.flags != BUSY)
                drain_ring_locked(r, ns, os.flags, budget);

        mtx_unlock(&r->lock);
        return (0);
}

#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        for (;;) {
                os.state = r->state;
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_cmpset_64(&r->state, os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        do {
                os.state = ns.state = r->state;
                ns.pidx_tail = pidx_stop;
                if (os.flags == IDLE)
                        ns.flags = ABDICATED;
        } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        return (0);
}
#endif
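/*
 * Example (an editorial sketch): enqueue a small batch with a drain budget
 * of 32 items.
 *
 *      void *batch[4];
 *      ... fill batch ...
 *      error = ifmp_ring_enqueue(sc->txr, batch, nitems(batch), 32);
 */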

void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
            os.pidx_head != os.pidx_tail ||                     // Require all reserved items to be published
            (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED or ABDICATED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}
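
A minimal sketch of a drain callback (editorial, not part of the file; my_softc,
my_hw_can_tx, and my_hw_tx are hypothetical driver helpers, and the signature
assumes mp_ring_drain_t in <net/mp_ring.h> matches the call r->drain(r, cidx,
pidx) seen above).  The contract visible in the drain loops is that the
callback consumes items from the half-open range [cidx, pidx) and returns the
count consumed; returning 0 marks the ring STALLED.

        static u_int
        my_drain(struct ifmp_ring *r, u_int cidx, u_int pidx)
        {
                struct my_softc *sc = r->cookie;        /* set by ifmp_ring_alloc() */
                u_int n = 0;

                while (cidx != pidx) {
                        /* Stop when the hardware has no room; report progress so far. */
                        if (!my_hw_can_tx(sc))
                                break;
                        my_hw_tx(sc, r->items[cidx]);
                        if (++cidx == r->size)          /* wrap, as increment_idx() does */
                                cidx = 0;
                        n++;
                }
                return (n);
        }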
