| 
     1 /*
    2  * CDDL HEADER START
    3  *
    4  * The contents of this file are subject to the terms of the
    5  * Common Development and Distribution License (the "License").
    6  * You may not use this file except in compliance with the License.
    7  *
    8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
    9  * or https://opensource.org/licenses/CDDL-1.0.
   10  * See the License for the specific language governing permissions
   11  * and limitations under the License.
   12  *
   13  * When distributing Covered Code, include this CDDL HEADER in each
   14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
   15  * If applicable, add the following below this CDDL HEADER, with the
   16  * fields enclosed by brackets "[]" replaced with your own identifying
   17  * information: Portions Copyright [yyyy] [name of copyright owner]
   18  *
   19  * CDDL HEADER END
   20  */
   21 /*
   22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
   23  * Portions Copyright 2011 Martin Matuska
   24  * Copyright (c) 2012, 2019 by Delphix. All rights reserved.
   25  */
   26 
   27 #include <sys/zfs_context.h>
   28 #include <sys/txg_impl.h>
   29 #include <sys/dmu_impl.h>
   30 #include <sys/spa_impl.h>
   31 #include <sys/dmu_tx.h>
   32 #include <sys/dsl_pool.h>
   33 #include <sys/dsl_scan.h>
   34 #include <sys/zil.h>
   35 #include <sys/callb.h>
   36 #include <sys/trace_zfs.h>
   37 
   38 /*
   39  * ZFS Transaction Groups
   40  * ----------------------
   41  *
   42  * ZFS transaction groups are, as the name implies, groups of transactions
   43  * that act on persistent state. ZFS asserts consistency at the granularity of
   44  * these transaction groups. Each successive transaction group (txg) is
   45  * assigned a 64-bit consecutive identifier. There are three active
   46  * transaction group states: open, quiescing, or syncing. At any given time,
   47  * there may be an active txg associated with each state; each active txg may
   48  * either be processing, or blocked waiting to enter the next state. There may
   49  * be up to three active txgs, and there is always a txg in the open state
   50  * (though it may be blocked waiting to enter the quiescing state). In broad
   51  * strokes, transactions -- operations that change in-memory structures -- are
   52  * accepted into the txg in the open state, and are completed while the txg is
   53  * in the open or quiescing states. The accumulated changes are written to
   54  * disk in the syncing state.
   55  *
   56  * Open
   57  *
   58  * When a new txg becomes active, it first enters the open state. New
   59  * transactions -- updates to in-memory structures -- are assigned to the
   60  * currently open txg. There is always a txg in the open state so that ZFS can
   61  * accept new changes (though the txg may refuse new changes if it has hit
   62  * some limit). ZFS advances the open txg to the next state for a variety of
   63  * reasons such as it hitting a time or size threshold, or the execution of an
   64  * administrative action that must be completed in the syncing state.
   65  *
   66  * Quiescing
   67  *
   68  * After a txg exits the open state, it enters the quiescing state. The
   69  * quiescing state is intended to provide a buffer between accepting new
   70  * transactions in the open state and writing them out to stable storage in
   71  * the syncing state. While quiescing, transactions can continue their
   72  * operation without delaying either of the other states. Typically, a txg is
   73  * in the quiescing state very briefly since the operations are bounded by
   74  * software latencies rather than, say, slower I/O latencies. After all
   75  * transactions complete, the txg is ready to enter the next state.
   76  *
   77  * Syncing
   78  *
   79  * In the syncing state, the in-memory state built up during the open and (to
   80  * a lesser degree) the quiescing states is written to stable storage. The
    81  * process of writing out modified data can, in turn, modify more data. For
    82  * example, when we write new blocks, we need to allocate space for them; those
   83  * allocations modify metadata (space maps)... which themselves must be
   84  * written to stable storage. During the sync state, ZFS iterates, writing out
   85  * data until it converges and all in-memory changes have been written out.
   86  * The first such pass is the largest as it encompasses all the modified user
   87  * data (as opposed to filesystem metadata). Subsequent passes typically have
   88  * far less data to write as they consist exclusively of filesystem metadata.
   89  *
   90  * To ensure convergence, after a certain number of passes ZFS begins
   91  * overwriting locations on stable storage that had been allocated earlier in
   92  * the syncing state (and subsequently freed). ZFS usually allocates new
    93  * blocks to optimize for large, continuous writes. For the syncing state to
    94  * converge, however, it must complete a pass where no new blocks are allocated
   95  * since each allocation requires a modification of persistent metadata.
   96  * Further, to hasten convergence, after a prescribed number of passes, ZFS
   97  * also defers frees, and stops compressing.
   98  *
   99  * In addition to writing out user data, we must also execute synctasks during
  100  * the syncing context. A synctask is the mechanism by which some
  101  * administrative activities work such as creating and destroying snapshots or
  102  * datasets. Note that when a synctask is initiated it enters the open txg,
  103  * and ZFS then pushes that txg as quickly as possible to completion of the
  104  * syncing state in order to reduce the latency of the administrative
  105  * activity. To complete the syncing state, ZFS writes out a new uberblock,
  106  * the root of the tree of blocks that comprise all state stored on the ZFS
  107  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
  108  * now transition to the syncing state.
  109  */
  110 
  111 static __attribute__((noreturn)) void txg_sync_thread(void *arg);
  112 static __attribute__((noreturn)) void txg_quiesce_thread(void *arg);
  113 
  114 uint_t zfs_txg_timeout = 5;     /* max seconds worth of delta per txg */
  115 
  116 /*
  117  * Prepare the txg subsystem.
  118  */
  119 void
  120 txg_init(dsl_pool_t *dp, uint64_t txg)
  121 {
  122         tx_state_t *tx = &dp->dp_tx;
  123         int c;
  124         memset(tx, 0, sizeof (tx_state_t));
  125 
  126         tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
  127 
  128         for (c = 0; c < max_ncpus; c++) {
  129                 int i;
  130 
  131                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
  132                 mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_NOLOCKDEP,
  133                     NULL);
  134                 for (i = 0; i < TXG_SIZE; i++) {
  135                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
  136                             NULL);
  137                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
  138                             sizeof (dmu_tx_callback_t),
  139                             offsetof(dmu_tx_callback_t, dcb_node));
  140                 }
  141         }
  142 
  143         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
  144 
  145         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
  146         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
  147         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
  148         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
  149         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
  150 
  151         tx->tx_open_txg = txg;
  152 }
  153 
  154 /*
  155  * Close down the txg subsystem.
  156  */
  157 void
  158 txg_fini(dsl_pool_t *dp)
  159 {
  160         tx_state_t *tx = &dp->dp_tx;
  161         int c;
  162 
  163         ASSERT0(tx->tx_threads);
  164 
  165         mutex_destroy(&tx->tx_sync_lock);
  166 
  167         cv_destroy(&tx->tx_sync_more_cv);
  168         cv_destroy(&tx->tx_sync_done_cv);
  169         cv_destroy(&tx->tx_quiesce_more_cv);
  170         cv_destroy(&tx->tx_quiesce_done_cv);
  171         cv_destroy(&tx->tx_exit_cv);
  172 
  173         for (c = 0; c < max_ncpus; c++) {
  174                 int i;
  175 
  176                 mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
  177                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
  178                 for (i = 0; i < TXG_SIZE; i++) {
  179                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
  180                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
  181                 }
  182         }
  183 
  184         if (tx->tx_commit_cb_taskq != NULL)
  185                 taskq_destroy(tx->tx_commit_cb_taskq);
  186 
  187         vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
  188 
  189         memset(tx, 0, sizeof (tx_state_t));
  190 }
  191 
  192 /*
  193  * Start syncing transaction groups.
  194  */
   195 void
   196 txg_sync_start(dsl_pool_t *dp)
   197 {
   198         tx_state_t *tx = &dp->dp_tx;
   199 
               /*
                * tx_sync_lock is held across both thread_create() calls; each
                * new thread takes the same lock in txg_thread_enter().
                */
   200         mutex_enter(&tx->tx_sync_lock);
   201 
   202         dprintf("pool %p\n", dp);
   203 
               /* No txg threads may exist for this pool yet. */
   204         ASSERT0(tx->tx_threads);
   205 
               /* Accounts for the quiesce thread and the sync thread. */
   206         tx->tx_threads = 2;
   207 
   208         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
   209             dp, 0, &p0, TS_RUN, defclsyspri);
   210 
   211         /*
   212          * The sync thread can need a larger-than-default stack size on
   213          * 32-bit x86.  This is due in part to nested pools and
   214          * scrub_visitbp() recursion.
   215          */
               /*
                * NOTE(review): the stack-size argument below is 0, the same as
                * for the quiesce thread, so no larger stack is requested here --
                * confirm whether the comment above still applies.
                */
   216         tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
   217             dp, 0, &p0, TS_RUN, defclsyspri);
   218 
   219         mutex_exit(&tx->tx_sync_lock);
   220 }
  221 
   222 static void
   223 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
   224 {
               /*
                * Common prologue of the txg worker threads: set up the
                * caller's CPR state against tx_sync_lock, then acquire that
                * lock.  The caller returns with tx_sync_lock held.
                */
   225         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
   226         mutex_enter(&tx->tx_sync_lock);
   227 }
  228 
   229 static void
   230 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
   231 {
               /*
                * Common epilogue of the txg worker threads.  tpp points at
                * the tx_state_t field holding this thread's handle; it is
                * cleared, the thread count is dropped and tx_exit_cv is
                * broadcast so that txg_sync_stop(), which waits on it until
                * tx_threads reaches zero, can make progress.
                */
   232         ASSERT(*tpp != NULL);
   233         *tpp = NULL;
   234         tx->tx_threads--;
   235         cv_broadcast(&tx->tx_exit_cv);
   236         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
   237         thread_exit();
   238 }
  239 
  240 static void
  241 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
  242 {
  243         CALLB_CPR_SAFE_BEGIN(cpr);
  244 
  245         if (time) {
  246                 (void) cv_timedwait_idle(cv, &tx->tx_sync_lock,
  247                     ddi_get_lbolt() + time);
  248         } else {
  249                 cv_wait_idle(cv, &tx->tx_sync_lock);
  250         }
  251 
  252         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
  253 }
  254 
  255 /*
  256  * Stop syncing transaction groups.
  257  */
  258 void
  259 txg_sync_stop(dsl_pool_t *dp)
  260 {
  261         tx_state_t *tx = &dp->dp_tx;
  262 
  263         dprintf("pool %p\n", dp);
  264         /*
  265          * Finish off any work in progress.
  266          */
  267         ASSERT3U(tx->tx_threads, ==, 2);
  268 
  269         /*
  270          * We need to ensure that we've vacated the deferred metaslab trees.
  271          */
  272         txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
  273 
  274         /*
  275          * Wake all sync threads and wait for them to die.
  276          */
  277         mutex_enter(&tx->tx_sync_lock);
  278 
  279         ASSERT3U(tx->tx_threads, ==, 2);
  280 
  281         tx->tx_exiting = 1;
  282 
  283         cv_broadcast(&tx->tx_quiesce_more_cv);
  284         cv_broadcast(&tx->tx_quiesce_done_cv);
  285         cv_broadcast(&tx->tx_sync_more_cv);
  286 
  287         while (tx->tx_threads != 0)
  288                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
  289 
  290         tx->tx_exiting = 0;
  291 
  292         mutex_exit(&tx->tx_sync_lock);
  293 }
  294 
  295 /*
  296  * Get a handle on the currently open txg and keep it open.
  297  *
  298  * The txg is guaranteed to stay open until txg_rele_to_quiesce() is called for
  299  * the handle. Once txg_rele_to_quiesce() has been called, the txg stays
  300  * in quiescing state until txg_rele_to_sync() is called for the handle.
  301  *
  302  * It is guaranteed that subsequent calls return monotonically increasing
  303  * txgs for the same dsl_pool_t. Of course this is not strong monotonicity,
  304  * because the same txg can be returned multiple times in a row. This
  305  * guarantee holds both for subsequent calls from one thread and for multiple
  306  * threads. For example, it is impossible to observe the following sequence
  307  * of events:
  308  *
  309  *           Thread 1                            Thread 2
  310  *
  311  *   1 <- txg_hold_open(P, ...)
  312  *                                       2 <- txg_hold_open(P, ...)
  313  *   1 <- txg_hold_open(P, ...)
  314  *
  315  */
  316 uint64_t
  317 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
  318 {
  319         tx_state_t *tx = &dp->dp_tx;
  320         tx_cpu_t *tc;
  321         uint64_t txg;
  322 
  323         /*
  324          * It appears the processor id is simply used as a "random"
  325          * number to index into the array, and there isn't any other
  326          * significance to the chosen tx_cpu. Because.. Why not use
  327          * the current cpu to index into the array?
  328          */
  329         tc = &tx->tx_cpu[CPU_SEQID_UNSTABLE];
  330 
  331         mutex_enter(&tc->tc_open_lock);
  332         txg = tx->tx_open_txg;
  333 
  334         mutex_enter(&tc->tc_lock);
  335         tc->tc_count[txg & TXG_MASK]++;
  336         mutex_exit(&tc->tc_lock);
  337 
  338         th->th_cpu = tc;
  339         th->th_txg = txg;
  340 
  341         return (txg);
  342 }
  343 
   344 void
   345 txg_rele_to_quiesce(txg_handle_t *th)
   346 {
               /*
                * Release the tc_open_lock that txg_hold_open() left held
                * for this handle.  The hold counted in tc_count is not
                * touched here; txg_rele_to_sync() drops it.
                */
   347         tx_cpu_t *tc = th->th_cpu;
   348 
               /* The caller must not be holding this CPU's tc_lock. */
   349         ASSERT(!MUTEX_HELD(&tc->tc_lock));
   350         mutex_exit(&tc->tc_open_lock);
   351 }
  352 
  353 void
  354 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
  355 {
  356         tx_cpu_t *tc = th->th_cpu;
  357         int g = th->th_txg & TXG_MASK;
  358 
  359         mutex_enter(&tc->tc_lock);
  360         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
  361         mutex_exit(&tc->tc_lock);
  362 }
  363 
  364 void
  365 txg_rele_to_sync(txg_handle_t *th)
  366 {
  367         tx_cpu_t *tc = th->th_cpu;
  368         int g = th->th_txg & TXG_MASK;
  369 
  370         mutex_enter(&tc->tc_lock);
  371         ASSERT(tc->tc_count[g] != 0);
  372         if (--tc->tc_count[g] == 0)
  373                 cv_broadcast(&tc->tc_cv[g]);
  374         mutex_exit(&tc->tc_lock);
  375 
  376         th->th_cpu = NULL;      /* defensive */
  377 }
  378 
  379 /*
  380  * Blocks until all transactions in the group are committed.
  381  *
  382  * On return, the transaction group has reached a stable state in which it can
  383  * then be passed off to the syncing context.
  384  */
  385 static void
  386 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
  387 {
  388         tx_state_t *tx = &dp->dp_tx;
  389         uint64_t tx_open_time;
  390         int g = txg & TXG_MASK;
  391         int c;
  392 
  393         /*
  394          * Grab all tc_open_locks so nobody else can get into this txg.
  395          */
  396         for (c = 0; c < max_ncpus; c++)
  397                 mutex_enter(&tx->tx_cpu[c].tc_open_lock);
  398 
  399         ASSERT(txg == tx->tx_open_txg);
  400         tx->tx_open_txg++;
  401         tx->tx_open_time = tx_open_time = gethrtime();
  402 
  403         DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
  404         DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
  405 
  406         /*
  407          * Now that we've incremented tx_open_txg, we can let threads
  408          * enter the next transaction group.
  409          */
  410         for (c = 0; c < max_ncpus; c++)
  411                 mutex_exit(&tx->tx_cpu[c].tc_open_lock);
  412 
  413         spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_OPEN, tx_open_time);
  414         spa_txg_history_add(dp->dp_spa, txg + 1, tx_open_time);
  415 
  416         /*
  417          * Quiesce the transaction group by waiting for everyone to
  418          * call txg_rele_to_sync() for their open transaction handles.
  419          */
  420         for (c = 0; c < max_ncpus; c++) {
  421                 tx_cpu_t *tc = &tx->tx_cpu[c];
  422                 mutex_enter(&tc->tc_lock);
  423                 while (tc->tc_count[g] != 0)
  424                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
  425                 mutex_exit(&tc->tc_lock);
  426         }
  427 
  428         spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_QUIESCED, gethrtime());
  429 }
  430 
  431 static void
  432 txg_do_callbacks(void *cb_list)
  433 {
  434         dmu_tx_do_callbacks(cb_list, 0);
  435 
  436         list_destroy(cb_list);
  437 
  438         kmem_free(cb_list, sizeof (list_t));
  439 }
  440 
  441 /*
  442  * Dispatch the commit callbacks registered on this txg to worker threads.
  443  *
  444  * If no callbacks are registered for a given TXG, nothing happens.
  445  * This function creates a taskq for the associated pool, if needed.
  446  */
  447 static void
  448 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
  449 {
  450         int c;
  451         tx_state_t *tx = &dp->dp_tx;
  452         list_t *cb_list;
  453 
  454         for (c = 0; c < max_ncpus; c++) {
  455                 tx_cpu_t *tc = &tx->tx_cpu[c];
  456                 /*
  457                  * No need to lock tx_cpu_t at this point, since this can
  458                  * only be called once a txg has been synced.
  459                  */
  460 
  461                 int g = txg & TXG_MASK;
  462 
  463                 if (list_is_empty(&tc->tc_callbacks[g]))
  464                         continue;
  465 
  466                 if (tx->tx_commit_cb_taskq == NULL) {
  467                         /*
  468                          * Commit callback taskq hasn't been created yet.
  469                          */
  470                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
  471                             100, defclsyspri, boot_ncpus, boot_ncpus * 2,
  472                             TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
  473                             TASKQ_THREADS_CPU_PCT);
  474                 }
  475 
  476                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
  477                 list_create(cb_list, sizeof (dmu_tx_callback_t),
  478                     offsetof(dmu_tx_callback_t, dcb_node));
  479 
  480                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
  481 
  482                 (void) taskq_dispatch(tx->tx_commit_cb_taskq,
  483                     txg_do_callbacks, cb_list, TQ_SLEEP);
  484         }
  485 }
  486 
  487 /*
  488  * Wait for pending commit callbacks of already-synced transactions to finish
  489  * processing.
  490  * Calling this function from within a commit callback will deadlock.
  491  */
  492 void
  493 txg_wait_callbacks(dsl_pool_t *dp)
  494 {
  495         tx_state_t *tx = &dp->dp_tx;
  496 
  497         if (tx->tx_commit_cb_taskq != NULL)
  498                 taskq_wait_outstanding(tx->tx_commit_cb_taskq, 0);
  499 }
  500 
  501 static boolean_t
  502 txg_is_quiescing(dsl_pool_t *dp)
  503 {
  504         tx_state_t *tx = &dp->dp_tx;
  505         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
  506         return (tx->tx_quiescing_txg != 0);
  507 }
  508 
  509 static boolean_t
  510 txg_has_quiesced_to_sync(dsl_pool_t *dp)
  511 {
  512         tx_state_t *tx = &dp->dp_tx;
  513         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
  514         return (tx->tx_quiesced_txg != 0);
  515 }
  516 
  517 static __attribute__((noreturn)) void
  518 txg_sync_thread(void *arg)
  519 {
  520         dsl_pool_t *dp = arg;
  521         spa_t *spa = dp->dp_spa;
  522         tx_state_t *tx = &dp->dp_tx;
  523         callb_cpr_t cpr;
  524         clock_t start, delta;
  525 
  526         (void) spl_fstrans_mark();
  527         txg_thread_enter(tx, &cpr);
  528 
  529         start = delta = 0;
  530         for (;;) {
  531                 clock_t timeout = zfs_txg_timeout * hz;
  532                 clock_t timer;
  533                 uint64_t txg;
  534 
  535                 /*
  536                  * We sync when we're scanning, there's someone waiting
  537                  * on us, or the quiesce thread has handed off a txg to
  538                  * us, or we have reached our timeout.
  539                  */
  540                 timer = (delta >= timeout ? 0 : timeout - delta);
  541                 while (!dsl_scan_active(dp->dp_scan) &&
  542                     !tx->tx_exiting && timer > 0 &&
  543                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
  544                     !txg_has_quiesced_to_sync(dp)) {
  545                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
  546                             (u_longlong_t)tx->tx_synced_txg,
  547                             (u_longlong_t)tx->tx_sync_txg_waiting, dp);
  548                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
  549                         delta = ddi_get_lbolt() - start;
  550                         timer = (delta > timeout ? 0 : timeout - delta);
  551                 }
  552 
  553                 /*
  554                  * Wait until the quiesce thread hands off a txg to us,
  555                  * prompting it to do so if necessary.
  556                  */
  557                 while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
  558                         if (txg_is_quiescing(dp)) {
  559                                 txg_thread_wait(tx, &cpr,
  560                                     &tx->tx_quiesce_done_cv, 0);
  561                                 continue;
  562                         }
  563                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
  564                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
  565                         cv_broadcast(&tx->tx_quiesce_more_cv);
  566                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
  567                 }
  568 
  569                 if (tx->tx_exiting)
  570                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
  571 
  572                 /*
  573                  * Consume the quiesced txg which has been handed off to
  574                  * us.  This may cause the quiescing thread to now be
  575                  * able to quiesce another txg, so we must signal it.
  576                  */
  577                 ASSERT(tx->tx_quiesced_txg != 0);
  578                 txg = tx->tx_quiesced_txg;
  579                 tx->tx_quiesced_txg = 0;
  580                 tx->tx_syncing_txg = txg;
  581                 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
  582                 cv_broadcast(&tx->tx_quiesce_more_cv);
  583 
  584                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
  585                     (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
  586                     (u_longlong_t)tx->tx_sync_txg_waiting);
  587                 mutex_exit(&tx->tx_sync_lock);
  588 
  589                 txg_stat_t *ts = spa_txg_history_init_io(spa, txg, dp);
  590                 start = ddi_get_lbolt();
  591                 spa_sync(spa, txg);
  592                 delta = ddi_get_lbolt() - start;
  593                 spa_txg_history_fini_io(spa, ts);
  594 
  595                 mutex_enter(&tx->tx_sync_lock);
  596                 tx->tx_synced_txg = txg;
  597                 tx->tx_syncing_txg = 0;
  598                 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
  599                 cv_broadcast(&tx->tx_sync_done_cv);
  600 
  601                 /*
  602                  * Dispatch commit callbacks to worker threads.
  603                  */
  604                 txg_dispatch_callbacks(dp, txg);
  605         }
  606 }
  607 
  608 static __attribute__((noreturn)) void
  609 txg_quiesce_thread(void *arg)
  610 {
  611         dsl_pool_t *dp = arg;
  612         tx_state_t *tx = &dp->dp_tx;
  613         callb_cpr_t cpr;
  614 
  615         txg_thread_enter(tx, &cpr);
  616 
  617         for (;;) {
  618                 uint64_t txg;
  619 
  620                 /*
  621                  * We quiesce when there's someone waiting on us.
  622                  * However, we can only have one txg in "quiescing" or
  623                  * "quiesced, waiting to sync" state.  So we wait until
  624                  * the "quiesced, waiting to sync" txg has been consumed
  625                  * by the sync thread.
  626                  */
  627                 while (!tx->tx_exiting &&
  628                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
  629                     txg_has_quiesced_to_sync(dp)))
  630                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
  631 
  632                 if (tx->tx_exiting)
  633                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
  634 
  635                 txg = tx->tx_open_txg;
  636                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
  637                     (u_longlong_t)txg,
  638                     (u_longlong_t)tx->tx_quiesce_txg_waiting,
  639                     (u_longlong_t)tx->tx_sync_txg_waiting);
  640                 tx->tx_quiescing_txg = txg;
  641 
  642                 mutex_exit(&tx->tx_sync_lock);
  643                 txg_quiesce(dp, txg);
  644                 mutex_enter(&tx->tx_sync_lock);
  645 
  646                 /*
  647                  * Hand this txg off to the sync thread.
  648                  */
  649                 dprintf("quiesce done, handing off txg %llu\n",
  650                     (u_longlong_t)txg);
  651                 tx->tx_quiescing_txg = 0;
  652                 tx->tx_quiesced_txg = txg;
  653                 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
  654                 cv_broadcast(&tx->tx_sync_more_cv);
  655                 cv_broadcast(&tx->tx_quiesce_done_cv);
  656         }
  657 }
  658 
  659 /*
  660  * Delay this thread by delay nanoseconds if we are still in the open
  661  * transaction group and there is already a waiting txg quiescing or quiesced.
  662  * Abort the delay if this txg stalls or enters the quiescing state.
  663  */
  664 void
  665 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
  666 {
  667         tx_state_t *tx = &dp->dp_tx;
  668         hrtime_t start = gethrtime();
  669 
  670         /* don't delay if this txg could transition to quiescing immediately */
  671         if (tx->tx_open_txg > txg ||
  672             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
  673                 return;
  674 
  675         mutex_enter(&tx->tx_sync_lock);
  676         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
  677                 mutex_exit(&tx->tx_sync_lock);
  678                 return;
  679         }
  680 
  681         while (gethrtime() - start < delay &&
  682             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
  683                 (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
  684                     &tx->tx_sync_lock, delay, resolution, 0);
  685         }
  686 
  687         DMU_TX_STAT_BUMP(dmu_tx_delay);
  688 
  689         mutex_exit(&tx->tx_sync_lock);
  690 }
  691 
  692 static boolean_t
  693 txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
  694 {
  695         tx_state_t *tx = &dp->dp_tx;
  696 
  697         ASSERT(!dsl_pool_config_held(dp));
  698 
  699         mutex_enter(&tx->tx_sync_lock);
  700         ASSERT3U(tx->tx_threads, ==, 2);
  701         if (txg == 0)
  702                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
  703         if (tx->tx_sync_txg_waiting < txg)
  704                 tx->tx_sync_txg_waiting = txg;
  705         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
  706             (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
  707             (u_longlong_t)tx->tx_sync_txg_waiting);
  708         while (tx->tx_synced_txg < txg) {
  709                 dprintf("broadcasting sync more "
  710                     "tx_synced=%llu waiting=%llu dp=%px\n",
  711                     (u_longlong_t)tx->tx_synced_txg,
  712                     (u_longlong_t)tx->tx_sync_txg_waiting, dp);
  713                 cv_broadcast(&tx->tx_sync_more_cv);
  714                 if (wait_sig) {
  715                         /*
  716                          * Condition wait here but stop if the thread receives a
  717                          * signal. The caller may call txg_wait_synced*() again
  718                          * to resume waiting for this txg.
  719                          */
  720                         if (cv_wait_io_sig(&tx->tx_sync_done_cv,
  721                             &tx->tx_sync_lock) == 0) {
  722                                 mutex_exit(&tx->tx_sync_lock);
  723                                 return (B_TRUE);
  724                         }
  725                 } else {
  726                         cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
  727                 }
  728         }
  729         mutex_exit(&tx->tx_sync_lock);
  730         return (B_FALSE);
  731 }
  732 
   733 void
   734 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
   735 {
               /*
                * Block until txg has been synced; a txg of 0 is replaced by
                * the open txg plus TXG_DEFER_SIZE in txg_wait_synced_impl().
                * With wait_sig == B_FALSE the helper only returns B_FALSE,
                * which is what VERIFY0 checks.
                */
   736         VERIFY0(txg_wait_synced_impl(dp, txg, B_FALSE));
   737 }
  738 
  739 /*
  740  * Similar to a txg_wait_synced but it can be interrupted from a signal.
  741  * Returns B_TRUE if the thread was signaled while waiting.
  742  */
  743 boolean_t
  744 txg_wait_synced_sig(dsl_pool_t *dp, uint64_t txg)
  745 {
  746         return (txg_wait_synced_impl(dp, txg, B_TRUE));
  747 }
  748 
