The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System, Second Edition

[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/common/fs/zfs/zil.c

Version: -  FREEBSD  -  FREEBSD10  -  FREEBSD9  -  FREEBSD92  -  FREEBSD91  -  FREEBSD90  -  FREEBSD8  -  FREEBSD82  -  FREEBSD81  -  FREEBSD80  -  FREEBSD7  -  FREEBSD74  -  FREEBSD73  -  FREEBSD72  -  FREEBSD71  -  FREEBSD70  -  FREEBSD6  -  FREEBSD64  -  FREEBSD63  -  FREEBSD62  -  FREEBSD61  -  FREEBSD60  -  FREEBSD5  -  FREEBSD55  -  FREEBSD54  -  FREEBSD53  -  FREEBSD52  -  FREEBSD51  -  FREEBSD50  -  FREEBSD4  -  FREEBSD3  -  FREEBSD22  -  linux-2.6  -  linux-2.4.22  -  MK83  -  MK84  -  PLAN9  -  DFBSD  -  NETBSD  -  NETBSD5  -  NETBSD4  -  NETBSD3  -  NETBSD20  -  OPENBSD  -  xnu-517  -  xnu-792  -  xnu-792.6.70  -  xnu-1228  -  xnu-1456.1.26  -  xnu-1699.24.8  -  xnu-2050.18.24  -  OPENSOLARIS  -  minix-3-1-1 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * CDDL HEADER START
    3  *
    4  * The contents of this file are subject to the terms of the
    5  * Common Development and Distribution License (the "License").
    6  * You may not use this file except in compliance with the License.
    7  *
    8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
    9  * or http://www.opensolaris.org/os/licensing.
   10  * See the License for the specific language governing permissions
   11  * and limitations under the License.
   12  *
   13  * When distributing Covered Code, include this CDDL HEADER in each
   14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
   15  * If applicable, add the following below this CDDL HEADER, with the
   16  * fields enclosed by brackets "[]" replaced with your own identifying
   17  * information: Portions Copyright [yyyy] [name of copyright owner]
   18  *
   19  * CDDL HEADER END
   20  */
   21 /*
   22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
   23  */
   24 
   25 /* Portions Copyright 2010 Robert Milkowski */
   26 
   27 #include <sys/zfs_context.h>
   28 #include <sys/spa.h>
   29 #include <sys/dmu.h>
   30 #include <sys/zap.h>
   31 #include <sys/arc.h>
   32 #include <sys/stat.h>
   33 #include <sys/resource.h>
   34 #include <sys/zil.h>
   35 #include <sys/zil_impl.h>
   36 #include <sys/dsl_dataset.h>
   37 #include <sys/vdev_impl.h>
   38 #include <sys/dmu_tx.h>
   39 #include <sys/dsl_pool.h>
   40 
   41 /*
   42  * The zfs intent log (ZIL) saves transaction records of system calls
   43  * that change the file system in memory with enough information
   44  * to be able to replay them. These are stored in memory until
   45  * either the DMU transaction group (txg) commits them to the stable pool
   46  * and they can be discarded, or they are flushed to the stable log
   47  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
   48  * requirement. In the event of a panic or power fail then those log
   49  * records (transactions) are replayed.
   50  *
   51  * There is one ZIL per file system. Its on-disk (pool) format consists
   52  * of 3 parts:
   53  *
   54  *      - ZIL header
   55  *      - ZIL blocks
   56  *      - ZIL records
   57  *
   58  * A log record holds a system call transaction. Log blocks can
   59  * hold many log records and the blocks are chained together.
   60  * Each ZIL block contains a block pointer (blkptr_t) to the next
   61  * ZIL block in the chain. The ZIL header points to the first
   62  * block in the chain. Note there is not a fixed place in the pool
   63  * to hold blocks. They are dynamically allocated and freed as
 * needed from the blocks available.
 */
   66 
/*
 * This global ZIL switch affects all pools
 */
int zil_replay_disable = 0;    /* disable intent logging replay */

/*
 * Tunable parameter for debugging or performance analysis.  Setting
 * zfs_nocacheflush will cause corruption on power loss if a volatile
 * out-of-order write cache is enabled.
 */
boolean_t zfs_nocacheflush = B_FALSE;

/* Allocation cache for log write buffers (lwb_t). */
static kmem_cache_t *zil_lwb_cache;

/* Forward declaration (defined later in this file). */
static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);

/*
 * An lwb holds no log records when its unused space (lwb_sz -
 * lwb_nused) still equals the full record capacity of its block,
 * i.e. the block size minus the zil_chain_t.
 */
#define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
    sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))


/*
 * ziltest is by and large an ugly hack, but very useful in
 * checking replay without tedious work.
 * When running ziltest we want to keep all itx's and so maintain
 * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
 * We subtract TXG_CONCURRENT_STATES to allow for common code.
 */
#define ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
   95 
   96 static int
   97 zil_bp_compare(const void *x1, const void *x2)
   98 {
   99         const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
  100         const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
  101 
  102         if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
  103                 return (-1);
  104         if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
  105                 return (1);
  106 
  107         if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
  108                 return (-1);
  109         if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
  110                 return (1);
  111 
  112         return (0);
  113 }
  114 
/*
 * Create the per-zilog AVL tree used to remember which block pointers
 * have already been visited, keyed by DVA (see zil_bp_compare()).
 */
static void
zil_bp_tree_init(zilog_t *zilog)
{
	avl_create(&zilog->zl_bp_tree, zil_bp_compare,
	    sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
}
  121 
  122 static void
  123 zil_bp_tree_fini(zilog_t *zilog)
  124 {
  125         avl_tree_t *t = &zilog->zl_bp_tree;
  126         zil_bp_node_t *zn;
  127         void *cookie = NULL;
  128 
  129         while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
  130                 kmem_free(zn, sizeof (zil_bp_node_t));
  131 
  132         avl_destroy(t);
  133 }
  134 
  135 int
  136 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
  137 {
  138         avl_tree_t *t = &zilog->zl_bp_tree;
  139         const dva_t *dva = BP_IDENTITY(bp);
  140         zil_bp_node_t *zn;
  141         avl_index_t where;
  142 
  143         if (avl_find(t, dva, &where) != NULL)
  144                 return (EEXIST);
  145 
  146         zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
  147         zn->zn_dva = *dva;
  148         avl_insert(t, zn, where);
  149 
  150         return (0);
  151 }
  152 
/*
 * Return the ZIL header with its const qualifier cast away, for use
 * in syncing context where the header may legitimately be modified.
 */
static zil_header_t *
zil_header_in_syncing_context(zilog_t *zilog)
{
	return ((zil_header_t *)zilog->zl_header);
}
  158 
/*
 * Seed the checksum words embedded in the first block pointer of a
 * new log chain: a random GUID pair identifying the chain, the objset
 * id, and an initial sequence number of 1.  zil_read_log_block()
 * later compares these words (with the sequence bumped by one) to
 * validate chain linkage.
 */
static void
zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
{
	zio_cksum_t *zc = &bp->blk_cksum;

	zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
	zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
	zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
	zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
}
  169 
/*
 * Read a log block and make sure it's valid.
 *
 * On success the log records are copied into 'dst', '*end' is set
 * just past the last byte of valid record data, and '*nbp' receives
 * the embedded pointer to the next block in the chain.  Returns 0 or
 * an errno; ECKSUM indicates the (expected) logical end of the chain.
 */
static int
zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
    char **end)
{
	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
	uint32_t aflags = ARC_WAIT;
	arc_buf_t *abuf = NULL;
	zbookmark_t zb;
	int error;

	/* If the log hasn't been claimed yet, this read is speculative. */
	if (zilog->zl_header->zh_claim_txg == 0)
		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;

	if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
		zio_flags |= ZIO_FLAG_SPECULATIVE;

	SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);

	error = dsl_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);

	if (error == 0) {
		zio_cksum_t cksum = bp->blk_cksum;

		/*
		 * Validate the checksummed log block.
		 *
		 * Sequence numbers should be... sequential.  The checksum
		 * verifier for the next block should be bp's checksum plus 1.
		 *
		 * Also check the log chain linkage and size used.
		 */
		cksum.zc_word[ZIL_ZC_SEQ]++;

		if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
			/* ZILOG2 format: zil_chain_t leads the block. */
			zil_chain_t *zilc = abuf->b_data;
			char *lr = (char *)(zilc + 1);
			uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);

			if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
			    sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
				error = ECKSUM;
			} else {
				bcopy(lr, dst, len);
				*end = (char *)dst + len;
				*nbp = zilc->zc_next_blk;
			}
		} else {
			/* Original format: zil_chain_t trails the block. */
			char *lr = abuf->b_data;
			uint64_t size = BP_GET_LSIZE(bp);
			zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;

			if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
			    sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
			    (zilc->zc_nused > (size - sizeof (*zilc)))) {
				error = ECKSUM;
			} else {
				bcopy(lr, dst, zilc->zc_nused);
				*end = (char *)dst + zilc->zc_nused;
				*nbp = zilc->zc_next_blk;
			}
		}

		VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
	}

	return (error);
}
  242 
