The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/contrib/ck/include/ck_ring.h

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * Copyright 2009-2015 Samy Al Bahra.
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions
    7  * are met:
    8  * 1. Redistributions of source code must retain the above copyright
    9  *    notice, this list of conditions and the following disclaimer.
   10  * 2. Redistributions in binary form must reproduce the above copyright
   11  *    notice, this list of conditions and the following disclaimer in the
   12  *    documentation and/or other materials provided with the distribution.
   13  *
   14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
   15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
   18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   24  * SUCH DAMAGE.
   25  */
   26 
   27 #ifndef CK_RING_H
   28 #define CK_RING_H
   29 
   30 #include <ck_cc.h>
   31 #include <ck_md.h>
   32 #include <ck_pr.h>
   33 #include <ck_stdbool.h>
   34 #include <ck_string.h>
   35 
   36 /*
   37  * Concurrent ring buffer.
   38  */
   39 
   40 struct ck_ring {
   41         unsigned int c_head;
   42         char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
   43         unsigned int p_tail;
   44         unsigned int p_head;
   45         char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
   46         unsigned int size;
   47         unsigned int mask;
   48 };
   49 typedef struct ck_ring ck_ring_t;
   50 
   51 struct ck_ring_buffer {
   52         void *value;
   53 };
   54 typedef struct ck_ring_buffer ck_ring_buffer_t;
   55 
   56 CK_CC_INLINE static unsigned int
   57 ck_ring_size(const struct ck_ring *ring)
   58 {
   59         unsigned int c, p;
   60 
   61         c = ck_pr_load_uint(&ring->c_head);
   62         p = ck_pr_load_uint(&ring->p_tail);
   63         return (p - c) & ring->mask;
   64 }
   65 
   66 CK_CC_INLINE static unsigned int
   67 ck_ring_capacity(const struct ck_ring *ring)
   68 {
   69 
   70         return ring->size;
   71 }
   72 
   73 /*
   74  * This function is only safe to call when there are no concurrent operations
   75  * on the ring. This is primarily meant for persistent ck_ring use-cases. The
   76  * function returns true if any mutations were performed on the ring.
   77  */
   78 CK_CC_INLINE static bool
   79 ck_ring_repair(struct ck_ring *ring)
   80 {
   81         bool r = false;
   82 
   83         if (ring->p_tail != ring->p_head) {
   84                 ring->p_tail = ring->p_head;
   85                 r = true;
   86         }
   87 
   88         return r;
   89 }
   90 
   91 /*
   92  * This can be called when no concurrent updates are occurring on the ring
   93  * structure to check for consistency. This is primarily meant to be used for
   94  * persistent storage of the ring. If this functions returns false, the ring
   95  * is in an inconsistent state.
   96  */
   97 CK_CC_INLINE static bool
   98 ck_ring_valid(const struct ck_ring *ring)
   99 {
  100         unsigned int size = ring->size;
  101         unsigned int c_head = ring->c_head;
  102         unsigned int p_head = ring->p_head;
  103 
  104         /* The ring must be a power of 2. */
  105         if (size & (size - 1))
  106                 return false;
  107 
  108         /* The consumer counter must always be smaller than the producer. */
  109         if (c_head > p_head)
  110                 return false;
  111 
  112         /* The producer may only be up to size slots ahead of consumer. */
  113         if (p_head - c_head >= size)
  114                 return false;
  115 
  116         return true;
  117 }
  118 
  119 CK_CC_INLINE static void
  120 ck_ring_init(struct ck_ring *ring, unsigned int size)
  121 {
  122 
  123         ring->size = size;
  124         ring->mask = size - 1;
  125         ring->p_tail = 0;
  126         ring->p_head = 0;
  127         ring->c_head = 0;
  128         return;
  129 }
  130 
  131 /*
  132  * The _ck_ring_* namespace is internal only and must not used externally.
  133  */
  134 
  135 /*
  136  * This function will return a region of memory to write for the next value
  137  * for a single producer.
  138  */
  139 CK_CC_FORCE_INLINE static void *
  140 _ck_ring_enqueue_reserve_sp(struct ck_ring *ring,
  141     void *CK_CC_RESTRICT buffer,
  142     unsigned int ts,
  143     unsigned int *size)
  144 {
  145         const unsigned int mask = ring->mask;
  146         unsigned int consumer, producer, delta;
  147 
  148         consumer = ck_pr_load_uint(&ring->c_head);
  149         producer = ring->p_tail;
  150         delta = producer + 1;
  151         if (size != NULL)
  152                 *size = (producer - consumer) & mask;
  153 
  154         if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
  155                 return NULL;
  156 
  157         return (char *)buffer + ts * (producer & mask);
  158 }
  159 
  160 /*
  161  * This is to be called to commit and make visible a region of previously
  162  * reserved with reverse_sp.
  163  */
  164 CK_CC_FORCE_INLINE static void
  165 _ck_ring_enqueue_commit_sp(struct ck_ring *ring)
  166 {
  167 
  168         ck_pr_fence_store();
  169         ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1);
  170         return;
  171 }
  172 
  173 CK_CC_FORCE_INLINE static bool
  174 _ck_ring_enqueue_sp(struct ck_ring *ring,
  175     void *CK_CC_RESTRICT buffer,
  176     const void *CK_CC_RESTRICT entry,
  177     unsigned int ts,
  178     unsigned int *size)
  179 {
  180         const unsigned int mask = ring->mask;
  181         unsigned int consumer, producer, delta;
  182 
  183         consumer = ck_pr_load_uint(&ring->c_head);
  184         producer = ring->p_tail;
  185         delta = producer + 1;
  186         if (size != NULL)
  187                 *size = (producer - consumer) & mask;
  188 
  189         if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
  190                 return false;
  191 
  192         buffer = (char *)buffer + ts * (producer & mask);
  193         memcpy(buffer, entry, ts);
  194 
  195         /*
  196          * Make sure to update slot value before indicating
  197          * that the slot is available for consumption.
  198          */
  199         ck_pr_fence_store();
  200         ck_pr_store_uint(&ring->p_tail, delta);
  201         return true;
  202 }
  203 
  204 CK_CC_FORCE_INLINE static bool
  205 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
  206     void *CK_CC_RESTRICT buffer,
  207     const void *CK_CC_RESTRICT entry,
  208     unsigned int ts,
  209     unsigned int *size)
  210 {
  211         unsigned int sz;
  212         bool r;
  213 
  214         r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
  215         *size = sz;
  216         return r;
  217 }
  218 
  219 CK_CC_FORCE_INLINE static bool
  220 _ck_ring_dequeue_sc(struct ck_ring *ring,
  221     const void *CK_CC_RESTRICT buffer,
  222     void *CK_CC_RESTRICT target,
  223     unsigned int size)
  224 {
  225         const unsigned int mask = ring->mask;
  226         unsigned int consumer, producer;
  227 
  228         consumer = ring->c_head;
  229         producer = ck_pr_load_uint(&ring->p_tail);
  230 
  231         if (CK_CC_UNLIKELY(consumer == producer))
  232                 return false;
  233 
  234         /*
  235          * Make sure to serialize with respect to our snapshot
  236          * of the producer counter.
  237          */
  238         ck_pr_fence_load();
  239 
  240         buffer = (const char *)buffer + size * (consumer & mask);
  241         memcpy(target, buffer, size);
  242 
  243         /*
  244          * Make sure copy is completed with respect to consumer
  245          * update.
  246          */
  247         ck_pr_fence_store();
  248         ck_pr_store_uint(&ring->c_head, consumer + 1);
  249         return true;
  250 }
  251 
  252 CK_CC_FORCE_INLINE static void *
  253 _ck_ring_enqueue_reserve_mp(struct ck_ring *ring,
  254     void *buffer,
  255     unsigned int ts,
  256     unsigned int *ticket,
  257     unsigned int *size)
  258 {
  259         const unsigned int mask = ring->mask;
  260         unsigned int producer, consumer, delta;
  261 
  262         producer = ck_pr_load_uint(&ring->p_head);
  263 
  264         for (;;) {
  265                 ck_pr_fence_load();
  266                 consumer = ck_pr_load_uint(&ring->c_head);
  267 
  268                 delta = producer + 1;
  269 
  270                 if (CK_CC_LIKELY((producer - consumer) < mask)) {
  271                         if (ck_pr_cas_uint_value(&ring->p_head,
  272                             producer, delta, &producer) == true) {
  273                                 break;
  274                         }
  275                 } else {
  276                         unsigned int new_producer;
  277 
  278                         ck_pr_fence_load();
  279                         new_producer = ck_pr_load_uint(&ring->p_head);
  280 
  281                         if (producer == new_producer) {
  282                                 if (size != NULL)
  283                                         *size = (producer - consumer) & mask;
  284 
  285                                 return false;
  286                         }
  287 
  288                         producer = new_producer;
  289                 }
  290         }
  291 
  292         *ticket = producer;
  293         if (size != NULL)
  294                 *size = (producer - consumer) & mask;
  295 
  296         return (char *)buffer + ts * (producer & mask);
  297 }
  298 
  299 CK_CC_FORCE_INLINE static void
  300 _ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer)
  301 {
  302 
  303         while (ck_pr_load_uint(&ring->p_tail) != producer)
  304                 ck_pr_stall();
  305 
  306         ck_pr_fence_store();
  307         ck_pr_store_uint(&ring->p_tail, producer + 1);
  308         return;
  309 }
  310 
  311 CK_CC_FORCE_INLINE static bool
  312 _ck_ring_enqueue_mp(struct ck_ring *ring,
  313     void *buffer,
  314     const void *entry,
  315     unsigned int ts,
  316     unsigned int *size)
  317 {
  318         const unsigned int mask = ring->mask;
  319         unsigned int producer, consumer, delta;
  320         bool r = true;
  321 
  322         producer = ck_pr_load_uint(&ring->p_head);
  323 
  324         for (;;) {
  325                 /*
  326                  * The snapshot of producer must be up to date with respect to
  327                  * consumer.
  328                  */
  329                 ck_pr_fence_load();
  330                 consumer = ck_pr_load_uint(&ring->c_head);
  331 
  332                 delta = producer + 1;
  333 
  334                 /*
  335                  * Only try to CAS if the producer is not clearly stale (not
  336                  * less than consumer) and the buffer is definitely not full.
  337                  */
  338                 if (CK_CC_LIKELY((producer - consumer) < mask)) {
  339                         if (ck_pr_cas_uint_value(&ring->p_head,
  340                             producer, delta, &producer) == true) {
  341                                 break;
  342                         }
  343                 } else {
  344                         unsigned int new_producer;
  345 
  346                         /*
  347                          * Slow path.  Either the buffer is full or we have a
  348                          * stale snapshot of p_head.  Execute a second read of
  349                          * p_read that must be ordered wrt the snapshot of
  350                          * c_head.
  351                          */
  352                         ck_pr_fence_load();
  353                         new_producer = ck_pr_load_uint(&ring->p_head);
  354 
  355                         /*
  356                          * Only fail if we haven't made forward progress in
  357                          * production: the buffer must have been full when we
  358                          * read new_producer (or we wrapped around UINT_MAX
  359                          * during this iteration).
  360                          */
  361                         if (producer == new_producer) {
  362                                 r = false;
  363                                 goto leave;
  364                         }
  365 
  366                         /*
  367                          * p_head advanced during this iteration. Try again.
  368                          */
  369                         producer = new_producer;
  370                 }
  371         }
  372 
  373         buffer = (char *)buffer + ts * (producer & mask);
  374         memcpy(buffer, entry, ts);
  375 
  376         /*
  377          * Wait until all concurrent producers have completed writing
  378          * their data into the ring buffer.
  379          */
  380         while (ck_pr_load_uint(&ring->p_tail) != producer)
  381                 ck_pr_stall();
  382 
  383         /*
  384          * Ensure that copy is completed before updating shared producer
  385          * counter.
  386          */
  387         ck_pr_fence_store();
  388         ck_pr_store_uint(&ring->p_tail, delta);
  389 
  390 leave:
  391         if (size != NULL)
  392                 *size = (producer - consumer) & mask;
  393 
  394         return r;
  395 }
  396 
  397 CK_CC_FORCE_INLINE static bool
  398 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
  399     void *buffer,
  400     const void *entry,
  401     unsigned int ts,
  402     unsigned int *size)
  403 {
  404         unsigned int sz;
  405         bool r;
  406 
  407         r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
  408         *size = sz;
  409         return r;
  410 }
  411 
  412 CK_CC_FORCE_INLINE static bool
  413 _ck_ring_trydequeue_mc(struct ck_ring *ring,
  414     const void *buffer,
  415     void *data,
  416     unsigned int size)
  417 {
  418         const unsigned int mask = ring->mask;
  419         unsigned int consumer, producer;
  420 
  421         consumer = ck_pr_load_uint(&ring->c_head);
  422         ck_pr_fence_load();
  423         producer = ck_pr_load_uint(&ring->p_tail);
  424 
  425         if (CK_CC_UNLIKELY(consumer == producer))
  426                 return false;
  427 
  428         ck_pr_fence_load();
  429 
  430         buffer = (const char *)buffer + size * (consumer & mask);
  431         memcpy(data, buffer, size);
  432 
  433         ck_pr_fence_store_atomic();
  434         return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
  435 }
  436 
  437 CK_CC_FORCE_INLINE static bool
  438 _ck_ring_dequeue_mc(struct ck_ring *ring,
  439     const void *buffer,
  440     void *data,
  441     unsigned int ts)
  442 {
  443         const unsigned int mask = ring->mask;
  444         unsigned int consumer, producer;
  445 
  446         consumer = ck_pr_load_uint(&ring->c_head);
  447 
  448         do {
  449                 const char *target;
  450 
  451                 /*
  452                  * Producer counter must represent state relative to
  453                  * our latest consumer snapshot.
  454                  */
  455                 ck_pr_fence_load();
  456                 producer = ck_pr_load_uint(&ring->p_tail);
  457 
  458                 if (CK_CC_UNLIKELY(consumer == producer))
  459                         return false;
  460 
  461                 ck_pr_fence_load();
  462 
  463                 target = (const char *)buffer + ts * (consumer & mask);
  464                 memcpy(data, target, ts);
  465 
  466                 /* Serialize load with respect to head update. */
  467                 ck_pr_fence_store_atomic();
  468         } while (ck_pr_cas_uint_value(&ring->c_head,
  469                                       consumer,
  470                                       consumer + 1,
  471                                       &consumer) == false);
  472 
  473         return true;
  474 }
  475 
  476 /*
  477  * The ck_ring_*_spsc namespace is the public interface for interacting with a
  478  * ring buffer containing pointers. Correctness is only provided if there is up
  479  * to one concurrent consumer and up to one concurrent producer.
  480  */
  481 CK_CC_INLINE static bool
  482 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
  483     struct ck_ring_buffer *buffer,
  484     const void *entry,
  485     unsigned int *size)
  486 {
  487 
  488         return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
  489             sizeof(entry), size);
  490 }
  491 
  492 CK_CC_INLINE static bool
  493 ck_ring_enqueue_spsc(struct ck_ring *ring,
  494     struct ck_ring_buffer *buffer,
  495     const void *entry)
  496 {
  497 
  498         return _ck_ring_enqueue_sp(ring, buffer,
  499             &entry, sizeof(entry), NULL);
  500 }
  501 
  502 CK_CC_INLINE static void *
  503 ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring,
  504     struct ck_ring_buffer *buffer,
  505     unsigned int *size)
  506 {
  507 
  508         return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
  509             size);
  510 }
  511 
  512 CK_CC_INLINE static void *
  513 ck_ring_enqueue_reserve_spsc(struct ck_ring *ring,
  514     struct ck_ring_buffer *buffer)
  515 {
  516 
  517         return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
  518             NULL);
  519 }
  520 
  521 CK_CC_INLINE static void
  522 ck_ring_enqueue_commit_spsc(struct ck_ring *ring)
  523 {
  524 
  525         _ck_ring_enqueue_commit_sp(ring);
  526         return;
  527 }
  528 
  529 CK_CC_INLINE static bool
  530 ck_ring_dequeue_spsc(struct ck_ring *ring,
  531     const struct ck_ring_buffer *buffer,
  532     void *data)
  533 {
  534 
  535         return _ck_ring_dequeue_sc(ring, buffer,
  536             (void **)data, sizeof(void *));
  537 }
  538 
  539 /*
  540  * The ck_ring_*_mpmc namespace is the public interface for interacting with a
  541  * ring buffer containing pointers. Correctness is provided for any number of
  542  * producers and consumers.
  543  */
  544 CK_CC_INLINE static bool
  545 ck_ring_enqueue_mpmc(struct ck_ring *ring,
  546     struct ck_ring_buffer *buffer,
  547     const void *entry)
  548 {
  549 
  550         return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL);
  551 }
  552 
  553 CK_CC_INLINE static bool
  554 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
  555     struct ck_ring_buffer *buffer,
  556     const void *entry,
  557     unsigned int *size)
  558 {
  559 
  560         return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry),
  561             size);
  562 }
  563 
  564 CK_CC_INLINE static void *
  565 ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring,
  566     struct ck_ring_buffer *buffer,
  567     unsigned int *ticket)
  568 {
  569 
  570         return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
  571             ticket, NULL);
  572 }
  573 
  574 CK_CC_INLINE static void *
  575 ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring,
  576     struct ck_ring_buffer *buffer,
  577     unsigned int *ticket,
  578     unsigned int *size)
  579 {
  580 
  581         return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
  582             ticket, size);
  583 }
  584 
  585 CK_CC_INLINE static void
  586 ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket)
  587 {
  588 
  589         _ck_ring_enqueue_commit_mp(ring, ticket);
  590         return;
  591 }
  592 
  593 CK_CC_INLINE static bool
  594 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
  595     const struct ck_ring_buffer *buffer,
  596     void *data)
  597 {
  598 
  599         return _ck_ring_trydequeue_mc(ring,
  600             buffer, (void **)data, sizeof(void *));
  601 }
  602 
  603 CK_CC_INLINE static bool
  604 ck_ring_dequeue_mpmc(struct ck_ring *ring,
  605     const struct ck_ring_buffer *buffer,
  606     void *data)
  607 {
  608 
  609         return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
  610             sizeof(void *));
  611 }
  612 
  613 /*
  614  * The ck_ring_*_spmc namespace is the public interface for interacting with a
  615  * ring buffer containing pointers. Correctness is provided for any number of
  616  * consumers with up to one concurrent producer.
  617  */
  618 CK_CC_INLINE static void *
  619 ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring,
  620     struct ck_ring_buffer *buffer,
  621     unsigned int *size)
  622 {
  623 
  624         return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size);
  625 }
  626 
  627 CK_CC_INLINE static void *
  628 ck_ring_enqueue_reserve_spmc(struct ck_ring *ring,
  629     struct ck_ring_buffer *buffer)
  630 {
  631 
  632         return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL);
  633 }
  634 
  635 CK_CC_INLINE static void
  636 ck_ring_enqueue_commit_spmc(struct ck_ring *ring)
  637 {
  638 
  639         _ck_ring_enqueue_commit_sp(ring);
  640         return;
  641 }
  642 
  643 CK_CC_INLINE static bool
  644 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
  645     struct ck_ring_buffer *buffer,
  646     const void *entry,
  647     unsigned int *size)
  648 {
  649 
  650         return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
  651             sizeof(entry), size);
  652 }
  653 
  654 CK_CC_INLINE static bool
  655 ck_ring_enqueue_spmc(struct ck_ring *ring,
  656     struct ck_ring_buffer *buffer,
  657     const void *entry)
  658 {
  659 
  660         return _ck_ring_enqueue_sp(ring, buffer, &entry,
  661             sizeof(entry), NULL);
  662 }
  663 
  664 CK_CC_INLINE static bool
  665 ck_ring_trydequeue_spmc(struct ck_ring *ring,
  666     const struct ck_ring_buffer *buffer,
  667     void *data)
  668 {
  669 
  670         return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
  671 }
  672 
  673 CK_CC_INLINE static bool
  674 ck_ring_dequeue_spmc(struct ck_ring *ring,
  675     const struct ck_ring_buffer *buffer,
  676     void *data)
  677 {
  678 
  679         return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
  680 }
  681 
  682 /*
  683  * The ck_ring_*_mpsc namespace is the public interface for interacting with a
  684  * ring buffer containing pointers. Correctness is provided for any number of
  685  * producers with up to one concurrent consumers.
  686  */
  687 CK_CC_INLINE static void *
  688 ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring,
  689     struct ck_ring_buffer *buffer,
  690     unsigned int *ticket)
  691 {
  692 
  693         return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
  694             ticket, NULL);
  695 }
  696 
  697 CK_CC_INLINE static void *
  698 ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring,
  699     struct ck_ring_buffer *buffer,
  700     unsigned int *ticket,
  701     unsigned int *size)
  702 {
  703 
  704         return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
  705             ticket, size);
  706 }
  707 
  708 CK_CC_INLINE static void
  709 ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket)
  710 {
  711 
  712         _ck_ring_enqueue_commit_mp(ring, ticket);
  713         return;
  714 }
  715 
  716 CK_CC_INLINE static bool
  717 ck_ring_enqueue_mpsc(struct ck_ring *ring,
  718     struct ck_ring_buffer *buffer,
  719     const void *entry)
  720 {
  721 
  722         return _ck_ring_enqueue_mp(ring, buffer, &entry,
  723             sizeof(entry), NULL);
  724 }
  725 
  726 CK_CC_INLINE static bool
  727 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
  728     struct ck_ring_buffer *buffer,
  729     const void *entry,
  730     unsigned int *size)
  731 {
  732 
  733         return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
  734             sizeof(entry), size);
  735 }
  736 
  737 CK_CC_INLINE static bool
  738 ck_ring_dequeue_mpsc(struct ck_ring *ring,
  739     const struct ck_ring_buffer *buffer,
  740     void *data)
  741 {
  742 
  743         return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
  744             sizeof(void *));
  745 }
  746 
  747 /*
  748  * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
  749  * values of a particular type in the ring the buffer.
  750  */
  751 #define CK_RING_PROTOTYPE(name, type)                           \
  752 CK_CC_INLINE static struct type *                               \
  753 ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a,          \
  754     struct type *b)                                             \
  755 {                                                               \
  756                                                                 \
  757         return _ck_ring_enqueue_reserve_sp(a, b,                \
  758             sizeof(struct type), NULL);                         \
  759 }                                                               \
  760                                                                 \
  761 CK_CC_INLINE static struct type *                               \
  762 ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a,     \
  763     struct type *b,                                             \
  764     unsigned int *c)                                            \
  765 {                                                               \
  766                                                                 \
  767         return _ck_ring_enqueue_reserve_sp(a, b,                \
  768             sizeof(struct type), c);                            \
  769 }                                                               \
  770                                                                 \
  771 CK_CC_INLINE static bool                                        \
  772 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,             \
  773     struct type *b,                                             \
  774     struct type *c,                                             \
  775     unsigned int *d)                                            \
  776 {                                                               \
  777                                                                 \
  778         return _ck_ring_enqueue_sp_size(a, b, c,                \
  779             sizeof(struct type), d);                            \
  780 }                                                               \
  781                                                                 \
  782 CK_CC_INLINE static bool                                        \
  783 ck_ring_enqueue_spsc_##name(struct ck_ring *a,                  \
  784     struct type *b,                                             \
  785     struct type *c)                                             \
  786 {                                                               \
  787                                                                 \
  788         return _ck_ring_enqueue_sp(a, b, c,                     \
  789             sizeof(struct type), NULL);                         \
  790 }                                                               \
  791                                                                 \
  792 CK_CC_INLINE static bool                                        \
  793 ck_ring_dequeue_spsc_##name(struct ck_ring *a,                  \
  794     struct type *b,                                             \
  795     struct type *c)                                             \
  796 {                                                               \
  797                                                                 \
  798         return _ck_ring_dequeue_sc(a, b, c,                     \
  799             sizeof(struct type));                               \
  800 }                                                               \
  801                                                                 \
  802 CK_CC_INLINE static struct type *                               \
  803 ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a,          \
  804     struct type *b)                                             \
  805 {                                                               \
  806                                                                 \
  807         return _ck_ring_enqueue_reserve_sp(a, b,                \
  808             sizeof(struct type), NULL);                         \
  809 }                                                               \
  810                                                                 \
  811 CK_CC_INLINE static struct type *                               \
  812 ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a,     \
  813     struct type *b,                                             \
  814     unsigned int *c)                                            \
  815 {                                                               \
  816                                                                 \
  817         return _ck_ring_enqueue_reserve_sp(a, b,                \
  818             sizeof(struct type), c);                            \
  819 }                                                               \
  820                                                                 \
  821 CK_CC_INLINE static bool                                        \
  822 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,             \
  823     struct type *b,                                             \
  824     struct type *c,                                             \
  825     unsigned int *d)                                            \
  826 {                                                               \
  827                                                                 \
  828         return _ck_ring_enqueue_sp_size(a, b, c,                \
  829             sizeof(struct type), d);                            \
  830 }                                                               \
  831                                                                 \
  832 CK_CC_INLINE static bool                                        \
  833 ck_ring_enqueue_spmc_##name(struct ck_ring *a,                  \
  834     struct type *b,                                             \
  835     struct type *c)                                             \
  836 {                                                               \
  837                                                                 \
  838         return _ck_ring_enqueue_sp(a, b, c,                     \
  839             sizeof(struct type), NULL);                         \
  840 }                                                               \
  841                                                                 \
  842 CK_CC_INLINE static bool                                        \
  843 ck_ring_trydequeue_spmc_##name(struct ck_ring *a,               \
  844     struct type *b,                                             \
  845     struct type *c)                                             \
  846 {                                                               \
  847                                                                 \
  848         return _ck_ring_trydequeue_mc(a,                        \
  849             b, c, sizeof(struct type));                         \
  850 }                                                               \
  851                                                                 \
  852 CK_CC_INLINE static bool                                        \
  853 ck_ring_dequeue_spmc_##name(struct ck_ring *a,                  \
  854     struct type *b,                                             \
  855     struct type *c)                                             \
  856 {                                                               \
  857                                                                 \
  858         return _ck_ring_dequeue_mc(a, b, c,                     \
  859             sizeof(struct type));                               \
  860 }                                                               \
  861                                                                 \
  862 CK_CC_INLINE static struct type *                               \
  863 ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a,          \
  864     struct type *b,                                             \
  865     unsigned int *c)                                            \
  866 {                                                               \
  867                                                                 \
  868         return _ck_ring_enqueue_reserve_mp(a, b,                \
  869             sizeof(struct type), c, NULL);                      \
  870 }                                                               \
  871                                                                 \
  872 CK_CC_INLINE static struct type *                               \
  873 ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a,     \
  874     struct type *b,                                             \
  875     unsigned int *c,                                            \
  876     unsigned int *d)                                            \
  877 {                                                               \
  878                                                                 \
  879         return _ck_ring_enqueue_reserve_mp(a, b,                \
  880             sizeof(struct type), c, d);                         \
  881 }                                                               \
  882                                                                 \
  883 CK_CC_INLINE static bool                                        \
  884 ck_ring_enqueue_mpsc_##name(struct ck_ring *a,                  \
  885     struct type *b,                                             \
  886     struct type *c)                                             \
  887 {                                                               \
  888                                                                 \
  889         return _ck_ring_enqueue_mp(a, b, c,                     \
  890             sizeof(struct type), NULL);                         \
  891 }                                                               \
  892                                                                 \
  893 CK_CC_INLINE static bool                                        \
  894 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,             \
  895     struct type *b,                                             \
  896     struct type *c,                                             \
  897     unsigned int *d)                                            \
  898 {                                                               \
  899                                                                 \
  900         return _ck_ring_enqueue_mp_size(a, b, c,                \
  901             sizeof(struct type), d);                            \
  902 }                                                               \
  903                                                                 \
  904 CK_CC_INLINE static bool                                        \
  905 ck_ring_dequeue_mpsc_##name(struct ck_ring *a,                  \
  906     struct type *b,                                             \
  907     struct type *c)                                             \
  908 {                                                               \
  909                                                                 \
  910         return _ck_ring_dequeue_sc(a, b, c,                     \
  911             sizeof(struct type));                               \
  912 }                                                               \
  913                                                                 \
  914 CK_CC_INLINE static struct type *                               \
  915 ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a,          \
  916     struct type *b,                                             \
  917     unsigned int *c)                                            \
  918 {                                                               \
  919                                                                 \
  920         return _ck_ring_enqueue_reserve_mp(a, b,                \
  921             sizeof(struct type), c, NULL);                      \
  922 }                                                               \
  923                                                                 \
  924 CK_CC_INLINE static struct type *                               \
  925 ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a,     \
  926     struct type *b,                                             \
  927     unsigned int *c,                                            \
  928     unsigned int *d)                                            \
  929 {                                                               \
  930                                                                 \
  931         return _ck_ring_enqueue_reserve_mp(a, b,                \
  932             sizeof(struct type), c, d);                         \
  933 }                                                               \
  934                                                                 \
  935 CK_CC_INLINE static bool                                        \
  936 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,             \
  937     struct type *b,                                             \
  938     struct type *c,                                             \
  939     unsigned int *d)                                            \
  940 {                                                               \
  941                                                                 \
  942         return _ck_ring_enqueue_mp_size(a, b, c,                \
  943             sizeof(struct type), d);                            \
  944 }                                                               \
  945                                                                 \
  946 CK_CC_INLINE static bool                                        \
  947 ck_ring_enqueue_mpmc_##name(struct ck_ring *a,                  \
  948     struct type *b,                                             \
  949     struct type *c)                                             \
  950 {                                                               \
  951                                                                 \
  952         return _ck_ring_enqueue_mp(a, b, c,                     \
  953             sizeof(struct type), NULL);                         \
  954 }                                                               \
  955                                                                 \
  956 CK_CC_INLINE static bool                                        \
  957 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,               \
  958     struct type *b,                                             \
  959     struct type *c)                                             \
  960 {                                                               \
  961                                                                 \
  962         return _ck_ring_trydequeue_mc(a,                        \
  963             b, c, sizeof(struct type));                         \
  964 }                                                               \
  965                                                                 \
  966 CK_CC_INLINE static bool                                        \
  967 ck_ring_dequeue_mpmc_##name(struct ck_ring *a,                  \
  968     struct type *b,                                             \
  969     struct type *c)                                             \
  970 {                                                               \
  971                                                                 \
  972         return _ck_ring_dequeue_mc(a, b, c,                     \
  973             sizeof(struct type));                               \
  974 }
  975 
  976 /*
  977  * A single producer with one concurrent consumer.
  978  */
  979 #define CK_RING_ENQUEUE_SPSC(name, a, b, c)                     \
  980         ck_ring_enqueue_spsc_##name(a, b, c)
  981 #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d)             \
  982         ck_ring_enqueue_spsc_size_##name(a, b, c, d)
  983 #define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b, c)             \
  984         ck_ring_enqueue_reserve_spsc_##name(a, b, c)
  985 #define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c, d)     \
  986         ck_ring_enqueue_reserve_spsc_size_##name(a, b, c, d)
  987 #define CK_RING_DEQUEUE_SPSC(name, a, b, c)                     \
  988         ck_ring_dequeue_spsc_##name(a, b, c)
  989 
  990 /*
  991  * A single producer with any number of concurrent consumers.
  992  */
  993 #define CK_RING_ENQUEUE_SPMC(name, a, b, c)                     \
  994         ck_ring_enqueue_spmc_##name(a, b, c)
  995 #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d)             \
  996         ck_ring_enqueue_spmc_size_##name(a, b, c, d)
  997 #define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b, c)             \
  998         ck_ring_enqueue_reserve_spmc_##name(a, b, c)
  999 #define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c, d)     \
 1000         ck_ring_enqueue_reserve_spmc_size_##name(a, b, c, d)
 1001 #define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c)                  \
 1002         ck_ring_trydequeue_spmc_##name(a, b, c)
 1003 #define CK_RING_DEQUEUE_SPMC(name, a, b, c)                     \
 1004         ck_ring_dequeue_spmc_##name(a, b, c)
 1005 
 1006 /*
 1007  * Any number of concurrent producers with up to one
 1008  * concurrent consumer.
 1009  */
 1010 #define CK_RING_ENQUEUE_MPSC(name, a, b, c)                     \
 1011         ck_ring_enqueue_mpsc_##name(a, b, c)
 1012 #define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d)             \
 1013         ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
 1014 #define CK_RING_ENQUEUE_RESERVE_MPSC(name, a, b, c)             \
 1015         ck_ring_enqueue_reserve_mpsc_##name(a, b, c)
 1016 #define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, a, b, c, d)     \
 1017         ck_ring_enqueue_reserve_mpsc_size_##name(a, b, c, d)
 1018 #define CK_RING_DEQUEUE_MPSC(name, a, b, c)                     \
 1019         ck_ring_dequeue_mpsc_##name(a, b, c)
 1020 
 1021 /*
 1022  * Any number of concurrent producers and consumers.
 1023  */
 1024 #define CK_RING_ENQUEUE_MPMC(name, a, b, c)                     \
 1025         ck_ring_enqueue_mpmc_##name(a, b, c)
 1026 #define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d)             \
 1027         ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
 1028 #define CK_RING_ENQUEUE_RESERVE_MPMC(name, a, b, c)             \
 1029         ck_ring_enqueue_reserve_mpmc_##name(a, b, c)
 1030 #define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, a, b, c, d)     \
 1031         ck_ring_enqueue_reserve_mpmc_size_##name(a, b, c, d)
 1032 #define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c)                  \
 1033         ck_ring_trydequeue_mpmc_##name(a, b, c)
 1034 #define CK_RING_DEQUEUE_MPMC(name, a, b, c)                     \
 1035         ck_ring_dequeue_mpmc_##name(a, b, c)
 1036 
 1037 #endif /* CK_RING_H */

Cache object: 8fed20e0670fbd595641a5f9f507622c


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.