/*
 * Wait for the specified open transaction group.  Set should_quiesce
 * when the current open txg should be quiesced immediately.
 *
 * Passing txg == 0 means "the txg following the one that is open right
 * now" (tx_open_txg + 1).  The function returns once tx_open_txg is no
 * longer below the requested txg.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg, boolean_t should_quiesce)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        /*
         * tx_threads is expected to be 2 here (presumably both txg
         * threads are running -- confirm against txg_sync_start()).
         */
        ASSERT3U(tx->tx_threads, ==, 2);
        if (txg == 0)
                txg = tx->tx_open_txg + 1;
        /* Only ever raise the quiesce target, and only when asked to. */
        if (tx->tx_quiesce_txg_waiting < txg && should_quiesce)
                tx->tx_quiesce_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
            (u_longlong_t)tx->tx_sync_txg_waiting);
        while (tx->tx_open_txg < txg) {
                /* Wake waiters on tx_quiesce_more_cv before sleeping. */
                cv_broadcast(&tx->tx_quiesce_more_cv);
                /*
                 * Callers setting should_quiesce will use cv_wait_io() and
                 * be accounted for as iowait time.  Otherwise, the caller is
                 * understood to be idle and cv_wait_idle() is used to prevent
                 * incorrectly inflating the system load average.
                 */
                if (should_quiesce == B_TRUE) {
                        cv_wait_io(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
                } else {
                        cv_wait_idle(&tx->tx_quiesce_done_cv,
                            &tx->tx_sync_lock);
                }
        }
        mutex_exit(&tx->tx_sync_lock);
}
  786 
  787 /*
  788  * Pass in the txg number that should be synced.
  789  */
  790 void
  791 txg_kick(dsl_pool_t *dp, uint64_t txg)
  792 {
  793         tx_state_t *tx = &dp->dp_tx;
  794 
  795         ASSERT(!dsl_pool_config_held(dp));
  796 
  797         if (tx->tx_sync_txg_waiting >= txg)
  798                 return;
  799 
  800         mutex_enter(&tx->tx_sync_lock);
  801         if (tx->tx_sync_txg_waiting < txg) {
  802                 tx->tx_sync_txg_waiting = txg;
  803                 cv_broadcast(&tx->tx_sync_more_cv);
  804         }
  805         mutex_exit(&tx->tx_sync_lock);
  806 }
  807 
  808 boolean_t
  809 txg_stalled(dsl_pool_t *dp)
  810 {
  811         tx_state_t *tx = &dp->dp_tx;
  812         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
  813 }
  814 
  815 boolean_t
  816 txg_sync_waiting(dsl_pool_t *dp)
  817 {
  818         tx_state_t *tx = &dp->dp_tx;
  819 
  820         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
  821             tx->tx_quiesced_txg != 0);
  822 }
  823 