/*
 * Read a TX_WRITE log data block.
 *
 * Copies the data pointed to by the record's embedded blkptr into
 * 'wbuf' (if non-NULL; a NULL wbuf merely verifies readability).
 * A hole blkptr means the data was never written, so wbuf is zeroed.
 * Returns 0 on success or an errno from the read.
 */
static int
zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
{
	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
	const blkptr_t *bp = &lr->lr_blkptr;
	uint32_t aflags = ARC_WAIT;
	arc_buf_t *abuf = NULL;
	zbookmark_t zb;
	int error;

	if (BP_IS_HOLE(bp)) {
		if (wbuf != NULL)
			bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
		return (0);
	}

	/* Unclaimed log: read speculatively, as in zil_read_log_block(). */
	if (zilog->zl_header->zh_claim_txg == 0)
		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;

	SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
	    ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));

	error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);

	if (error == 0) {
		if (wbuf != NULL)
			bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
		(void) arc_buf_remove_ref(abuf, &abuf);
	}

	return (error);
}
  279 
/*
 * Parse the intent log, and call parse_func for each valid record within.
 *
 * Walks the chain starting at zh_log, invoking parse_blk_func for
 * every block pointer and parse_lr_func for every record in each
 * readable block.  The highest sequence numbers and counts seen are
 * recorded in zilog->zl_parse_{blk,lr}_{seq,count} for the caller
 * (e.g. zil_claim()).  Returns the first error encountered, or 0.
 */
int
zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
    zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
{
	const zil_header_t *zh = zilog->zl_header;
	boolean_t claimed = !!zh->zh_claim_txg;
	uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
	uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
	uint64_t max_blk_seq = 0;
	uint64_t max_lr_seq = 0;
	uint64_t blk_count = 0;
	uint64_t lr_count = 0;
	blkptr_t blk, next_blk;
	char *lrbuf, *lrp;
	int error = 0;

	/*
	 * Old logs didn't record the maximum zh_claim_lr_seq.
	 */
	if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
		claim_lr_seq = UINT64_MAX;

	/*
	 * Starting at the block pointed to by zh_log we read the log chain.
	 * For each block in the chain we strongly check that block to
	 * ensure its validity.  We stop when an invalid block is found.
	 * For each block pointer in the chain we call parse_blk_func().
	 * For each record in each valid block we call parse_lr_func().
	 * If the log has been claimed, stop if we encounter a sequence
	 * number greater than the highest claimed sequence number.
	 */
	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
	zil_bp_tree_init(zilog);

	for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
		uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
		int reclen;
		char *end;

		if (blk_seq > claim_blk_seq)
			break;
		if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
			break;
		ASSERT3U(max_blk_seq, <, blk_seq);
		max_blk_seq = blk_seq;
		blk_count++;

		/* Everything claimed has been visited; we're done. */
		if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
			break;

		error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
		if (error)
			break;

		/* Walk the variable-length records within this block. */
		for (lrp = lrbuf; lrp < end; lrp += reclen) {
			lr_t *lr = (lr_t *)lrp;
			reclen = lr->lrc_reclen;
			ASSERT3U(reclen, >=, sizeof (lr_t));
			if (lr->lrc_seq > claim_lr_seq)
				goto done;
			if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
				goto done;
			ASSERT3U(max_lr_seq, <, lr->lrc_seq);
			max_lr_seq = lr->lrc_seq;
			lr_count++;
		}
	}
done:
	/* Publish parse results for callers such as zil_claim(). */
	zilog->zl_parse_error = error;
	zilog->zl_parse_blk_seq = max_blk_seq;
	zilog->zl_parse_lr_seq = max_lr_seq;
	zilog->zl_parse_blk_count = blk_count;
	zilog->zl_parse_lr_count = lr_count;

	ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
	    (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));

	zil_bp_tree_fini(zilog);
	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);

	return (error);
}
  365 
/*
 * zil_parse() callback: claim one log block for first_txg.
 * Returns 0 (already committed, already claimed, or claim succeeded)
 * or the error from zio_claim().
 */
static int
zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
{
	/*
	 * Claim log block if not already committed and not already claimed.
	 * If tx == NULL, just verify that the block is claimable.
	 */
	if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
		return (0);

	return (zio_wait(zio_claim(NULL, zilog->zl_spa,
	    tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
}
  380 
/*
 * zil_parse() callback: claim the dmu_sync() block referenced by a
 * TX_WRITE record.  Records of any other type carry no block to claim.
 */
static int
zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
{
	lr_write_t *lr = (lr_write_t *)lrc;
	int error;

	if (lrc->lrc_txtype != TX_WRITE)
		return (0);

	/*
	 * If the block is not readable, don't claim it.  This can happen
	 * in normal operation when a log block is written to disk before
	 * some of the dmu_sync() blocks it points to.  In this case, the
	 * transaction cannot have been committed to anyone (we would have
	 * waited for all writes to be stable first), so it is semantically
	 * correct to declare this the end of the log.
	 */
	if (lr->lr_blkptr.blk_birth >= first_txg &&
	    (error = zil_read_log_data(zilog, lr, NULL)) != 0)
		return (error);
	return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
}
  403 
/* ARGSUSED */
/*
 * zil_parse() callback: free one log block in tx's txg.
 * Always returns 0 so the walk continues.
 */
static int
zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
{
	zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);

	return (0);
}
  412 
/*
 * zil_parse() callback: free the dmu_sync() block referenced by a
 * TX_WRITE record, but only if it was claimed earlier (claim_txg != 0,
 * birth >= claim_txg) and not already freed (bp tree insert succeeds).
 */
static int
zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
{
	lr_write_t *lr = (lr_write_t *)lrc;
	blkptr_t *bp = &lr->lr_blkptr;

	/*
	 * If we previously claimed it, we need to free it.
	 */
	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
	    bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
		zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);

	return (0);
}
  428 
/*
 * Allocate and initialize a log write buffer (lwb) for the block 'bp',
 * and append it to the zilog's lwb list.  'txg' is the last txg that
 * may reference this block (lwb_max_txg).
 */
static lwb_t *
zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
{
	lwb_t *lwb;

	lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
	lwb->lwb_zilog = zilog;
	lwb->lwb_blk = *bp;
	lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
	lwb->lwb_max_txg = txg;
	lwb->lwb_zio = NULL;
	lwb->lwb_tx = NULL;
	if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
		/* ZILOG2: chain header leads the block, records follow it. */
		lwb->lwb_nused = sizeof (zil_chain_t);
		lwb->lwb_sz = BP_GET_LSIZE(bp);
	} else {
		/* Original format: records first, chain trailer at the end. */
		lwb->lwb_nused = 0;
		lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
	}

	mutex_enter(&zilog->zl_lock);
	list_insert_tail(&zilog->zl_lwb_list, lwb);
	mutex_exit(&zilog->zl_lock);

	return (lwb);
}
  455 
/*
 * Create an on-disk intent log.
 *
 * Allocates the first log block if needed and returns an lwb for it,
 * or NULL if allocation failed.
 */
static lwb_t *
zil_create(zilog_t *zilog)
{
	const zil_header_t *zh = zilog->zl_header;
	lwb_t *lwb = NULL;
	uint64_t txg = 0;
	dmu_tx_t *tx = NULL;
	blkptr_t blk;
	int error = 0;

	/*
	 * Wait for any previous destroy to complete.
	 */
	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);

	ASSERT(zh->zh_claim_txg == 0);
	ASSERT(zh->zh_replay_seq == 0);

	blk = zh->zh_log;

	/*
	 * Allocate an initial log block if:
	 *    - there isn't one already
	 *    - the existing block is the wrong endianness
	 */
	if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
		tx = dmu_tx_create(zilog->zl_os);
		VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
		txg = dmu_tx_get_txg(tx);

		if (!BP_IS_HOLE(&blk)) {
			/* Discard the byteswapped block before reallocating. */
			zio_free_zil(zilog->zl_spa, txg, &blk);
			BP_ZERO(&blk);
		}

		error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
		    ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);

		if (error == 0)
			zil_init_log_chain(zilog, &blk);
	}

	/*
	 * Allocate a log write buffer (lwb) for the first log block.
	 */
	if (error == 0)
		lwb = zil_alloc_lwb(zilog, &blk, txg);

	/*
	 * If we just allocated the first log block, commit our transaction
	 * and wait for zil_sync() to stuff the block pointer into zh_log.
	 * (zh is part of the MOS, so we cannot modify it in open context.)
	 */
	if (tx != NULL) {
		dmu_tx_commit(tx);
		txg_wait_synced(zilog->zl_dmu_pool, txg);
	}

	ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);

	return (lwb);
}
  522 
