| 
     1 /*
    2  * CDDL HEADER START
    3  *
    4  * This file and its contents are supplied under the terms of the
    5  * Common Development and Distribution License ("CDDL"), version 1.0.
    6  * You may only use this file in accordance with the terms of version
    7  * 1.0 of the CDDL.
    8  *
    9  * A full copy of the text of the CDDL should have accompanied this
   10  * source.  A copy of the CDDL is also available via the Internet at
   11  * http://www.illumos.org/license/CDDL.
   12  *
   13  * CDDL HEADER END
   14  */
   15 /*
   16  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
   17  */
   18 
   19 #include        <sys/bqueue.h>
   20 #include        <sys/zfs_context.h>
   21 
   22 static inline bqueue_node_t *
   23 obj2node(bqueue_t *q, void *data)
   24 {
   25         return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
   26 }
   27 
   28 /*
   29  * Initialize a blocking queue  The maximum capacity of the queue is set to
   30  * size.  Types that are stored in a bqueue must contain a bqueue_node_t, and
   31  * node_offset must be its offset from the start of the struct. fill_fraction
   32  * is a performance tuning value; when the queue is full, any threads
   33  * attempting to enqueue records will block.  They will block until they're
   34  * signaled, which will occur when the queue is at least 1/fill_fraction
   35  * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
   36  * block.  They will be signalled when the queue has 1/fill_fraction full.
   37  * As a result, you must call bqueue_enqueue_flush() when you enqueue your
   38  * final record on a thread, in case the dequeuing threads are currently
   39  * blocked and that enqueue does not cause them to be woken. Alternatively,
   40  * this behavior can be disabled (causing signaling to happen immediately) by
   41  * setting fill_fraction to any value larger than size. Return 0 on success,
   42  * or -1 on failure.
   43  *
   44  * Note: The caller must ensure that for a given bqueue_t, there's only a
   45  * single call to bqueue_enqueue() running at a time (e.g. by calling only
   46  * from a single thread, or with locking around the call). Similarly, the
   47  * caller must ensure that there's only a single call to bqueue_dequeue()
   48  * running at a time. However, the one call to bqueue_enqueue() may be
   49  * invoked concurrently with the one call to bqueue_dequeue().
   50  */
   51 int
   52 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
   53 {
   54         if (fill_fraction == 0) {
   55                 return (-1);
   56         }
   57         list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
   58             node_offset + offsetof(bqueue_node_t, bqn_node));
   59         list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
   60             node_offset + offsetof(bqueue_node_t, bqn_node));
   61         list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
   62             node_offset + offsetof(bqueue_node_t, bqn_node));
   63         cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
   64         cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
   65         mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
   66         q->bq_node_offset = node_offset;
   67         q->bq_size = 0;
   68         q->bq_dequeuing_size = 0;
   69         q->bq_enqueuing_size = 0;
   70         q->bq_maxsize = size;
   71         q->bq_fill_fraction = fill_fraction;
   72         return (0);
   73 }
   74 
   75 /*
   76  * Destroy a blocking queue.  This function asserts that there are no
   77  * elements in the queue, and no one is blocked on the condition
   78  * variables.
   79  */
   80 void
   81 bqueue_destroy(bqueue_t *q)
   82 {
   83         mutex_enter(&q->bq_lock);
   84         ASSERT0(q->bq_size);
   85         ASSERT0(q->bq_dequeuing_size);
   86         ASSERT0(q->bq_enqueuing_size);
   87         cv_destroy(&q->bq_add_cv);
   88         cv_destroy(&q->bq_pop_cv);
   89         list_destroy(&q->bq_list);
   90         list_destroy(&q->bq_dequeuing_list);
   91         list_destroy(&q->bq_enqueuing_list);
   92         mutex_exit(&q->bq_lock);
   93         mutex_destroy(&q->bq_lock);
   94 }
   95 
   96 static void
   97 bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
   98 {
   99         ASSERT3U(item_size, >, 0);
  100         ASSERT3U(item_size, <=, q->bq_maxsize);
  101 
  102         obj2node(q, data)->bqn_size = item_size;
  103         q->bq_enqueuing_size += item_size;
  104         list_insert_tail(&q->bq_enqueuing_list, data);
  105 
  106         if (flush ||
  107             q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
  108                 /* Append the enquing list to the shared list. */
  109                 mutex_enter(&q->bq_lock);
  110                 while (q->bq_size > q->bq_maxsize) {
  111                         cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
  112                 }
  113                 q->bq_size += q->bq_enqueuing_size;
  114                 list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
  115                 q->bq_enqueuing_size = 0;
  116                 cv_broadcast(&q->bq_pop_cv);
  117                 mutex_exit(&q->bq_lock);
  118         }
  119 }
  120 
  121 /*
  122  * Add data to q, consuming size units of capacity.  If there is insufficient
  123  * capacity to consume size units, block until capacity exists.  Asserts size is
  124  * > 0.
  125  */
  126 void
  127 bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
  128 {
  129         bqueue_enqueue_impl(q, data, item_size, B_FALSE);
  130 }
  131 
  132 /*
  133  * Enqueue an entry, and then flush the queue.  This forces the popping threads
  134  * to wake up, even if we're below the fill fraction.  We have this in a single
  135  * function, rather than having a separate call, because it prevents race
  136  * conditions between the enqueuing thread and the dequeuing thread, where the
  137  * enqueueing thread will wake up the dequeuing thread, that thread will
  138  * destroy the condvar before the enqueuing thread is done.
  139  */
  140 void
  141 bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
  142 {
  143         bqueue_enqueue_impl(q, data, item_size, B_TRUE);
  144 }
  145 
  146 /*
  147  * Take the first element off of q.  If there are no elements on the queue, wait
  148  * until one is put there.  Return the removed element.
  149  */
  150 void *
  151 bqueue_dequeue(bqueue_t *q)
  152 {
  153         void *ret = list_remove_head(&q->bq_dequeuing_list);
  154         if (ret == NULL) {
  155                 /*
  156                  * Dequeuing list is empty.  Wait for there to be something on
  157                  * the shared list, then move the entire shared list to the
  158                  * dequeuing list.
  159                  */
  160                 mutex_enter(&q->bq_lock);
  161                 while (q->bq_size == 0) {
  162                         cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
  163                 }
  164                 ASSERT0(q->bq_dequeuing_size);
  165                 ASSERT(list_is_empty(&q->bq_dequeuing_list));
  166                 list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
  167                 q->bq_dequeuing_size = q->bq_size;
  168                 q->bq_size = 0;
  169                 cv_broadcast(&q->bq_add_cv);
  170                 mutex_exit(&q->bq_lock);
  171                 ret = list_remove_head(&q->bq_dequeuing_list);
  172         }
  173         q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
  174         return (ret);
  175 }
Cache object: 67fa50c5615917e408246282289847df 
 
 |