/*
 * Verify that this txg is active (open, quiescing, syncing).  Non-active
 * txg's should not be manipulated.
 *
 * Only compiled when ZFS_DEBUG is defined.  Any txg at or below
 * TXG_INITIAL, and the special value ZILTEST_TXG, is exempt from the
 * checks.
 */
#ifdef ZFS_DEBUG
void
txg_verify(spa_t *spa, uint64_t txg)
{
        /* dp is only referenced from the assertions below. */
        dsl_pool_t *dp __maybe_unused = spa_get_dsl(spa);
        if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
                return;
        /* Must not be beyond the currently open txg ... */
        ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
        /* ... nor before tx_synced_txg ... */
        ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
        /* ... and at most TXG_CONCURRENT_STATES behind the open txg. */
        ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
}
#endif
  840 
  841 /*
  842  * Per-txg object lists.
  843  */
  844 void
  845 txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
  846 {
  847         int t;
  848 
  849         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
  850 
  851         tl->tl_offset = offset;
  852         tl->tl_spa = spa;
  853 
  854         for (t = 0; t < TXG_SIZE; t++)
  855                 tl->tl_head[t] = NULL;
  856 }
  857 
  858 static boolean_t
  859 txg_list_empty_impl(txg_list_t *tl, uint64_t txg)
  860 {
  861         ASSERT(MUTEX_HELD(&tl->tl_lock));
  862         TXG_VERIFY(tl->tl_spa, txg);
  863         return (tl->tl_head[txg & TXG_MASK] == NULL);
  864 }
  865 
  866 boolean_t
  867 txg_list_empty(txg_list_t *tl, uint64_t txg)
  868 {
  869         mutex_enter(&tl->tl_lock);
  870         boolean_t ret = txg_list_empty_impl(tl, txg);
  871         mutex_exit(&tl->tl_lock);
  872 
  873         return (ret);
  874 }
  875 
  876 void
  877 txg_list_destroy(txg_list_t *tl)
  878 {
  879         int t;
  880 
  881         mutex_enter(&tl->tl_lock);
  882         for (t = 0; t < TXG_SIZE; t++)
  883                 ASSERT(txg_list_empty_impl(tl, t));
  884         mutex_exit(&tl->tl_lock);
  885 
  886         mutex_destroy(&tl->tl_lock);
  887 }
  888 
  889 /*
  890  * Returns true if all txg lists are empty.
  891  *
  892  * Warning: this is inherently racy (an item could be added immediately
  893  * after this function returns).
  894  */
  895 boolean_t
  896 txg_all_lists_empty(txg_list_t *tl)
  897 {
  898         mutex_enter(&tl->tl_lock);
  899         for (int i = 0; i < TXG_SIZE; i++) {
  900                 if (!txg_list_empty_impl(tl, i)) {
  901                         mutex_exit(&tl->tl_lock);
  902                         return (B_FALSE);
  903                 }
  904         }
  905         mutex_exit(&tl->tl_lock);
  906         return (B_TRUE);
  907 }
  908 
  909 /*
  910  * Add an entry to the list (unless it's already on the list).
  911  * Returns B_TRUE if it was actually added.
  912  */
  913 boolean_t
  914 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
  915 {
  916         int t = txg & TXG_MASK;
  917         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
  918         boolean_t add;
  919 
  920         TXG_VERIFY(tl->tl_spa, txg);
  921         mutex_enter(&tl->tl_lock);
  922         add = (tn->tn_member[t] == 0);
  923         if (add) {
  924                 tn->tn_member[t] = 1;
  925                 tn->tn_next[t] = tl->tl_head[t];
  926                 tl->tl_head[t] = tn;
  927         }
  928         mutex_exit(&tl->tl_lock);
  929 
  930         return (add);
  931 }
  932 
  933 /*
  934  * Add an entry to the end of the list, unless it's already on the list.
  935  * (walks list to find end)
  936  * Returns B_TRUE if it was actually added.
  937  */
  938 boolean_t
  939 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
  940 {
  941         int t = txg & TXG_MASK;
  942         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
  943         boolean_t add;
  944 
  945         TXG_VERIFY(tl->tl_spa, txg);
  946         mutex_enter(&tl->tl_lock);
  947         add = (tn->tn_member[t] == 0);
  948         if (add) {
  949                 txg_node_t **tp;
  950 
  951                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
  952                         continue;
  953 
  954                 tn->tn_member[t] = 1;
  955                 tn->tn_next[t] = NULL;
  956                 *tp = tn;
  957         }
  958         mutex_exit(&tl->tl_lock);
  959 
  960         return (add);
  961 }
  962 
  963 /*
  964  * Remove the head of the list and return it.
  965  */
  966 void *
  967 txg_list_remove(txg_list_t *tl, uint64_t txg)
  968 {
  969         int t = txg & TXG_MASK;
  970         txg_node_t *tn;
  971         void *p = NULL;
  972 
  973         TXG_VERIFY(tl->tl_spa, txg);
  974         mutex_enter(&tl->tl_lock);
  975         if ((tn = tl->tl_head[t]) != NULL) {
  976                 ASSERT(tn->tn_member[t]);
  977                 ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
  978                 p = (char *)tn - tl->tl_offset;
  979                 tl->tl_head[t] = tn->tn_next[t];
  980                 tn->tn_next[t] = NULL;
  981                 tn->tn_member[t] = 0;
  982         }
  983         mutex_exit(&tl->tl_lock);
  984 
  985         return (p);
  986 }
  987 
  988 /*
  989  * Remove a specific item from the list and return it.
  990  */
  991 void *
  992 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
  993 {
  994         int t = txg & TXG_MASK;
  995         txg_node_t *tn, **tp;
  996 
  997         TXG_VERIFY(tl->tl_spa, txg);
  998         mutex_enter(&tl->tl_lock);
  999 
 1000         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
 1001                 if ((char *)tn - tl->tl_offset == p) {
 1002                         *tp = tn->tn_next[t];
 1003                         tn->tn_next[t] = NULL;
 1004                         tn->tn_member[t] = 0;
 1005                         mutex_exit(&tl->tl_lock);
 1006                         return (p);
 1007                 }
 1008         }
 1009 
 1010         mutex_exit(&tl->tl_lock);
 1011 
 1012         return (NULL);
 1013 }
 1014 
 1015 boolean_t
 1016 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
 1017 {
 1018         int t = txg & TXG_MASK;
 1019         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 1020 
 1021         TXG_VERIFY(tl->tl_spa, txg);
 1022         return (tn->tn_member[t] != 0);
 1023 }
 1024 
 1025 /*
 1026  * Walk a txg list
 1027  */
 1028 void *
 1029 txg_list_head(txg_list_t *tl, uint64_t txg)
 1030 {
 1031         int t = txg & TXG_MASK;
 1032         txg_node_t *tn;
 1033 
 1034         mutex_enter(&tl->tl_lock);
 1035         tn = tl->tl_head[t];
 1036         mutex_exit(&tl->tl_lock);
 1037 
 1038         TXG_VERIFY(tl->tl_spa, txg);
 1039         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 1040 }
 1041 
 1042 void *
 1043 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
 1044 {
 1045         int t = txg & TXG_MASK;
 1046         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 1047 
 1048         TXG_VERIFY(tl->tl_spa, txg);
 1049 
 1050         mutex_enter(&tl->tl_lock);
 1051         tn = tn->tn_next[t];
 1052         mutex_exit(&tl->tl_lock);
 1053 
 1054         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 1055 }
 1056 