/*
 * In one tx, free all log blocks and clear the log header.
 * If keep_first is set, then we're replaying a log with no content.
 * We want to keep the first block, however, so that the first
 * synchronous transaction doesn't require a txg_wait_synced()
 * in zil_create().  We don't need to txg_wait_synced() here either
 * when keep_first is set, because both zil_create() and zil_destroy()
 * will wait for any in-progress destroys to complete.
 */
void
zil_destroy(zilog_t *zilog, boolean_t keep_first)
{
	const zil_header_t *zh = zilog->zl_header;
	lwb_t *lwb;
	dmu_tx_t *tx;
	uint64_t txg;

	/*
	 * Wait for any previous destroy to complete.
	 */
	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);

	zilog->zl_old_header = *zh;		/* debugging aid */

	/* Nothing to destroy if the log chain was never started. */
	if (BP_IS_HOLE(&zh->zh_log))
		return;

	tx = dmu_tx_create(zilog->zl_os);
	VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
	txg = dmu_tx_get_txg(tx);

	mutex_enter(&zilog->zl_lock);

	ASSERT3U(zilog->zl_destroy_txg, <, txg);
	zilog->zl_destroy_txg = txg;
	zilog->zl_keep_first = keep_first;

	if (!list_is_empty(&zilog->zl_lwb_list)) {
		/* In-memory lwbs exist: free their buffers and blocks. */
		ASSERT(zh->zh_claim_txg == 0);
		ASSERT(!keep_first);
		while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
			list_remove(&zilog->zl_lwb_list, lwb);
			if (lwb->lwb_buf != NULL)
				zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
			zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
			kmem_cache_free(zil_lwb_cache, lwb);
		}
	} else if (!keep_first) {
		/* No lwbs: walk the on-disk chain and free every block. */
		(void) zil_parse(zilog, zil_free_log_block,
		    zil_free_log_record, tx, zh->zh_claim_txg);
	}
	mutex_exit(&zilog->zl_lock);

	dmu_tx_commit(tx);
}
  579 
/*
 * Claim the intent log for the objset named 'osname' in the txg of
 * 'txarg' (called during pool import/load).  Always returns 0; an
 * unopenable objset is logged and skipped rather than failing import.
 */
int
zil_claim(const char *osname, void *txarg)
{
	dmu_tx_t *tx = txarg;
	uint64_t first_txg = dmu_tx_get_txg(tx);
	zilog_t *zilog;
	zil_header_t *zh;
	objset_t *os;
	int error;

	error = dmu_objset_hold(osname, FTAG, &os);
	if (error) {
		cmn_err(CE_WARN, "can't open objset for %s", osname);
		return (0);
	}

	zilog = dmu_objset_zil(os);
	zh = zil_header_in_syncing_context(zilog);

	/* If the pool is being imported with log clearing, toss the log. */
	if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
		if (!BP_IS_HOLE(&zh->zh_log))
			zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
		BP_ZERO(&zh->zh_log);
		dsl_dataset_dirty(dmu_objset_ds(os), tx);
		dmu_objset_rele(os, FTAG);
		return (0);
	}

	/*
	 * Claim all log blocks if we haven't already done so, and remember
	 * the highest claimed sequence number.  This ensures that if we can
	 * read only part of the log now (e.g. due to a missing device),
	 * but we can read the entire log later, we will not try to replay
	 * or destroy beyond the last block we successfully claimed.
	 */
	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
		(void) zil_parse(zilog, zil_claim_log_block,
		    zil_claim_log_record, tx, first_txg);
		zh->zh_claim_txg = first_txg;
		zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
		zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
		/* Replay is needed only if the log holds more than a header block. */
		if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
			zh->zh_flags |= ZIL_REPLAY_NEEDED;
		zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
		dsl_dataset_dirty(dmu_objset_ds(os), tx);
	}

	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
	dmu_objset_rele(os, FTAG);
	return (0);
}
  632 
/*
 * Check the log by walking the log chain.
 * Checksum errors are ok as they indicate the end of the chain.
 * Any other error (no device or read failure) returns an error.
 */
int
zil_check_log_chain(const char *osname, void *tx)
{
	zilog_t *zilog;
	objset_t *os;
	blkptr_t *bp;
	int error;

	ASSERT(tx == NULL);

	error = dmu_objset_hold(osname, FTAG, &os);
	if (error) {
		cmn_err(CE_WARN, "can't open objset for %s", osname);
		return (0);
	}

	zilog = dmu_objset_zil(os);
	bp = (blkptr_t *)&zilog->zl_header->zh_log;

	/*
	 * Check the first block and determine if it's on a log device
	 * which may have been removed or faulted prior to loading this
	 * pool.  If so, there's no point in checking the rest of the log
	 * as its content should have already been synced to the pool.
	 */
	if (!BP_IS_HOLE(bp)) {
		vdev_t *vd;
		boolean_t valid = B_TRUE;

		spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
		/*
		 * NOTE(review): vd is dereferenced without a NULL check —
		 * presumably a non-hole bp always names an existing
		 * top-level vdev; confirm vdev_lookup_top() cannot fail here.
		 */
		vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
		if (vd->vdev_islog && vdev_is_dead(vd))
			valid = vdev_log_state_valid(vd);
		spa_config_exit(os->os_spa, SCL_STATE, FTAG);

		if (!valid) {
			dmu_objset_rele(os, FTAG);
			return (0);
		}
	}

	/*
	 * Because tx == NULL, zil_claim_log_block() will not actually claim
	 * any blocks, but just determine whether it is possible to do so.
	 * In addition to checking the log chain, zil_claim_log_block()
	 * will invoke zio_claim() with a done func of spa_claim_notify(),
	 * which will update spa_max_claim_txg.  See spa_load() for details.
	 */
	error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
	    zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));

	dmu_objset_rele(os, FTAG);

	/* ECKSUM/ENOENT just mark the end of the chain — not failures. */
	return ((error == ECKSUM || error == ENOENT) ? 0 : error);
}
  693 
  694 static int
  695 zil_vdev_compare(const void *x1, const void *x2)
  696 {
  697         const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
  698         const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
  699 
  700         if (v1 < v2)
  701                 return (-1);
  702         if (v1 > v2)
  703                 return (1);
  704 
  705         return (0);
  706 }
  707 
/*
 * Remember every top-level vdev that bp's DVAs touch, so that
 * zil_flush_vdevs() can later flush their write caches.  No-op when
 * cache flushing is disabled via the zfs_nocacheflush tunable.
 */
void
zil_add_block(zilog_t *zilog, const blkptr_t *bp)
{
	avl_tree_t *t = &zilog->zl_vdev_tree;
	avl_index_t where;
	zil_vdev_node_t *zv, zvsearch;
	int ndvas = BP_GET_NDVAS(bp);
	int i;

	if (zfs_nocacheflush)
		return;

	ASSERT(zilog->zl_writer);

	/*
	 * Even though we're zl_writer, we still need a lock because the
	 * zl_get_data() callbacks may have dmu_sync() done callbacks
	 * that will run concurrently.
	 */
	mutex_enter(&zilog->zl_vdev_lock);
	for (i = 0; i < ndvas; i++) {
		zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
		/* Insert each vdev id only once. */
		if (avl_find(t, &zvsearch, &where) == NULL) {
			zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
			zv->zv_vdev = zvsearch.zv_vdev;
			avl_insert(t, zv, where);
		}
	}
	mutex_exit(&zilog->zl_vdev_lock);
}
  738 
/*
 * Issue a write-cache flush to every vdev recorded by zil_add_block()
 * and wait for completion, emptying the vdev tree in the process.
 */
static void
zil_flush_vdevs(zilog_t *zilog)
{
	spa_t *spa = zilog->zl_spa;
	avl_tree_t *t = &zilog->zl_vdev_tree;
	void *cookie = NULL;
	zil_vdev_node_t *zv;
	zio_t *zio;

	ASSERT(zilog->zl_writer);

	/*
	 * We don't need zl_vdev_lock here because we're the zl_writer,
	 * and all zl_get_data() callbacks are done.
	 */
	if (avl_numnodes(t) == 0)
		return;

	spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);

	zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);

	while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
		vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
		if (vd != NULL)
			zio_flush(zio, vd);
		kmem_free(zv, sizeof (*zv));
	}

	/*
	 * Wait for all the flushes to complete.  Not all devices actually
	 * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
	 */
	(void) zio_wait(zio);

	spa_config_exit(spa, SCL_STATE, FTAG);
}
  776 
/*
 * Function called when a log block write completes
 * (zio done callback; lwb is passed via io_private).
 */
static void
zil_lwb_write_done(zio_t *zio)
{
	lwb_t *lwb = zio->io_private;
	zilog_t *zilog = lwb->lwb_zilog;
	dmu_tx_t *tx = lwb->lwb_tx;

	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
	ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
	ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
	ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
	ASSERT(!BP_IS_GANG(zio->io_bp));
	ASSERT(!BP_IS_HOLE(zio->io_bp));
	ASSERT(zio->io_bp->blk_fill == 0);

	/*
	 * Ensure the lwb buffer pointer is cleared before releasing
	 * the txg. If we have had an allocation failure and
	 * the txg is waiting to sync then we want zil_sync()
	 * to remove the lwb so that it's not picked up as the next new
	 * one in zil_commit_writer(). zil_sync() will only remove
	 * the lwb if lwb_buf is null.
	 */
	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
	mutex_enter(&zilog->zl_lock);
	lwb->lwb_buf = NULL;
	lwb->lwb_tx = NULL;
	mutex_exit(&zilog->zl_lock);

	/*
	 * Now that we've written this log block, we have a stable pointer
	 * to the next block in the chain, so it's OK to let the txg in
	 * which we allocated the next block sync.
	 */
	dmu_tx_commit(tx);
}
  816 
/*
 * Initialize the io for a log block.
 *
 * Lazily creates the per-commit root zio (parent of all lwb writes in
 * this batch) and this lwb's own write zio.  The write is issued as a
 * rewrite of lwb_blk, whose address was established before this call
 * (see zil_lwb_write_start()).  Idempotent: both zios are created only
 * if not already present.
 */
static void
zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
{
        zbookmark_t zb;

        /* Bookmark identifies this log block by objset and ZIL sequence. */
        SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
            lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);

        /* The root zio must exist before it can parent the lwb zio below. */
        if (zilog->zl_root_zio == NULL) {
                zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
                    ZIO_FLAG_CANFAIL);
        }
        if (lwb->lwb_zio == NULL) {
                lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
                    0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
                    zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
                    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
        }
}
  840 
/*
 * Define a limited set of intent log block sizes.
 * These must be a multiple of 4KB. Note only the amount used (again
 * aligned to 4KB) actually gets written. However, we can't always just
 * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
 *
 * The UINT64_MAX sentinel terminates the table; zil_lwb_write_start()
 * maps it to SPA_MAXBLOCKSIZE when no smaller bucket fits.
 */
uint64_t zil_block_buckets[] = {
    4096,               /* non TX_WRITE */
    8192+4096,          /* data base */
    32*1024 + 4096,     /* NFS writes */
    UINT64_MAX
};
  853 
/*
 * Use the slog as long as the logbias is 'latency' and the current commit size
 * is less than the limit or the total list size is less than 2X the limit.
 * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
 *
 * NOTE: the macro evaluates its zilog argument multiple times; callers
 * pass a simple pointer, never an expression with side effects.
 */
uint64_t zil_slog_limit = 1024 * 1024;
#define USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
        (((zilog)->zl_cur_used < zil_slog_limit) || \
        ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
  863 
/*
 * Start a log block write and advance to the next log block.
 * Calls are serialized.
 *
 * Allocates the next block in the log chain, records its address in the
 * current block's zil_chain_t, then issues the (async) write of the
 * current block.  Returns the new lwb, or NULL on allocation failure
 * (which forces the caller to txg_wait_synced()).
 */
static lwb_t *
zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
{
        lwb_t *nlwb = NULL;
        zil_chain_t *zilc;
        spa_t *spa = zilog->zl_spa;
        blkptr_t *bp;
        dmu_tx_t *tx;
        uint64_t txg;
        uint64_t zil_blksz, wsz;
        int i, error;

        /*
         * Locate the zil_chain_t for this block: Slim ZIL (ZILOG2) keeps
         * it at the start of the buffer; the old format keeps it at the
         * end (lwb_sz excludes it in that case).
         */
        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
                zilc = (zil_chain_t *)lwb->lwb_buf;
                bp = &zilc->zc_next_blk;
        } else {
                zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
                bp = &zilc->zc_next_blk;
        }

        ASSERT(lwb->lwb_nused <= lwb->lwb_sz);

        /*
         * Allocate the next block and save its address in this block
         * before writing it in order to establish the log chain.
         * Note that if the allocation of nlwb synced before we wrote
         * the block that points at it (lwb), we'd leak it if we crashed.
         * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
         * We dirty the dataset to ensure that zil_sync() will be called
         * to clean up in the event of allocation failure or I/O failure.
         */
        tx = dmu_tx_create(zilog->zl_os);
        VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
        dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
        txg = dmu_tx_get_txg(tx);

        /* Committed by zil_lwb_write_done() once the write is on disk. */
        lwb->lwb_tx = tx;

        /*
         * Log blocks are pre-allocated. Here we select the size of the next
         * block, based on size used in the last block.
         * - first find the smallest bucket that will fit the block from a
         *   limited set of block sizes. This is because it's faster to write
         *   blocks allocated from the same metaslab as they are adjacent or
         *   close.
         * - next find the maximum from the new suggested size and an array of
         *   previous sizes. This lessens a picket fence effect of wrongly
         *   guessing the size if we have a stream of say 2k, 64k, 2k, 64k
         *   requests.
         *
         * Note we only write what is used, but we can't just allocate
         * the maximum block size because we can exhaust the available
         * pool log space.
         */
        zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
        for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
                continue;
        zil_blksz = zil_block_buckets[i];
        if (zil_blksz == UINT64_MAX)
                zil_blksz = SPA_MAXBLOCKSIZE;
        /* Remember this size and take the max over the recent history. */
        zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
        for (i = 0; i < ZIL_PREV_BLKS; i++)
                zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
        zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);

        BP_ZERO(bp);
        /* pass the old blkptr in order to spread log blocks across devs */
        error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
            USE_SLOG(zilog));
        if (!error) {
                ASSERT3U(bp->blk_birth, ==, txg);
                /* Chain the checksum: next block carries seq + 1. */
                bp->blk_cksum = lwb->lwb_blk.blk_cksum;
                bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;

                /*
                 * Allocate a new log write buffer (lwb).
                 */
                nlwb = zil_alloc_lwb(zilog, bp, txg);

                /* Record the block for later vdev flushing */
                zil_add_block(zilog, &lwb->lwb_blk);
        }

        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
                /* For Slim ZIL only write what is used. */
                wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
                ASSERT3U(wsz, <=, lwb->lwb_sz);
                zio_shrink(lwb->lwb_zio, wsz);

        } else {
                wsz = lwb->lwb_sz;
        }

        zilc->zc_pad = 0;
        zilc->zc_nused = lwb->lwb_nused;
        zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;

        /*
         * clear unused data for security
         */
        bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);

        zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */

        /*
         * If there was an allocation failure then nlwb will be null which
         * forces a txg_wait_synced().
         */
        return (nlwb);
}
  978 
/*
 * Copy one intent-log record (itx) into the current log write buffer,
 * starting a new log block first if the record won't fit.  For TX_WRITE
 * records the data is fetched (or its blkptr filled in) via the owner's
 * zl_get_data() callback.  Returns the lwb to continue filling, or NULL
 * if a new block could not be started.
 */