/*
 * txg entry points passed to EXPORT_SYMBOL() (presumably so that other
 * kernel modules can link against them).
 */
EXPORT_SYMBOL(txg_init);
EXPORT_SYMBOL(txg_fini);
EXPORT_SYMBOL(txg_sync_start);
EXPORT_SYMBOL(txg_sync_stop);
EXPORT_SYMBOL(txg_hold_open);
EXPORT_SYMBOL(txg_rele_to_quiesce);
EXPORT_SYMBOL(txg_rele_to_sync);
EXPORT_SYMBOL(txg_register_callbacks);
EXPORT_SYMBOL(txg_delay);
EXPORT_SYMBOL(txg_wait_synced);
EXPORT_SYMBOL(txg_wait_open);
EXPORT_SYMBOL(txg_wait_callbacks);
EXPORT_SYMBOL(txg_stalled);
EXPORT_SYMBOL(txg_sync_waiting);

/*
 * Module tunable "timeout" with the zfs_txg_ prefix, declared as UINT
 * with ZMOD_RW (presumably backed by zfs_txg_timeout and writable at
 * runtime -- confirm against the ZFS_MODULE_PARAM definition).
 */
ZFS_MODULE_PARAM(zfs_txg, zfs_txg_, timeout, UINT, ZMOD_RW,
        "Max seconds worth of delta per txg");
Cache object: fdc29c686d402d5e94d368f37a452461 
 
 |