static lwb_t *
zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
{
        lr_t *lrc = &itx->itx_lr; /* common log record */
        lr_write_t *lrw = (lr_write_t *)lrc;
        char *lr_buf;
        uint64_t txg = lrc->lrc_txg;
        uint64_t reclen = lrc->lrc_reclen;
        uint64_t dlen = 0;

        if (lwb == NULL)
                return (NULL);

        ASSERT(lwb->lwb_buf != NULL);

        /* WR_NEED_COPY writes carry their data inline after the record. */
        if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
                dlen = P2ROUNDUP_TYPED(
                    lrw->lr_length, sizeof (uint64_t), uint64_t);

        zilog->zl_cur_used += (reclen + dlen);

        zil_lwb_write_init(zilog, lwb);

        /*
         * If this record won't fit in the current log block, start a new one.
         */
        if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
                lwb = zil_lwb_write_start(zilog, lwb);
                if (lwb == NULL)
                        return (NULL);
                zil_lwb_write_init(zilog, lwb);
                ASSERT(LWB_EMPTY(lwb));
                if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
                        /* Record can never fit in a log block; fall back
                         * to waiting for the txg to sync it out. */
                        txg_wait_synced(zilog->zl_dmu_pool, txg);
                        return (lwb);
                }
        }

        lr_buf = lwb->lwb_buf + lwb->lwb_nused;
        bcopy(lrc, lr_buf, reclen);
        /* From here on, operate on the in-buffer copy of the record. */
        lrc = (lr_t *)lr_buf;
        lrw = (lr_write_t *)lrc;

        /*
         * If it's a write, fetch the data or get its blkptr as appropriate.
         */
        if (lrc->lrc_txtype == TX_WRITE) {
                if (txg > spa_freeze_txg(zilog->zl_spa))
                        txg_wait_synced(zilog->zl_dmu_pool, txg);
                if (itx->itx_wr_state != WR_COPIED) {
                        char *dbuf;
                        int error;

                        if (dlen) {
                                ASSERT(itx->itx_wr_state == WR_NEED_COPY);
                                dbuf = lr_buf + reclen;
                                lrw->lr_common.lrc_reclen += dlen;
                        } else {
                                ASSERT(itx->itx_wr_state == WR_INDIRECT);
                                dbuf = NULL;
                        }
                        error = zilog->zl_get_data(
                            itx->itx_private, lrw, dbuf, lwb->lwb_zio);
                        if (error == EIO) {
                                /* Data unreadable; let the txg sync it. */
                                txg_wait_synced(zilog->zl_dmu_pool, txg);
                                return (lwb);
                        }
                        if (error) {
                                /* Object gone or data already on disk;
                                 * the record is simply not logged. */
                                ASSERT(error == ENOENT || error == EEXIST ||
                                    error == EALREADY);
                                return (lwb);
                        }
                }
        }

        /*
         * We're actually making an entry, so update lrc_seq to be the
         * log record sequence number.  Note that this is generally not
         * equal to the itx sequence number because not all transactions
         * are synchronous, and sometimes spa_sync() gets there first.
         */
        lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
        lwb->lwb_nused += reclen + dlen;
        lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
        ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
        ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);

        return (lwb);
}
 1068 
 1069 itx_t *
 1070 zil_itx_create(uint64_t txtype, size_t lrsize)
 1071 {
 1072         itx_t *itx;
 1073 
 1074         lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
 1075 
 1076         itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
 1077         itx->itx_lr.lrc_txtype = txtype;
 1078         itx->itx_lr.lrc_reclen = lrsize;
 1079         itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
 1080         itx->itx_lr.lrc_seq = 0;        /* defensive */
 1081         itx->itx_sync = B_TRUE;         /* default is synchronous */
 1082 
 1083         return (itx);
 1084 }
 1085 
/*
 * Free an itx allocated by zil_itx_create(); the size mirrors the
 * allocation there (header up to itx_lr plus the record length).
 */
void
zil_itx_destroy(itx_t *itx)
{
        kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
}
 1091 
 1092 /*
 1093  * Free up the sync and async itxs. The itxs_t has already been detached
 1094  * so no locks are needed.
 1095  */
 1096 static void
 1097 zil_itxg_clean(itxs_t *itxs)
 1098 {
 1099         itx_t *itx;
 1100         list_t *list;
 1101         avl_tree_t *t;
 1102         void *cookie;
 1103         itx_async_node_t *ian;
 1104 
 1105         list = &itxs->i_sync_list;
 1106         while ((itx = list_head(list)) != NULL) {
 1107                 list_remove(list, itx);
 1108                 kmem_free(itx, offsetof(itx_t, itx_lr) +
 1109                     itx->itx_lr.lrc_reclen);
 1110         }
 1111 
 1112         cookie = NULL;
 1113         t = &itxs->i_async_tree;
 1114         while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
 1115                 list = &ian->ia_list;
 1116                 while ((itx = list_head(list)) != NULL) {
 1117                         list_remove(list, itx);
 1118                         kmem_free(itx, offsetof(itx_t, itx_lr) +
 1119                             itx->itx_lr.lrc_reclen);
 1120                 }
 1121                 list_destroy(list);
 1122                 kmem_free(ian, sizeof (itx_async_node_t));
 1123         }
 1124         avl_destroy(t);
 1125 
 1126         kmem_free(itxs, sizeof (itxs_t));
 1127 }
 1128 
 1129 static int
 1130 zil_aitx_compare(const void *x1, const void *x2)
 1131 {
 1132         const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
 1133         const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
 1134 
 1135         if (o1 < o2)
 1136                 return (-1);
 1137         if (o1 > o2)
 1138                 return (1);
 1139 
 1140         return (0);
 1141 }
 1142 
/*
 * Remove all async itx with the given oid.
 *
 * Walks every in-memory itxg that could still hold records (the
 * TXG_CONCURRENT_STATES txgs from the last synced one), detaches the
 * matching object's async list under the itxg lock, and frees the
 * collected itxs after all locks are dropped.
 */
static void
zil_remove_async(zilog_t *zilog, uint64_t oid)
{
        uint64_t otxg, txg;
        itx_async_node_t *ian;
        avl_tree_t *t;
        avl_index_t where;
        list_t clean_list;
        itx_t *itx;

        ASSERT(oid != 0);
        list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));

        if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
                otxg = ZILTEST_TXG;
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;

        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];

                mutex_enter(&itxg->itxg_lock);
                if (itxg->itxg_txg != txg) {
                        /* This slot holds a different (or no) txg; skip. */
                        mutex_exit(&itxg->itxg_lock);
                        continue;
                }

                /*
                 * Locate the object node and append its list.
                 */
                t = &itxg->itxg_itxs->i_async_tree;
                ian = avl_find(t, &oid, &where);
                if (ian != NULL)
                        list_move_tail(&clean_list, &ian->ia_list);
                mutex_exit(&itxg->itxg_lock);
        }
        /* Free the collected itxs outside the itxg locks. */
        while ((itx = list_head(&clean_list)) != NULL) {
                list_remove(&clean_list, itx);
                kmem_free(itx, offsetof(itx_t, itx_lr) +
                    itx->itx_lr.lrc_reclen);
        }
        list_destroy(&clean_list);
}
 1189 
/*
 * Assign an itx to its transaction group's in-memory queues: the sync
 * list for synchronous records, or the per-object async tree otherwise.
 * Creates the itxg's itxs_t on first use for a txg, deferring release
 * of any stale itxs_t until after the lock is dropped.
 */
void
zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
{
        uint64_t txg;
        itxg_t *itxg;
        itxs_t *itxs, *clean = NULL;

        /*
         * Object ids can be re-instantiated in the next txg so
         * remove any async transactions to avoid future leaks.
         * This can happen if a fsync occurs on the re-instantiated
         * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
         * the new file data and flushes a write record for the old object.
         */
        if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
                zil_remove_async(zilog, itx->itx_oid);

        /*
         * Ensure the data of a renamed file is committed before the rename.
         */
        if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
                zil_async_to_sync(zilog, itx->itx_oid);

        if (spa_freeze_txg(zilog->zl_spa) !=  UINT64_MAX)
                txg = ZILTEST_TXG;
        else
                txg = dmu_tx_get_txg(tx);

        itxg = &zilog->zl_itxg[txg & TXG_MASK];
        mutex_enter(&itxg->itxg_lock);
        itxs = itxg->itxg_itxs;
        if (itxg->itxg_txg != txg) {
                if (itxs != NULL) {
                        /*
                         * The zil_clean callback hasn't got around to cleaning
                         * this itxg. Save the itxs for release below.
                         * This should be rare.
                         */
                        atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
                        itxg->itxg_sod = 0;
                        clean = itxg->itxg_itxs;
                }
                ASSERT(itxg->itxg_sod == 0);
                itxg->itxg_txg = txg;
                itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);

                list_create(&itxs->i_sync_list, sizeof (itx_t),
                    offsetof(itx_t, itx_node));
                avl_create(&itxs->i_async_tree, zil_aitx_compare,
                    sizeof (itx_async_node_t),
                    offsetof(itx_async_node_t, ia_node));
        }
        if (itx->itx_sync) {
                /* Synchronous: queue in order and account its size-on-disk. */
                list_insert_tail(&itxs->i_sync_list, itx);
                atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
                itxg->itxg_sod += itx->itx_sod;
        } else {
                /* Asynchronous: file under the object's node, creating it
                 * on first use. */
                avl_tree_t *t = &itxs->i_async_tree;
                uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
                itx_async_node_t *ian;
                avl_index_t where;

                ian = avl_find(t, &foid, &where);
                if (ian == NULL) {
                        ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
                        list_create(&ian->ia_list, sizeof (itx_t),
                            offsetof(itx_t, itx_node));
                        ian->ia_foid = foid;
                        avl_insert(t, ian, where);
                }
                list_insert_tail(&ian->ia_list, itx);
        }

        itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
        mutex_exit(&itxg->itxg_lock);

        /* Release the old itxs now we've dropped the lock */
        if (clean != NULL)
                zil_itxg_clean(clean);
}
 1270 
/*
 * If there are any in-memory intent log transactions which have now been
 * synced then start up a taskq to free them.
 */
void
zil_clean(zilog_t *zilog, uint64_t synced_txg)
{
        itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
        itxs_t *clean_me;

        mutex_enter(&itxg->itxg_lock);
        /* Nothing queued, or ziltest's frozen txg which is never cleaned. */
        if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
                mutex_exit(&itxg->itxg_lock);
                return;
        }
        ASSERT3U(itxg->itxg_txg, <=, synced_txg);
        ASSERT(itxg->itxg_txg != 0);
        ASSERT(zilog->zl_clean_taskq != NULL);
        atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
        itxg->itxg_sod = 0;
        /* Detach the itxs under the lock; free them after dropping it. */
        clean_me = itxg->itxg_itxs;
        itxg->itxg_itxs = NULL;
        itxg->itxg_txg = 0;
        mutex_exit(&itxg->itxg_lock);
        /*
         * Preferably start a task queue to free up the old itxs but
         * if taskq_dispatch can't allocate resources to do that then
         * free it in-line. This should be rare. Note, using TQ_SLEEP
         * created a bad performance problem.
         */
        if (taskq_dispatch(zilog->zl_clean_taskq,
            (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == NULL)
                zil_itxg_clean(clean_me);
}
 1305 
/*
 * Get the list of itxs to commit into zl_itx_commit_list.
 *
 * Moves the sync itxs of every itxg that may still hold unsynced
 * records onto the commit list, zeroing each itxg's size-on-disk
 * accounting and subtracting the total from zl_itx_list_sz.
 */
static void
zil_get_commit_list(zilog_t *zilog)
{
        uint64_t otxg, txg;
        list_t *commit_list = &zilog->zl_itx_commit_list;
        uint64_t push_sod = 0;

        if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
                otxg = ZILTEST_TXG;
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;

        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];

                mutex_enter(&itxg->itxg_lock);
                if (itxg->itxg_txg != txg) {
                        /* Slot holds a different (or no) txg; skip. */
                        mutex_exit(&itxg->itxg_lock);
                        continue;
                }

                list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
                push_sod += itxg->itxg_sod;
                itxg->itxg_sod = 0;

                mutex_exit(&itxg->itxg_lock);
        }
        atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
}
 1338 
/*
 * Move the async itxs for a specified object to commit into sync lists.
 * A foid of 0 moves the async itxs of ALL objects.
 */
static void
zil_async_to_sync(zilog_t *zilog, uint64_t foid)
{
        uint64_t otxg, txg;
        itx_async_node_t *ian;
        avl_tree_t *t;
        avl_index_t where;

        if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
                otxg = ZILTEST_TXG;
        else
                otxg = spa_last_synced_txg(zilog->zl_spa) + 1;

        for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
                itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];

                mutex_enter(&itxg->itxg_lock);
                if (itxg->itxg_txg != txg) {
                        /* Slot holds a different (or no) txg; skip. */
                        mutex_exit(&itxg->itxg_lock);
                        continue;
                }

                /*
                 * If a foid is specified then find that node and append its
                 * list. Otherwise walk the tree appending all the lists
                 * to the sync list. We add to the end rather than the
                 * beginning to ensure the create has happened.
                 */
                t = &itxg->itxg_itxs->i_async_tree;
                if (foid != 0) {
                        ian = avl_find(t, &foid, &where);
                        if (ian != NULL) {
                                list_move_tail(&itxg->itxg_itxs->i_sync_list,
                                    &ian->ia_list);
                        }
                } else {
                        void *cookie = NULL;

                        while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
                                list_move_tail(&itxg->itxg_itxs->i_sync_list,
                                    &ian->ia_list);
                                list_destroy(&ian->ia_list);
                                kmem_free(ian, sizeof (itx_async_node_t));
                        }
                }
                mutex_exit(&itxg->itxg_lock);
        }
}
 1390 
 1391 static void
 1392 zil_commit_writer(zilog_t *zilog)
 1393 {
 1394         uint64_t txg;
 1395         itx_t *itx;
 1396         lwb_t *lwb;
 1397         spa_t *spa = zilog->zl_spa;
 1398         int error = 0;
 1399 
 1400         ASSERT(zilog->zl_root_zio == NULL);
 1401 
 1402         mutex_exit(&zilog->zl_lock);
 1403 
 1404         zil_get_commit_list(zilog);
 1405 
 1406         /*
 1407          * Return if there's nothing to commit before we dirty the fs by
 1408          * calling zil_create().
 1409          */
 1410         if (list_head(&zilog->zl_itx_commit_list) == NULL) {
 1411                 mutex_enter(&zilog->zl_lock);
 1412                 return;
 1413         }
 1414 
 1415         if (zilog->zl_suspend) {
 1416                 lwb = NULL;
 1417         } else {
 1418                 lwb = list_tail(&zilog->zl_lwb_list);
 1419                 if (lwb == NULL)
 1420                         lwb = zil_create(zilog);
 1421         }
 1422 
 1423         DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
 1424         while (itx = list_head(&zilog->zl_itx_commit_list)) {
 1425                 txg = itx->itx_lr.lrc_txg;
 1426                 ASSERT(txg);
 1427 
 1428                 if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
 1429                         lwb = zil_lwb_commit(zilog, itx, lwb);
 1430                 list_remove(&zilog->zl_itx_commit_list, itx);
 1431                 kmem_free(itx, offsetof(itx_t, itx_lr)
 1432                     + itx->itx_lr.lrc_reclen);
 1433         }
 1434         DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
 1435 
 1436         /* write the last block out */
 1437         if (lwb != NULL && lwb->lwb_zio != NULL)
 1438                 lwb = zil_lwb_write_start(zilog, lwb);
 1439 
 1440         zilog->zl_cur_used = 0;
 1441 
 1442         /*
 1443          * Wait if necessary for the log blocks to be on stable storage.
 1444          */
 1445         if (zilog->zl_root_zio) {
 1446                 error = zio_wait(zilog->zl_root_zio);
 1447                 zilog->zl_root_zio = NULL;
 1448                 zil_flush_vdevs(zilog);
 1449         }
 1450 
 1451         if (error || lwb == NULL)
 1452                 txg_wait_synced(zilog->zl_dmu_pool, 0);
 1453 
 1454         mutex_enter(&zilog->zl_lock);
 1455 
 1456         /*
 1457          * Remember the highest committed log sequence number for ztest.
 1458          * We only update this value when all the log writes succeeded,
 1459          * because ztest wants to ASSERT that it got the whole log chain.
 1460          */
 1461         if (error == 0 && lwb != NULL)
 1462                 zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
 1463 }
 1464 
 1465 /*
 1466  * Commit zfs transactions to stable storage.
 1467  * If foid is 0 push out all transactions, otherwise push only those
 1468  * for that object or might reference that object.
 1469  *
 1470  * itxs are committed in batches. In a heavily stressed zil there will be
 1471  * a commit writer thread who is writing out a bunch of itxs to the log
 1472  * for a set of committing threads (cthreads) in the same batch as the writer.
 1473  * Those cthreads are all waiting on the same cv for that batch.
 1474  *
 1475  * There will also be a different and growing batch of threads that are
 1476  * waiting to commit (qthreads). When the committing batch completes
 1477  * a transition occurs such that the cthreads exit and the qthreads become
 1478  * cthreads. One of the new cthreads becomes the writer thread for the
 1479  * batch. Any new threads arriving become new qthreads.
 1480  *
 1481  * Only 2 condition variables are needed and there's no transition
 1482  * between the two cvs needed. They just flip-flop between qthreads
 1483  * and cthreads.
 1484  *
 * Using this scheme we can efficiently wake up only those threads
 1486  * that have been committed.
 1487  */
void
zil_commit(zilog_t *zilog, uint64_t foid)
{
        uint64_t mybatch;

        if (zilog->zl_sync == ZFS_SYNC_DISABLED)
                return;

        /* move the async itxs for the foid to the sync queues */
        zil_async_to_sync(zilog, foid);

        mutex_enter(&zilog->zl_lock);
        mybatch = zilog->zl_next_batch;
        while (zilog->zl_writer) {
                /* Wait on our batch's cv; if our batch was committed
                 * while we slept, there's nothing left to do. */
                cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
                if (mybatch <= zilog->zl_com_batch) {
                        mutex_exit(&zilog->zl_lock);
                        return;
                }
        }

        /* We are the writer for this batch. */
        zilog->zl_next_batch++;
        zilog->zl_writer = B_TRUE;
        zil_commit_writer(zilog);
        zilog->zl_com_batch = mybatch;
        zilog->zl_writer = B_FALSE;
        mutex_exit(&zilog->zl_lock);

        /* wake up one thread to become the next writer */
        cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);

        /* wake up all threads waiting for this batch to be committed */
        cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
}
 1522 
/*
 * Called in syncing context to free committed log blocks and update log header.
 */
void
zil_sync(zilog_t *zilog, dmu_tx_t *tx)
{
        zil_header_t *zh = zil_header_in_syncing_context(zilog);
        uint64_t txg = dmu_tx_get_txg(tx);
        spa_t *spa = zilog->zl_spa;
        uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
        lwb_t *lwb;

        /*
         * We don't zero out zl_destroy_txg, so make sure we don't try
         * to destroy it twice.
         */
        if (spa_sync_pass(spa) != 1)
                return;

        mutex_enter(&zilog->zl_lock);

        ASSERT(zilog->zl_stop_sync == 0);

        /* Publish the replay progress recorded for this txg, if any. */
        if (*replayed_seq != 0) {
                ASSERT(zh->zh_replay_seq < *replayed_seq);
                zh->zh_replay_seq = *replayed_seq;
                *replayed_seq = 0;
        }

        if (zilog->zl_destroy_txg == txg) {
                blkptr_t blk = zh->zh_log;

                ASSERT(list_head(&zilog->zl_lwb_list) == NULL);

                /* Reset the header and replay state for the destroyed log. */
                bzero(zh, sizeof (zil_header_t));
                bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));

                if (zilog->zl_keep_first) {
                        /*
                         * If this block was part of log chain that couldn't
                         * be claimed because a device was missing during
                         * zil_claim(), but that device later returns,
                         * then this block could erroneously appear valid.
                         * To guard against this, assign a new GUID to the new
                         * log chain so it doesn't matter what blk points to.
                         */
                        zil_init_log_chain(zilog, &blk);
                        zh->zh_log = blk;
                }
        }

        /* Free lwbs whose writes are done and whose txgs have synced;
         * stop at the first lwb that is still in flight or too new. */
        while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
                zh->zh_log = lwb->lwb_blk;
                if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
                        break;
                list_remove(&zilog->zl_lwb_list, lwb);
                zio_free_zil(spa, txg, &lwb->lwb_blk);
                kmem_cache_free(zil_lwb_cache, lwb);

                /*
                 * If we don't have anything left in the lwb list then
                 * we've had an allocation failure and we need to zero
                 * out the zil_header blkptr so that we don't end
                 * up freeing the same block twice.
                 */
                if (list_head(&zilog->zl_lwb_list) == NULL)
                        BP_ZERO(&zh->zh_log);
        }
        mutex_exit(&zilog->zl_lock);
}
 1593 
/*
 * One-time module initialization: create the kmem cache used to
 * allocate lwb_t (log write block) structures.  Paired with zil_fini().
 */
void
zil_init(void)
{
	zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
	    sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
}
 1600 
/*
 * Module teardown: destroy the lwb cache created by zil_init().
 */
void
zil_fini(void)
{
	kmem_cache_destroy(zil_lwb_cache);
}
 1606 
/*
 * Cache the dataset's "sync" property value in the zilog so ZIL code
 * can consult it without a property lookup.
 */
void
zil_set_sync(zilog_t *zilog, uint64_t sync)
{
	zilog->zl_sync = sync;
}
 1612 
/*
 * Cache the dataset's "logbias" property value in the zilog.
 */
void
zil_set_logbias(zilog_t *zilog, uint64_t logbias)
{
	zilog->zl_logbias = logbias;
}
 1618 
/*
 * Allocate and initialize the in-core ZIL state (zilog_t) for an objset.
 * zh_phys is the objset's in-core copy of the on-disk ZIL header.
 * The returned structure is released with zil_free().
 */
zilog_t *
zil_alloc(objset_t *os, zil_header_t *zh_phys)
{
	zilog_t *zilog;

	/* KM_SLEEP may block; zalloc gives zeroed fields as defaults. */
	zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);

	zilog->zl_header = zh_phys;
	zilog->zl_os = os;
	zilog->zl_spa = dmu_objset_spa(os);
	zilog->zl_dmu_pool = dmu_objset_pool(os);
	/* Nothing to destroy yet; TXG_INITIAL - 1 can never match a txg. */
	zilog->zl_destroy_txg = TXG_INITIAL - 1;
	zilog->zl_logbias = dmu_objset_logbias(os);
	zilog->zl_sync = dmu_objset_syncprop(os);
	/* zl_com_batch starts at 0 (zeroed), so batch numbering begins at 1. */
	zilog->zl_next_batch = 1;

	mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);

	/* One itxg lock per txg slot. */
	for (int i = 0; i < TXG_SIZE; i++) {
		mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
		    MUTEX_DEFAULT, NULL);
	}

	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
	    offsetof(lwb_t, lwb_node));

	list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
	    offsetof(itx_t, itx_node));

	mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);

	avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
	    sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));

	cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
	cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
	/* Two cvs so waiters on even and odd commit batches don't collide. */
	cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
	cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);

	return (zilog);
}
 1660 
/*
 * Free a zilog allocated by zil_alloc().  Assumes the log has already
 * been quiesced (presumably via zil_close() — no locking is taken on
 * the lwb list here), so at most one lwb remains and the itx commit
 * list is empty.
 */
void
zil_free(zilog_t *zilog)
{
	lwb_t *head_lwb;

	zilog->zl_stop_sync = 1;

	/*
	 * After zil_close() there should only be one lwb with a buffer.
	 */
	head_lwb = list_head(&zilog->zl_lwb_list);
	if (head_lwb) {
		ASSERT(head_lwb == list_tail(&zilog->zl_lwb_list));
		list_remove(&zilog->zl_lwb_list, head_lwb);
		zio_buf_free(head_lwb->lwb_buf, head_lwb->lwb_sz);
		kmem_cache_free(zil_lwb_cache, head_lwb);
	}
	list_destroy(&zilog->zl_lwb_list);

	avl_destroy(&zilog->zl_vdev_tree);
	mutex_destroy(&zilog->zl_vdev_lock);

	ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
	list_destroy(&zilog->zl_itx_commit_list);

	for (int i = 0; i < TXG_SIZE; i++) {
		/*
		 * It's possible for an itx to be generated that doesn't dirty
		 * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
		 * callback to remove the entry. We remove those here.
		 *
		 * Also free up the ziltest itxs.
		 */
		if (zilog->zl_itxg[i].itxg_itxs)
			zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
		mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
	}

	mutex_destroy(&zilog->zl_lock);

	cv_destroy(&zilog->zl_cv_writer);
	cv_destroy(&zilog->zl_cv_suspend);
	cv_destroy(&zilog->zl_cv_batch[0]);
	cv_destroy(&zilog->zl_cv_batch[1]);

	kmem_free(zilog, sizeof (zilog_t));
}
 1708 
/*
 * Open an intent log.  get_data is the consumer-supplied callback for
 * fetching data associated with logged records; a single-threaded
 * taskq is created to run ZIL cleanup work.
 */
zilog_t *
zil_open(objset_t *os, zil_get_data_t *get_data)
{
	zilog_t *zilog = dmu_objset_zil(os);

	zilog->zl_get_data = get_data;
	/* One worker; prepopulated with 2 task entries, max 2. */
	zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
	    2, 2, TASKQ_PREPOPULATE);

	return (zilog);
}
 1723 
/*
 * Close an intent log.  Commits all outstanding itxs, waits for the
 * last relevant txg to sync, then tears down the clean taskq and
 * data-fetch callback installed by zil_open().
 */
void
zil_close(zilog_t *zilog)
{
	lwb_t *tail_lwb;
	uint64_t txg = 0;

	zil_commit(zilog, 0); /* commit all itx */

	/*
	 * The lwb_max_txg for the stubby lwb will reflect the last activity
	 * for the zil.  After a txg_wait_synced() on the txg we know all the
	 * callbacks have occurred that may clean the zil.  Only then can we
	 * destroy the zl_clean_taskq.
	 */
	mutex_enter(&zilog->zl_lock);
	tail_lwb = list_tail(&zilog->zl_lwb_list);
	if (tail_lwb != NULL)
		txg = tail_lwb->lwb_max_txg;
	mutex_exit(&zilog->zl_lock);
	if (txg)
		txg_wait_synced(zilog->zl_dmu_pool, txg);

	/* No cleaning work or data fetches can be needed past this point. */
	taskq_destroy(zilog->zl_clean_taskq);
	zilog->zl_clean_taskq = NULL;
	zilog->zl_get_data = NULL;
}
 1753 
/*
 * Suspend an intent log.  While in suspended mode, we still honor
 * synchronous semantics, but we rely on txg_wait_synced() to do it.
 * We suspend the log briefly when taking a snapshot so that the snapshot
 * contains all the data it's supposed to, and has an empty intent log.
 *
 * Returns 0 on success, EBUSY if the log still needs replay.
 * Each successful call must be balanced by zil_resume().
 */
int
zil_suspend(zilog_t *zilog)
{
	const zil_header_t *zh = zilog->zl_header;

	mutex_enter(&zilog->zl_lock);
	if (zh->zh_flags & ZIL_REPLAY_NEEDED) {		/* unplayed log */
		mutex_exit(&zilog->zl_lock);
		return (EBUSY);
	}
	/* zl_suspend is a refcount; only the first caller does the work. */
	if (zilog->zl_suspend++ != 0) {
		/*
		 * Someone else already began a suspend.
		 * Just wait for them to finish.
		 */
		while (zilog->zl_suspending)
			cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
		mutex_exit(&zilog->zl_lock);
		return (0);
	}
	zilog->zl_suspending = B_TRUE;
	mutex_exit(&zilog->zl_lock);

	/* Push out all pending itxs, then discard the on-disk log chain. */
	zil_commit(zilog, 0);

	zil_destroy(zilog, B_FALSE);

	mutex_enter(&zilog->zl_lock);
	zilog->zl_suspending = B_FALSE;
	cv_broadcast(&zilog->zl_cv_suspend);
	mutex_exit(&zilog->zl_lock);

	return (0);
}
 1794 
/*
 * Drop a reference taken by a successful zil_suspend().
 */
void
zil_resume(zilog_t *zilog)
{
	mutex_enter(&zilog->zl_lock);
	ASSERT(zilog->zl_suspend != 0);
	zilog->zl_suspend--;
	mutex_exit(&zilog->zl_lock);
}
 1803 
/*
 * Argument bundle threaded through zil_parse() to zil_replay_log_record().
 */
typedef struct zil_replay_arg {
	zil_replay_func_t **zr_replay;	/* per-txtype replay vectors */
	void		*zr_arg;	/* opaque arg passed to each vector */
	boolean_t	zr_byteswap;	/* log written in opposite endianness */
	char		*zr_lr;		/* scratch copy of the current record */
} zil_replay_arg_t;
 1810 
 1811 static int
 1812 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
 1813 {
 1814         char name[MAXNAMELEN];
 1815 
 1816         zilog->zl_replaying_seq--;      /* didn't actually replay this one */
 1817 
 1818         dmu_objset_name(zilog->zl_os, name);
 1819 
 1820         cmn_err(CE_WARN, "ZFS replay transaction error %d, "
 1821             "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
 1822             (u_longlong_t)lr->lrc_seq,
 1823             (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
 1824             (lr->lrc_txtype & TX_CI) ? "CI" : "");
 1825 
 1826         return (error);
 1827 }
 1828 
/*
 * zil_parse() record callback used during replay: replay one log
 * record via the appropriate per-txtype vector.  Returns 0 when the
 * record was replayed or legitimately skipped, otherwise the error
 * from the replay vector (after logging it via zil_replay_error()).
 */
static int
zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
{
	zil_replay_arg_t *zr = zra;
	const zil_header_t *zh = zilog->zl_header;
	uint64_t reclen = lr->lrc_reclen;
	uint64_t txtype = lr->lrc_txtype;
	int error = 0;

	/* Optimistically record this seq; zil_replay_error() backs it out. */
	zilog->zl_replaying_seq = lr->lrc_seq;

	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
		return (0);

	if (lr->lrc_txg < claim_txg)		/* already committed */
		return (0);

	/* Strip case-insensitive bit, still present in log record */
	txtype &= ~TX_CI;

	if (txtype == 0 || txtype >= TX_MAX_TYPE)
		return (zil_replay_error(zilog, lr, EINVAL));

	/*
	 * If this record type can be logged out of order, the object
	 * (lr_foid) may no longer exist.  That's legitimate, not an error.
	 */
	if (TX_OOO(txtype)) {
		error = dmu_object_info(zilog->zl_os,
		    ((lr_ooo_t *)lr)->lr_foid, NULL);
		if (error == ENOENT || error == EEXIST)
			return (0);
	}

	/*
	 * Make a copy of the data so we can revise and extend it.
	 */
	bcopy(lr, zr->zr_lr, reclen);

	/*
	 * If this is a TX_WRITE with a blkptr, suck in the data.
	 * The data is read into zr_lr immediately after the record.
	 */
	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
		error = zil_read_log_data(zilog, (lr_write_t *)lr,
		    zr->zr_lr + reclen);
		if (error)
			return (zil_replay_error(zilog, lr, error));
	}

	/*
	 * The log block containing this lr may have been byteswapped
	 * so that we can easily examine common fields like lrc_txtype.
	 * However, the log is a mix of different record types, and only the
	 * replay vectors know how to byteswap their records.  Therefore, if
	 * the lr was byteswapped, undo it before invoking the replay vector.
	 */
	if (zr->zr_byteswap)
		byteswap_uint64_array(zr->zr_lr, reclen);

	/*
	 * We must now do two things atomically: replay this log record,
	 * and update the log header sequence number to reflect the fact that
	 * we did so. At the end of each replay function the sequence number
	 * is updated if we are in replay mode.
	 */
	error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
	if (error) {
		/*
		 * The DMU's dnode layer doesn't see removes until the txg
		 * commits, so a subsequent claim can spuriously fail with
		 * EEXIST. So if we receive any error we try syncing out
		 * any removes then retry the transaction.  Note that we
		 * specify B_FALSE for byteswap now, so we don't do it twice.
		 */
		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
		error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
		if (error)
			return (zil_replay_error(zilog, lr, error));
	}
	return (0);
}
 1910 
/*
 * zil_parse() block callback used during replay: just count the log
 * blocks visited in zl_replay_blks.
 */
/* ARGSUSED */
static int
zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
{
	zilog->zl_replay_blks++;

	return (0);
}
 1919 
/*
 * If this dataset has a non-empty intent log, replay it and destroy it.
 * replay_func is the table of per-txtype replay vectors; arg is passed
 * through to each vector.
 */
void
zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
{
	zilog_t *zilog = dmu_objset_zil(os);
	const zil_header_t *zh = zilog->zl_header;
	zil_replay_arg_t zr;

	if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
		/* Nothing to replay; just discard any old log chain. */
		zil_destroy(zilog, B_TRUE);
		return;
	}

	zr.zr_replay = replay_func;
	zr.zr_arg = arg;
	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
	/* Scratch buffer: a record plus a maximum-size data block. */
	zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);

	/*
	 * Wait for in-progress removes to sync before starting replay.
	 */
	txg_wait_synced(zilog->zl_dmu_pool, 0);

	zilog->zl_replay = B_TRUE;
	zilog->zl_replay_time = ddi_get_lbolt();
	ASSERT(zilog->zl_replay_blks == 0);
	(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
	    zh->zh_claim_txg);
	kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);

	/* Replay is done: wipe the log and wait for the wipe to sync. */
	zil_destroy(zilog, B_FALSE);
	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
	zilog->zl_replay = B_FALSE;
}
 1956 
 1957 boolean_t
 1958 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
 1959 {
 1960         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
 1961                 return (B_TRUE);
 1962 
 1963         if (zilog->zl_replay) {
 1964                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 1965                 zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
 1966                     zilog->zl_replaying_seq;
 1967                 return (B_TRUE);
 1968         }
 1969 
 1970         return (B_FALSE);
 1971 }
 1972 
 1973 /* ARGSUSED */
 1974 int
 1975 zil_vdev_offline(const char *osname, void *arg)
 1976 {
 1977         objset_t *os;
 1978         zilog_t *zilog;
 1979         int error;
 1980 
 1981         error = dmu_objset_hold(osname, FTAG, &os);
 1982         if (error)
 1983                 return (error);
 1984 
 1985         zilog = dmu_objset_zil(os);
 1986         if (zil_suspend(zilog) != 0)
 1987                 error = EEXIST;
 1988         else
 1989                 zil_resume(zilog);
 1990         dmu_objset_rele(os, FTAG);
 1991         return (error);
 1992 }

Cache object: 4b46901a5150b6ab9203d66bc67d5de0


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.