| 
     1 /*
    2  * CDDL HEADER START
    3  *
    4  * The contents of this file are subject to the terms of the
    5  * Common Development and Distribution License (the "License").
    6  * You may not use this file except in compliance with the License.
    7  *
    8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
    9  * or https://opensource.org/licenses/CDDL-1.0.
   10  * See the License for the specific language governing permissions
   11  * and limitations under the License.
   12  *
   13  * When distributing Covered Code, include this CDDL HEADER in each
   14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
   15  * If applicable, add the following below this CDDL HEADER, with the
   16  * fields enclosed by brackets "[]" replaced with your own identifying
   17  * information: Portions Copyright [yyyy] [name of copyright owner]
   18  *
   19  * CDDL HEADER END
   20  */
   21 /*
   22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
   23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
   24  * Copyright (c) 2011, 2020 by Delphix. All rights reserved.
   25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
   26  * Copyright 2014 HybridCluster. All rights reserved.
   27  * Copyright (c) 2018, loli10K <ezomori.nozomu@gmail.com>. All rights reserved.
   28  * Copyright (c) 2019, Klara Inc.
   29  * Copyright (c) 2019, Allan Jude
   30  * Copyright (c) 2019 Datto Inc.
   31  * Copyright (c) 2022 Axcient.
   32  */
   33 
   34 #include <sys/arc.h>
   35 #include <sys/spa_impl.h>
   36 #include <sys/dmu.h>
   37 #include <sys/dmu_impl.h>
   38 #include <sys/dmu_send.h>
   39 #include <sys/dmu_recv.h>
   40 #include <sys/dmu_tx.h>
   41 #include <sys/dbuf.h>
   42 #include <sys/dnode.h>
   43 #include <sys/zfs_context.h>
   44 #include <sys/dmu_objset.h>
   45 #include <sys/dmu_traverse.h>
   46 #include <sys/dsl_dataset.h>
   47 #include <sys/dsl_dir.h>
   48 #include <sys/dsl_prop.h>
   49 #include <sys/dsl_pool.h>
   50 #include <sys/dsl_synctask.h>
   51 #include <sys/zfs_ioctl.h>
   52 #include <sys/zap.h>
   53 #include <sys/zvol.h>
   54 #include <sys/zio_checksum.h>
   55 #include <sys/zfs_znode.h>
   56 #include <zfs_fletcher.h>
   57 #include <sys/avl.h>
   58 #include <sys/ddt.h>
   59 #include <sys/zfs_onexit.h>
   60 #include <sys/dsl_destroy.h>
   61 #include <sys/blkptr.h>
   62 #include <sys/dsl_bookmark.h>
   63 #include <sys/zfeature.h>
   64 #include <sys/bqueue.h>
   65 #include <sys/objlist.h>
   66 #ifdef _KERNEL
   67 #include <sys/zfs_vfsops.h>
   68 #endif
   69 #include <sys/zfs_file.h>
   70 
   71 static uint_t zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
   72 static uint_t zfs_recv_queue_ff = 20;
   73 static uint_t zfs_recv_write_batch_size = 1024 * 1024;
   74 static int zfs_recv_best_effort_corrective = 0;
   75 
   76 static const void *const dmu_recv_tag = "dmu_recv_tag";
   77 const char *const recv_clone_name = "%recv";
   78 
   79 typedef enum {
   80         ORNS_NO,
   81         ORNS_YES,
   82         ORNS_MAYBE
   83 } or_need_sync_t;
   84 
   85 static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
   86     void *buf);
   87 
   88 struct receive_record_arg {
   89         dmu_replay_record_t header;
   90         void *payload; /* Pointer to a buffer containing the payload */
   91         /*
   92          * If the record is a WRITE or SPILL, pointer to the abd containing the
   93          * payload.
   94          */
   95         abd_t *abd;
   96         int payload_size;
   97         uint64_t bytes_read; /* bytes read from stream when record created */
   98         boolean_t eos_marker; /* Marks the end of the stream */
   99         bqueue_node_t node;
  100 };
  101 
  102 struct receive_writer_arg {
  103         objset_t *os;
  104         boolean_t byteswap;
  105         bqueue_t q;
  106 
  107         /*
  108          * These three members are used to signal to the main thread when
  109          * we're done.
  110          */
  111         kmutex_t mutex;
  112         kcondvar_t cv;
  113         boolean_t done;
  114 
  115         int err;
  116         const char *tofs;
  117         boolean_t heal;
  118         boolean_t resumable;
  119         boolean_t raw;   /* DMU_BACKUP_FEATURE_RAW set */
  120         boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */
  121         boolean_t full;  /* this is a full send stream */
  122         uint64_t last_object;
  123         uint64_t last_offset;
  124         uint64_t max_object; /* highest object ID referenced in stream */
  125         uint64_t bytes_read; /* bytes read when current record created */
  126 
  127         list_t write_batch;
  128 
  129         /* Encryption parameters for the last received DRR_OBJECT_RANGE */
  130         boolean_t or_crypt_params_present;
  131         uint64_t or_firstobj;
  132         uint64_t or_numslots;
  133         uint8_t or_salt[ZIO_DATA_SALT_LEN];
  134         uint8_t or_iv[ZIO_DATA_IV_LEN];
  135         uint8_t or_mac[ZIO_DATA_MAC_LEN];
  136         boolean_t or_byteorder;
  137         zio_t *heal_pio;
  138 
  139         /* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */
  140         or_need_sync_t or_need_sync;
  141 };
  142 
  143 typedef struct dmu_recv_begin_arg {
  144         const char *drba_origin;
  145         dmu_recv_cookie_t *drba_cookie;
  146         cred_t *drba_cred;
  147         proc_t *drba_proc;
  148         dsl_crypto_params_t *drba_dcp;
  149 } dmu_recv_begin_arg_t;
  150 
  151 static void
  152 byteswap_record(dmu_replay_record_t *drr)
  153 {
  154 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
  155 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
  156         drr->drr_type = BSWAP_32(drr->drr_type);
  157         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
  158 
  159         switch (drr->drr_type) {
  160         case DRR_BEGIN:
  161                 DO64(drr_begin.drr_magic);
  162                 DO64(drr_begin.drr_versioninfo);
  163                 DO64(drr_begin.drr_creation_time);
  164                 DO32(drr_begin.drr_type);
  165                 DO32(drr_begin.drr_flags);
  166                 DO64(drr_begin.drr_toguid);
  167                 DO64(drr_begin.drr_fromguid);
  168                 break;
  169         case DRR_OBJECT:
  170                 DO64(drr_object.drr_object);
  171                 DO32(drr_object.drr_type);
  172                 DO32(drr_object.drr_bonustype);
  173                 DO32(drr_object.drr_blksz);
  174                 DO32(drr_object.drr_bonuslen);
  175                 DO32(drr_object.drr_raw_bonuslen);
  176                 DO64(drr_object.drr_toguid);
  177                 DO64(drr_object.drr_maxblkid);
  178                 break;
  179         case DRR_FREEOBJECTS:
  180                 DO64(drr_freeobjects.drr_firstobj);
  181                 DO64(drr_freeobjects.drr_numobjs);
  182                 DO64(drr_freeobjects.drr_toguid);
  183                 break;
  184         case DRR_WRITE:
  185                 DO64(drr_write.drr_object);
  186                 DO32(drr_write.drr_type);
  187                 DO64(drr_write.drr_offset);
  188                 DO64(drr_write.drr_logical_size);
  189                 DO64(drr_write.drr_toguid);
  190                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
  191                 DO64(drr_write.drr_key.ddk_prop);
  192                 DO64(drr_write.drr_compressed_size);
  193                 break;
  194         case DRR_WRITE_EMBEDDED:
  195                 DO64(drr_write_embedded.drr_object);
  196                 DO64(drr_write_embedded.drr_offset);
  197                 DO64(drr_write_embedded.drr_length);
  198                 DO64(drr_write_embedded.drr_toguid);
  199                 DO32(drr_write_embedded.drr_lsize);
  200                 DO32(drr_write_embedded.drr_psize);
  201                 break;
  202         case DRR_FREE:
  203                 DO64(drr_free.drr_object);
  204                 DO64(drr_free.drr_offset);
  205                 DO64(drr_free.drr_length);
  206                 DO64(drr_free.drr_toguid);
  207                 break;
  208         case DRR_SPILL:
  209                 DO64(drr_spill.drr_object);
  210                 DO64(drr_spill.drr_length);
  211                 DO64(drr_spill.drr_toguid);
  212                 DO64(drr_spill.drr_compressed_size);
  213                 DO32(drr_spill.drr_type);
  214                 break;
  215         case DRR_OBJECT_RANGE:
  216                 DO64(drr_object_range.drr_firstobj);
  217                 DO64(drr_object_range.drr_numslots);
  218                 DO64(drr_object_range.drr_toguid);
  219                 break;
  220         case DRR_REDACT:
  221                 DO64(drr_redact.drr_object);
  222                 DO64(drr_redact.drr_offset);
  223                 DO64(drr_redact.drr_length);
  224                 DO64(drr_redact.drr_toguid);
  225                 break;
  226         case DRR_END:
  227                 DO64(drr_end.drr_toguid);
  228                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
  229                 break;
  230         default:
  231                 break;
  232         }
  233 
  234         if (drr->drr_type != DRR_BEGIN) {
  235                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
  236         }
  237 
  238 #undef DO64
  239 #undef DO32
  240 }
  241 
  242 static boolean_t
  243 redact_snaps_contains(uint64_t *snaps, uint64_t num_snaps, uint64_t guid)
  244 {
  245         for (int i = 0; i < num_snaps; i++) {
  246                 if (snaps[i] == guid)
  247                         return (B_TRUE);
  248         }
  249         return (B_FALSE);
  250 }
  251 
  252 /*
  253  * Check that the new stream we're trying to receive is redacted with respect to
  254  * a subset of the snapshots that the origin was redacted with respect to.  For
  255  * the reasons behind this, see the man page on redacted zfs sends and receives.
  256  */
  257 static boolean_t
  258 compatible_redact_snaps(uint64_t *origin_snaps, uint64_t origin_num_snaps,
  259     uint64_t *redact_snaps, uint64_t num_redact_snaps)
  260 {
  261         /*
  262          * Short circuit the comparison; if we are redacted with respect to
  263          * more snapshots than the origin, we can't be redacted with respect
  264          * to a subset.
  265          */
  266         if (num_redact_snaps > origin_num_snaps) {
  267                 return (B_FALSE);
  268         }
  269 
  270         for (int i = 0; i < num_redact_snaps; i++) {
  271                 if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
  272                     redact_snaps[i])) {
  273                         return (B_FALSE);
  274                 }
  275         }
  276         return (B_TRUE);
  277 }
  278 
  279 static boolean_t
  280 redact_check(dmu_recv_begin_arg_t *drba, dsl_dataset_t *origin)
  281 {
  282         uint64_t *origin_snaps;
  283         uint64_t origin_num_snaps;
  284         dmu_recv_cookie_t *drc = drba->drba_cookie;
  285         struct drr_begin *drrb = drc->drc_drrb;
  286         int featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
  287         int err = 0;
  288         boolean_t ret = B_TRUE;
  289         uint64_t *redact_snaps;
  290         uint_t numredactsnaps;
  291 
  292         /*
  293          * If this is a full send stream, we're safe no matter what.
  294          */
  295         if (drrb->drr_fromguid == 0)
  296                 return (ret);
  297 
  298         VERIFY(dsl_dataset_get_uint64_array_feature(origin,
  299             SPA_FEATURE_REDACTED_DATASETS, &origin_num_snaps, &origin_snaps));
  300 
  301         if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
  302             BEGINNV_REDACT_FROM_SNAPS, &redact_snaps, &numredactsnaps) ==
  303             0) {
  304                 /*
  305                  * If the send stream was sent from the redaction bookmark or
  306                  * the redacted version of the dataset, then we're safe.  Verify
  307                  * that this is from the a compatible redaction bookmark or
  308                  * redacted dataset.
  309                  */
  310                 if (!compatible_redact_snaps(origin_snaps, origin_num_snaps,
  311                     redact_snaps, numredactsnaps)) {
  312                         err = EINVAL;
  313                 }
  314         } else if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
  315                 /*
  316                  * If the stream is redacted, it must be redacted with respect
  317                  * to a subset of what the origin is redacted with respect to.
  318                  * See case number 2 in the zfs man page section on redacted zfs
  319                  * send.
  320                  */
  321                 err = nvlist_lookup_uint64_array(drc->drc_begin_nvl,
  322                     BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps);
  323 
  324                 if (err != 0 || !compatible_redact_snaps(origin_snaps,
  325                     origin_num_snaps, redact_snaps, numredactsnaps)) {
  326                         err = EINVAL;
  327                 }
  328         } else if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
  329             drrb->drr_toguid)) {
  330                 /*
  331                  * If the stream isn't redacted but the origin is, this must be
  332                  * one of the snapshots the origin is redacted with respect to.
  333                  * See case number 1 in the zfs man page section on redacted zfs
  334                  * send.
  335                  */
  336                 err = EINVAL;
  337         }
  338 
  339         if (err != 0)
  340                 ret = B_FALSE;
  341         return (ret);
  342 }
  343 
  344 /*
  345  * If we previously received a stream with --large-block, we don't support
  346  * receiving an incremental on top of it without --large-block.  This avoids
  347  * forcing a read-modify-write or trying to re-aggregate a string of WRITE
  348  * records.
  349  */
  350 static int
  351 recv_check_large_blocks(dsl_dataset_t *ds, uint64_t featureflags)
  352 {
  353         if (dsl_dataset_feature_is_active(ds, SPA_FEATURE_LARGE_BLOCKS) &&
  354             !(featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS))
  355                 return (SET_ERROR(ZFS_ERR_STREAM_LARGE_BLOCK_MISMATCH));
  356         return (0);
  357 }
  358 
  359 static int
  360 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
  361     uint64_t fromguid, uint64_t featureflags)
  362 {
  363         uint64_t obj;
  364         uint64_t children;
  365         int error;
  366         dsl_dataset_t *snap;
  367         dsl_pool_t *dp = ds->ds_dir->dd_pool;
  368         boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0;
  369         boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
  370         boolean_t embed = (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) != 0;
  371 
  372         /* Temporary clone name must not exist. */
  373         error = zap_lookup(dp->dp_meta_objset,
  374             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
  375             8, 1, &obj);
  376         if (error != ENOENT)
  377                 return (error == 0 ? SET_ERROR(EBUSY) : error);
  378 
  379         /* Resume state must not be set. */
  380         if (dsl_dataset_has_resume_receive_state(ds))
  381                 return (SET_ERROR(EBUSY));
  382 
  383         /* New snapshot name must not exist if we're not healing it. */
  384         error = zap_lookup(dp->dp_meta_objset,
  385             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
  386             drba->drba_cookie->drc_tosnap, 8, 1, &obj);
  387         if (drba->drba_cookie->drc_heal) {
  388                 if (error != 0)
  389                         return (error);
  390         } else if (error != ENOENT) {
  391                 return (error == 0 ? SET_ERROR(EEXIST) : error);
  392         }
  393 
  394         /* Must not have children if receiving a ZVOL. */
  395         error = zap_count(dp->dp_meta_objset,
  396             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, &children);
  397         if (error != 0)
  398                 return (error);
  399         if (drba->drba_cookie->drc_drrb->drr_type != DMU_OST_ZFS &&
  400             children > 0)
  401                 return (SET_ERROR(ZFS_ERR_WRONG_PARENT));
  402 
  403         /*
  404          * Check snapshot limit before receiving. We'll recheck again at the
  405          * end, but might as well abort before receiving if we're already over
  406          * the limit.
  407          *
  408          * Note that we do not check the file system limit with
  409          * dsl_dir_fscount_check because the temporary %clones don't count
  410          * against that limit.
  411          */
  412         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
  413             NULL, drba->drba_cred, drba->drba_proc);
  414         if (error != 0)
  415                 return (error);
  416 
  417         if (drba->drba_cookie->drc_heal) {
  418                 /* Encryption is incompatible with embedded data. */
  419                 if (encrypted && embed)
  420                         return (SET_ERROR(EINVAL));
  421 
  422                 /* Healing is not supported when in 'force' mode. */
  423                 if (drba->drba_cookie->drc_force)
  424                         return (SET_ERROR(EINVAL));
  425 
  426                 /* Must have keys loaded if doing encrypted non-raw recv. */
  427                 if (encrypted && !raw) {
  428                         if (spa_keystore_lookup_key(dp->dp_spa, ds->ds_object,
  429                             NULL, NULL) != 0)
  430                                 return (SET_ERROR(EACCES));
  431                 }
  432 
  433                 error = dsl_dataset_hold_obj(dp, obj, FTAG, &snap);
  434                 if (error != 0)
  435                         return (error);
  436 
  437                 /*
  438                  * When not doing best effort corrective recv healing can only
  439                  * be done if the send stream is for the same snapshot as the
  440                  * one we are trying to heal.
  441                  */
  442                 if (zfs_recv_best_effort_corrective == 0 &&
  443                     drba->drba_cookie->drc_drrb->drr_toguid !=
  444                     dsl_dataset_phys(snap)->ds_guid) {
  445                         dsl_dataset_rele(snap, FTAG);
  446                         return (SET_ERROR(ENOTSUP));
  447                 }
  448                 dsl_dataset_rele(snap, FTAG);
  449         } else if (fromguid != 0) {
  450                 /* Sanity check the incremental recv */
  451                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
  452 
  453                 /* Can't perform a raw receive on top of a non-raw receive */
  454                 if (!encrypted && raw)
  455                         return (SET_ERROR(EINVAL));
  456 
  457                 /* Encryption is incompatible with embedded data */
  458                 if (encrypted && embed)
  459                         return (SET_ERROR(EINVAL));
  460 
  461                 /* Find snapshot in this dir that matches fromguid. */
  462                 while (obj != 0) {
  463                         error = dsl_dataset_hold_obj(dp, obj, FTAG,
  464                             &snap);
  465                         if (error != 0)
  466                                 return (SET_ERROR(ENODEV));
  467                         if (snap->ds_dir != ds->ds_dir) {
  468                                 dsl_dataset_rele(snap, FTAG);
  469                                 return (SET_ERROR(ENODEV));
  470                         }
  471                         if (dsl_dataset_phys(snap)->ds_guid == fromguid)
  472                                 break;
  473                         obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
  474                         dsl_dataset_rele(snap, FTAG);
  475                 }
  476                 if (obj == 0)
  477                         return (SET_ERROR(ENODEV));
  478 
  479                 if (drba->drba_cookie->drc_force) {
  480                         drba->drba_cookie->drc_fromsnapobj = obj;
  481                 } else {
  482                         /*
  483                          * If we are not forcing, there must be no
  484                          * changes since fromsnap. Raw sends have an
  485                          * additional constraint that requires that
  486                          * no "noop" snapshots exist between fromsnap
  487                          * and tosnap for the IVset checking code to
  488                          * work properly.
  489                          */
  490                         if (dsl_dataset_modified_since_snap(ds, snap) ||
  491                             (raw &&
  492                             dsl_dataset_phys(ds)->ds_prev_snap_obj !=
  493                             snap->ds_object)) {
  494                                 dsl_dataset_rele(snap, FTAG);
  495                                 return (SET_ERROR(ETXTBSY));
  496                         }
  497                         drba->drba_cookie->drc_fromsnapobj =
  498                             ds->ds_prev->ds_object;
  499                 }
  500 
  501                 if (dsl_dataset_feature_is_active(snap,
  502                     SPA_FEATURE_REDACTED_DATASETS) && !redact_check(drba,
  503                     snap)) {
  504                         dsl_dataset_rele(snap, FTAG);
  505                         return (SET_ERROR(EINVAL));
  506                 }
  507 
  508                 error = recv_check_large_blocks(snap, featureflags);
  509                 if (error != 0) {
  510                         dsl_dataset_rele(snap, FTAG);
  511                         return (error);
  512                 }
  513 
  514                 dsl_dataset_rele(snap, FTAG);
  515         } else {
  516                 /* If full and not healing then must be forced. */
  517                 if (!drba->drba_cookie->drc_force)
  518                         return (SET_ERROR(EEXIST));
  519 
  520                 /*
  521                  * We don't support using zfs recv -F to blow away
  522                  * encrypted filesystems. This would require the
  523                  * dsl dir to point to the old encryption key and
  524                  * the new one at the same time during the receive.
  525                  */
  526                 if ((!encrypted && raw) || encrypted)
  527                         return (SET_ERROR(EINVAL));
  528 
  529                 /*
  530                  * Perform the same encryption checks we would if
  531                  * we were creating a new dataset from scratch.
  532                  */
  533                 if (!raw) {
  534                         boolean_t will_encrypt;
  535 
  536                         error = dmu_objset_create_crypt_check(
  537                             ds->ds_dir->dd_parent, drba->drba_dcp,
  538                             &will_encrypt);
  539                         if (error != 0)
  540                                 return (error);
  541 
  542                         if (will_encrypt && embed)
  543                                 return (SET_ERROR(EINVAL));
  544                 }
  545         }
  546 
  547         return (0);
  548 }
  549 
  550 /*
  551  * Check that any feature flags used in the data stream we're receiving are
  552  * supported by the pool we are receiving into.
  553  *
  554  * Note that some of the features we explicitly check here have additional
  555  * (implicit) features they depend on, but those dependencies are enforced
  556  * through the zfeature_register() calls declaring the features that we
  557  * explicitly check.
  558  */
  559 static int
  560 recv_begin_check_feature_flags_impl(uint64_t featureflags, spa_t *spa)
  561 {
  562         /*
  563          * Check if there are any unsupported feature flags.
  564          */
  565         if (!DMU_STREAM_SUPPORTED(featureflags)) {
  566                 return (SET_ERROR(ZFS_ERR_UNKNOWN_SEND_STREAM_FEATURE));
  567         }
  568 
  569         /* Verify pool version supports SA if SA_SPILL feature set */
  570         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
  571             spa_version(spa) < SPA_VERSION_SA)
  572                 return (SET_ERROR(ENOTSUP));
  573 
  574         /*
  575          * LZ4 compressed, ZSTD compressed, embedded, mooched, large blocks,
  576          * and large_dnodes in the stream can only be used if those pool
  577          * features are enabled because we don't attempt to decompress /
  578          * un-embed / un-mooch / split up the blocks / dnodes during the
  579          * receive process.
  580          */
  581         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
  582             !spa_feature_is_enabled(spa, SPA_FEATURE_LZ4_COMPRESS))
  583                 return (SET_ERROR(ENOTSUP));
  584         if ((featureflags & DMU_BACKUP_FEATURE_ZSTD) &&
  585             !spa_feature_is_enabled(spa, SPA_FEATURE_ZSTD_COMPRESS))
  586                 return (SET_ERROR(ENOTSUP));
  587         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
  588             !spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA))
  589                 return (SET_ERROR(ENOTSUP));
  590         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
  591             !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_BLOCKS))
  592                 return (SET_ERROR(ENOTSUP));
  593         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
  594             !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_DNODE))
  595                 return (SET_ERROR(ENOTSUP));
  596 
  597         /*
  598          * Receiving redacted streams requires that redacted datasets are
  599          * enabled.
  600          */
  601         if ((featureflags & DMU_BACKUP_FEATURE_REDACTED) &&
  602             !spa_feature_is_enabled(spa, SPA_FEATURE_REDACTED_DATASETS))
  603                 return (SET_ERROR(ENOTSUP));
  604 
  605         return (0);
  606 }
  607 
  608 static int
  609 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
  610 {
  611         dmu_recv_begin_arg_t *drba = arg;
  612         dsl_pool_t *dp = dmu_tx_pool(tx);
  613         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
  614         uint64_t fromguid = drrb->drr_fromguid;
  615         int flags = drrb->drr_flags;
  616         ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
  617         int error;
  618         uint64_t featureflags = drba->drba_cookie->drc_featureflags;
  619         dsl_dataset_t *ds;
  620         const char *tofs = drba->drba_cookie->drc_tofs;
  621 
  622         /* already checked */
  623         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
  624         ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
  625 
  626         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
  627             DMU_COMPOUNDSTREAM ||
  628             drrb->drr_type >= DMU_OST_NUMTYPES ||
  629             ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
  630                 return (SET_ERROR(EINVAL));
  631 
  632         error = recv_begin_check_feature_flags_impl(featureflags, dp->dp_spa);
  633         if (error != 0)
  634                 return (error);
  635 
  636         /* Resumable receives require extensible datasets */
  637         if (drba->drba_cookie->drc_resumable &&
  638             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
  639                 return (SET_ERROR(ENOTSUP));
  640 
  641         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
  642                 /* raw receives require the encryption feature */
  643                 if (!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_ENCRYPTION))
  644                         return (SET_ERROR(ENOTSUP));
  645 
  646                 /* embedded data is incompatible with encryption and raw recv */
  647                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
  648                         return (SET_ERROR(EINVAL));
  649 
  650                 /* raw receives require spill block allocation flag */
  651                 if (!(flags & DRR_FLAG_SPILL_BLOCK))
  652                         return (SET_ERROR(ZFS_ERR_SPILL_BLOCK_FLAG_MISSING));
  653         } else {
  654                 /*
  655                  * We support unencrypted datasets below encrypted ones now,
  656                  * so add the DS_HOLD_FLAG_DECRYPT flag only if we are dealing
  657                  * with a dataset we may encrypt.
  658                  */
  659                 if (drba->drba_dcp == NULL ||
  660                     drba->drba_dcp->cp_crypt != ZIO_CRYPT_OFF) {
  661                         dsflags |= DS_HOLD_FLAG_DECRYPT;
  662                 }
  663         }
  664 
  665         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
  666         if (error == 0) {
  667                 /* target fs already exists; recv into temp clone */
  668 
  669                 /* Can't recv a clone into an existing fs */
  670                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
  671                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
  672                         return (SET_ERROR(EINVAL));
  673                 }
  674 
  675                 error = recv_begin_check_existing_impl(drba, ds, fromguid,
  676                     featureflags);
  677                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
  678         } else if (error == ENOENT) {
  679                 /* target fs does not exist; must be a full backup or clone */
  680                 char buf[ZFS_MAX_DATASET_NAME_LEN];
  681                 objset_t *os;
  682 
  683                 /* healing recv must be done "into" an existing snapshot */
  684                 if (drba->drba_cookie->drc_heal == B_TRUE)
  685                         return (SET_ERROR(ENOTSUP));
  686 
  687                 /*
  688                  * If it's a non-clone incremental, we are missing the
  689                  * target fs, so fail the recv.
  690                  */
  691                 if (fromguid != 0 && !((flags & DRR_FLAG_CLONE) ||
  692                     drba->drba_origin))
  693                         return (SET_ERROR(ENOENT));
  694 
  695                 /*
  696                  * If we're receiving a full send as a clone, and it doesn't
  697                  * contain all the necessary free records and freeobject
  698                  * records, reject it.
  699                  */
  700                 if (fromguid == 0 && drba->drba_origin != NULL &&
  701                     !(flags & DRR_FLAG_FREERECORDS))
  702                         return (SET_ERROR(EINVAL));
  703 
  704                 /* Open the parent of tofs */
  705                 ASSERT3U(strlen(tofs), <, sizeof (buf));
  706                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
  707                 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
  708                 if (error != 0)
  709                         return (error);
  710 
  711                 if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
  712                     drba->drba_origin == NULL) {
  713                         boolean_t will_encrypt;
  714 
  715                         /*
  716                          * Check that we aren't breaking any encryption rules
  717                          * and that we have all the parameters we need to
  718                          * create an encrypted dataset if necessary. If we are
  719                          * making an encrypted dataset the stream can't have
  720                          * embedded data.
  721                          */
  722                         error = dmu_objset_create_crypt_check(ds->ds_dir,
  723                             drba->drba_dcp, &will_encrypt);
  724                         if (error != 0) {
  725                                 dsl_dataset_rele(ds, FTAG);
  726                                 return (error);
  727                         }
  728 
  729                         if (will_encrypt &&
  730                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
  731                                 dsl_dataset_rele(ds, FTAG);
  732                                 return (SET_ERROR(EINVAL));
  733                         }
  734                 }
  735 
  736                 /*
  737                  * Check filesystem and snapshot limits before receiving. We'll
  738                  * recheck snapshot limits again at the end (we create the
  739                  * filesystems and increment those counts during begin_sync).
  740                  */
  741                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
  742                     ZFS_PROP_FILESYSTEM_LIMIT, NULL,
  743                     drba->drba_cred, drba->drba_proc);
  744                 if (error != 0) {
  745                         dsl_dataset_rele(ds, FTAG);
  746                         return (error);
  747                 }
  748 
  749                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
  750                     ZFS_PROP_SNAPSHOT_LIMIT, NULL,
  751                     drba->drba_cred, drba->drba_proc);
  752                 if (error != 0) {
  753                         dsl_dataset_rele(ds, FTAG);
  754                         return (error);
  755                 }
  756 
  757                 /* can't recv below anything but filesystems (eg. no ZVOLs) */
  758                 error = dmu_objset_from_ds(ds, &os);
  759                 if (error != 0) {
  760                         dsl_dataset_rele(ds, FTAG);
  761                         return (error);
  762                 }
  763                 if (dmu_objset_type(os) != DMU_OST_ZFS) {
  764                         dsl_dataset_rele(ds, FTAG);
  765                         return (SET_ERROR(ZFS_ERR_WRONG_PARENT));
  766                 }
  767 
  768                 if (drba->drba_origin != NULL) {
  769                         dsl_dataset_t *origin;
  770                         error = dsl_dataset_hold_flags(dp, drba->drba_origin,
  771                             dsflags, FTAG, &origin);
  772                         if (error != 0) {
  773                                 dsl_dataset_rele(ds, FTAG);
  774                                 return (error);
  775                         }
  776                         if (!origin->ds_is_snapshot) {
  777                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
  778                                 dsl_dataset_rele(ds, FTAG);
  779                                 return (SET_ERROR(EINVAL));
  780                         }
  781                         if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
  782                             fromguid != 0) {
  783                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
  784                                 dsl_dataset_rele(ds, FTAG);
  785                                 return (SET_ERROR(ENODEV));
  786                         }
  787 
  788                         if (origin->ds_dir->dd_crypto_obj != 0 &&
  789                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
  790                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
  791                                 dsl_dataset_rele(ds, FTAG);
  792                                 return (SET_ERROR(EINVAL));
  793                         }
  794 
  795                         /*
  796                          * If the origin is redacted we need to verify that this
  797                          * send stream can safely be received on top of the
  798                          * origin.
  799                          */
  800                         if (dsl_dataset_feature_is_active(origin,
  801                             SPA_FEATURE_REDACTED_DATASETS)) {
  802                                 if (!redact_check(drba, origin)) {
  803                                         dsl_dataset_rele_flags(origin, dsflags,
  804                                             FTAG);
  805                                         dsl_dataset_rele_flags(ds, dsflags,
  806                                             FTAG);
  807                                         return (SET_ERROR(EINVAL));
  808                                 }
  809                         }
  810 
  811                         error = recv_check_large_blocks(ds, featureflags);
  812                         if (error != 0) {
  813                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
  814                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
  815                                 return (error);
  816                         }
  817 
  818                         dsl_dataset_rele_flags(origin, dsflags, FTAG);
  819                 }
  820 
  821                 dsl_dataset_rele(ds, FTAG);
  822                 error = 0;
  823         }
  824         return (error);
  825 }
  826 
  827 static void
  828 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
  829 {
  830         dmu_recv_begin_arg_t *drba = arg;
  831         dsl_pool_t *dp = dmu_tx_pool(tx);
  832         objset_t *mos = dp->dp_meta_objset;
  833         dmu_recv_cookie_t *drc = drba->drba_cookie;
  834         struct drr_begin *drrb = drc->drc_drrb;
  835         const char *tofs = drc->drc_tofs;
  836         uint64_t featureflags = drc->drc_featureflags;
  837         dsl_dataset_t *ds, *newds;
  838         objset_t *os;
  839         uint64_t dsobj;
  840         ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
  841         int error;
  842         uint64_t crflags = 0;
  843         dsl_crypto_params_t dummy_dcp = { 0 };
  844         dsl_crypto_params_t *dcp = drba->drba_dcp;
  845 
  846         if (drrb->drr_flags & DRR_FLAG_CI_DATA)
  847                 crflags |= DS_FLAG_CI_DATASET;
  848 
  849         if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
  850                 dsflags |= DS_HOLD_FLAG_DECRYPT;
  851 
  852         /*
  853          * Raw, non-incremental recvs always use a dummy dcp with
  854          * the raw cmd set. Raw incremental recvs do not use a dcp
  855          * since the encryption parameters are already set in stone.
  856          */
  857         if (dcp == NULL && drrb->drr_fromguid == 0 &&
  858             drba->drba_origin == NULL) {
  859                 ASSERT3P(dcp, ==, NULL);
  860                 dcp = &dummy_dcp;
  861 
  862                 if (featureflags & DMU_BACKUP_FEATURE_RAW)
  863                         dcp->cp_cmd = DCP_CMD_RAW_RECV;
  864         }
  865 
  866         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
  867         if (error == 0) {
  868                 /* Create temporary clone unless we're doing corrective recv */
  869                 dsl_dataset_t *snap = NULL;
  870 
  871                 if (drba->drba_cookie->drc_fromsnapobj != 0) {
  872                         VERIFY0(dsl_dataset_hold_obj(dp,
  873                             drba->drba_cookie->drc_fromsnapobj, FTAG, &snap));
  874                         ASSERT3P(dcp, ==, NULL);
  875                 }
  876                 if (drc->drc_heal) {
  877                         /* When healing we want to use the provided snapshot */
  878                         VERIFY0(dsl_dataset_snap_lookup(ds, drc->drc_tosnap,
  879                             &dsobj));
  880                 } else {
  881                         dsobj = dsl_dataset_create_sync(ds->ds_dir,
  882                             recv_clone_name, snap, crflags, drba->drba_cred,
  883                             dcp, tx);
  884                 }
  885                 if (drba->drba_cookie->drc_fromsnapobj != 0)
  886                         dsl_dataset_rele(snap, FTAG);
  887                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
  888         } else {
  889                 dsl_dir_t *dd;
  890                 const char *tail;
  891                 dsl_dataset_t *origin = NULL;
  892 
  893                 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
  894 
  895                 if (drba->drba_origin != NULL) {
  896                         VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
  897                             FTAG, &origin));
  898                         ASSERT3P(dcp, ==, NULL);
  899                 }
  900 
  901                 /* Create new dataset. */
  902                 dsobj = dsl_dataset_create_sync(dd, strrchr(tofs, '/') + 1,
  903                     origin, crflags, drba->drba_cred, dcp, tx);
  904                 if (origin != NULL)
  905                         dsl_dataset_rele(origin, FTAG);
  906                 dsl_dir_rele(dd, FTAG);
  907                 drc->drc_newfs = B_TRUE;
  908         }
  909         VERIFY0(dsl_dataset_own_obj_force(dp, dsobj, dsflags, dmu_recv_tag,
  910             &newds));
  911         if (dsl_dataset_feature_is_active(newds,
  912             SPA_FEATURE_REDACTED_DATASETS)) {
  913                 /*
  914                  * If the origin dataset is redacted, the child will be redacted
  915                  * when we create it.  We clear the new dataset's
  916                  * redaction info; if it should be redacted, we'll fill
  917                  * in its information later.
  918                  */
  919                 dsl_dataset_deactivate_feature(newds,
  920                     SPA_FEATURE_REDACTED_DATASETS, tx);
  921         }
  922         VERIFY0(dmu_objset_from_ds(newds, &os));
  923 
  924         if (drc->drc_resumable) {
  925                 dsl_dataset_zapify(newds, tx);
  926                 if (drrb->drr_fromguid != 0) {
  927                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
  928                             8, 1, &drrb->drr_fromguid, tx));
  929                 }
  930                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
  931                     8, 1, &drrb->drr_toguid, tx));
  932                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
  933                     1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
  934                 uint64_t one = 1;
  935                 uint64_t zero = 0;
  936                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
  937                     8, 1, &one, tx));
  938                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
  939                     8, 1, &zero, tx));
  940                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
  941                     8, 1, &zero, tx));
  942                 if (featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
  943                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
  944                             8, 1, &one, tx));
  945                 }
  946                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) {
  947                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
  948                             8, 1, &one, tx));
  949                 }
  950                 if (featureflags & DMU_BACKUP_FEATURE_COMPRESSED) {
  951                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
  952                             8, 1, &one, tx));
  953                 }
  954                 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
  955                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_RAWOK,
  956                             8, 1, &one, tx));
  957                 }
  958 
  959                 uint64_t *redact_snaps;
  960                 uint_t numredactsnaps;
  961                 if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
  962                     BEGINNV_REDACT_FROM_SNAPS, &redact_snaps,
  963                     &numredactsnaps) == 0) {
  964                         VERIFY0(zap_add(mos, dsobj,
  965                             DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS,
  966                             sizeof (*redact_snaps), numredactsnaps,
  967                             redact_snaps, tx));
  968                 }
  969         }
  970 
  971         /*
  972          * Usually the os->os_encrypted value is tied to the presence of a
  973          * DSL Crypto Key object in the dd. However, that will not be received
  974          * until dmu_recv_stream(), so we set the value manually for now.
  975          */
  976         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
  977                 os->os_encrypted = B_TRUE;
  978                 drba->drba_cookie->drc_raw = B_TRUE;
  979         }
  980 
  981         if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
  982                 uint64_t *redact_snaps;
  983                 uint_t numredactsnaps;
  984                 VERIFY0(nvlist_lookup_uint64_array(drc->drc_begin_nvl,
  985                     BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps));
  986                 dsl_dataset_activate_redaction(newds, redact_snaps,
  987                     numredactsnaps, tx);
  988         }
  989 
  990         dmu_buf_will_dirty(newds->ds_dbuf, tx);
  991         dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
  992 
  993         /*
  994          * If we actually created a non-clone, we need to create the objset
  995          * in our new dataset. If this is a raw send we postpone this until
  996          * dmu_recv_stream() so that we can allocate the metadnode with the
  997          * properties from the DRR_BEGIN payload.
  998          */
  999         rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
 1000         if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) &&
 1001             (featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
 1002             !drc->drc_heal) {
 1003                 (void) dmu_objset_create_impl(dp->dp_spa,
 1004                     newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
 1005         }
 1006         rrw_exit(&newds->ds_bp_rwlock, FTAG);
 1007 
 1008         drba->drba_cookie->drc_ds = newds;
 1009         drba->drba_cookie->drc_os = os;
 1010 
 1011         spa_history_log_internal_ds(newds, "receive", tx, " ");
 1012 }
 1013 
 1014 static int
 1015 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
 1016 {
 1017         dmu_recv_begin_arg_t *drba = arg;
 1018         dmu_recv_cookie_t *drc = drba->drba_cookie;
 1019         dsl_pool_t *dp = dmu_tx_pool(tx);
 1020         struct drr_begin *drrb = drc->drc_drrb;
 1021         int error;
 1022         ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
 1023         dsl_dataset_t *ds;
 1024         const char *tofs = drc->drc_tofs;
 1025 
 1026         /* already checked */
 1027         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
 1028         ASSERT(drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING);
 1029 
 1030         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
 1031             DMU_COMPOUNDSTREAM ||
 1032             drrb->drr_type >= DMU_OST_NUMTYPES)
 1033                 return (SET_ERROR(EINVAL));
 1034 
 1035         /*
 1036          * This is mostly a sanity check since we should have already done these
 1037          * checks during a previous attempt to receive the data.
 1038          */
 1039         error = recv_begin_check_feature_flags_impl(drc->drc_featureflags,
 1040             dp->dp_spa);
 1041         if (error != 0)
 1042                 return (error);
 1043 
 1044         /* 6 extra bytes for /%recv */
 1045         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
 1046 
 1047         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
 1048             tofs, recv_clone_name);
 1049 
 1050         if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) {
 1051                 /* raw receives require spill block allocation flag */
 1052                 if (!(drrb->drr_flags & DRR_FLAG_SPILL_BLOCK))
 1053                         return (SET_ERROR(ZFS_ERR_SPILL_BLOCK_FLAG_MISSING));
 1054         } else {
 1055                 dsflags |= DS_HOLD_FLAG_DECRYPT;
 1056         }
 1057 
 1058         boolean_t recvexist = B_TRUE;
 1059         if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
 1060                 /* %recv does not exist; continue in tofs */
 1061                 recvexist = B_FALSE;
 1062                 error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
 1063                 if (error != 0)
 1064                         return (error);
 1065         }
 1066 
 1067         /*
 1068          * Resume of full/newfs recv on existing dataset should be done with
 1069          * force flag
 1070          */
 1071         if (recvexist && drrb->drr_fromguid == 0 && !drc->drc_force) {
 1072                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1073                 return (SET_ERROR(ZFS_ERR_RESUME_EXISTS));
 1074         }
 1075 
 1076         /* check that ds is marked inconsistent */
 1077         if (!DS_IS_INCONSISTENT(ds)) {
 1078                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1079                 return (SET_ERROR(EINVAL));
 1080         }
 1081 
 1082         /* check that there is resuming data, and that the toguid matches */
 1083         if (!dsl_dataset_is_zapified(ds)) {
 1084                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1085                 return (SET_ERROR(EINVAL));
 1086         }
 1087         uint64_t val;
 1088         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
 1089             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
 1090         if (error != 0 || drrb->drr_toguid != val) {
 1091                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1092                 return (SET_ERROR(EINVAL));
 1093         }
 1094 
 1095         /*
 1096          * Check if the receive is still running.  If so, it will be owned.
 1097          * Note that nothing else can own the dataset (e.g. after the receive
 1098          * fails) because it will be marked inconsistent.
 1099          */
 1100         if (dsl_dataset_has_owner(ds)) {
 1101                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1102                 return (SET_ERROR(EBUSY));
 1103         }
 1104 
 1105         /* There should not be any snapshots of this fs yet. */
 1106         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
 1107                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1108                 return (SET_ERROR(EINVAL));
 1109         }
 1110 
 1111         /*
 1112          * Note: resume point will be checked when we process the first WRITE
 1113          * record.
 1114          */
 1115 
 1116         /* check that the origin matches */
 1117         val = 0;
 1118         (void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
 1119             DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
 1120         if (drrb->drr_fromguid != val) {
 1121                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1122                 return (SET_ERROR(EINVAL));
 1123         }
 1124 
 1125         if (ds->ds_prev != NULL && drrb->drr_fromguid != 0)
 1126                 drc->drc_fromsnapobj = ds->ds_prev->ds_object;
 1127 
 1128         /*
 1129          * If we're resuming, and the send is redacted, then the original send
 1130          * must have been redacted, and must have been redacted with respect to
 1131          * the same snapshots.
 1132          */
 1133         if (drc->drc_featureflags & DMU_BACKUP_FEATURE_REDACTED) {
 1134                 uint64_t num_ds_redact_snaps;
 1135                 uint64_t *ds_redact_snaps;
 1136 
 1137                 uint_t num_stream_redact_snaps;
 1138                 uint64_t *stream_redact_snaps;
 1139 
 1140                 if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
 1141                     BEGINNV_REDACT_SNAPS, &stream_redact_snaps,
 1142                     &num_stream_redact_snaps) != 0) {
 1143                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1144                         return (SET_ERROR(EINVAL));
 1145                 }
 1146 
 1147                 if (!dsl_dataset_get_uint64_array_feature(ds,
 1148                     SPA_FEATURE_REDACTED_DATASETS, &num_ds_redact_snaps,
 1149                     &ds_redact_snaps)) {
 1150                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1151                         return (SET_ERROR(EINVAL));
 1152                 }
 1153 
 1154                 for (int i = 0; i < num_ds_redact_snaps; i++) {
 1155                         if (!redact_snaps_contains(ds_redact_snaps,
 1156                             num_ds_redact_snaps, stream_redact_snaps[i])) {
 1157                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1158                                 return (SET_ERROR(EINVAL));
 1159                         }
 1160                 }
 1161         }
 1162 
 1163         error = recv_check_large_blocks(ds, drc->drc_featureflags);
 1164         if (error != 0) {
 1165                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1166                 return (error);
 1167         }
 1168 
 1169         dsl_dataset_rele_flags(ds, dsflags, FTAG);
 1170         return (0);
 1171 }
 1172 
 1173 static void
 1174 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
 1175 {
 1176         dmu_recv_begin_arg_t *drba = arg;
 1177         dsl_pool_t *dp = dmu_tx_pool(tx);
 1178         const char *tofs = drba->drba_cookie->drc_tofs;
 1179         uint64_t featureflags = drba->drba_cookie->drc_featureflags;
 1180         dsl_dataset_t *ds;
 1181         ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
 1182         /* 6 extra bytes for /%recv */
 1183         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
 1184 
 1185         (void) snprintf(recvname, sizeof (recvname), "%s/%s", tofs,
 1186             recv_clone_name);
 1187 
 1188         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
 1189                 drba->drba_cookie->drc_raw = B_TRUE;
 1190         } else {
 1191                 dsflags |= DS_HOLD_FLAG_DECRYPT;
 1192         }
 1193 
 1194         if (dsl_dataset_own_force(dp, recvname, dsflags, dmu_recv_tag, &ds)
 1195             != 0) {
 1196                 /* %recv does not exist; continue in tofs */
 1197                 VERIFY0(dsl_dataset_own_force(dp, tofs, dsflags, dmu_recv_tag,
 1198                     &ds));
 1199                 drba->drba_cookie->drc_newfs = B_TRUE;
 1200         }
 1201 
 1202         ASSERT(DS_IS_INCONSISTENT(ds));
 1203         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
 1204         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)) ||
 1205             drba->drba_cookie->drc_raw);
 1206         rrw_exit(&ds->ds_bp_rwlock, FTAG);
 1207 
 1208         drba->drba_cookie->drc_ds = ds;
 1209         VERIFY0(dmu_objset_from_ds(ds, &drba->drba_cookie->drc_os));
 1210         drba->drba_cookie->drc_should_save = B_TRUE;
 1211 
 1212         spa_history_log_internal_ds(ds, "resume receive", tx, " ");
 1213 }
 1214 
 1215 /*
 1216  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 1217  * succeeds; otherwise we will leak the holds on the datasets.
 1218  */
 1219 int
 1220 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
 1221     boolean_t force, boolean_t heal, boolean_t resumable, nvlist_t *localprops,
 1222     nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc,
 1223     zfs_file_t *fp, offset_t *voffp)
 1224 {
 1225         dmu_recv_begin_arg_t drba = { 0 };
 1226         int err;
 1227 
 1228         memset(drc, 0, sizeof (dmu_recv_cookie_t));
 1229         drc->drc_drr_begin = drr_begin;
 1230         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
 1231         drc->drc_tosnap = tosnap;
 1232         drc->drc_tofs = tofs;
 1233         drc->drc_force = force;
 1234         drc->drc_heal = heal;
 1235         drc->drc_resumable = resumable;
 1236         drc->drc_cred = CRED();
 1237         drc->drc_proc = curproc;
 1238         drc->drc_clone = (origin != NULL);
 1239 
 1240         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
 1241                 drc->drc_byteswap = B_TRUE;
 1242                 (void) fletcher_4_incremental_byteswap(drr_begin,
 1243                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
 1244                 byteswap_record(drr_begin);
 1245         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
 1246                 (void) fletcher_4_incremental_native(drr_begin,
 1247                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
 1248         } else {
 1249                 return (SET_ERROR(EINVAL));
 1250         }
 1251 
 1252         drc->drc_fp = fp;
 1253         drc->drc_voff = *voffp;
 1254         drc->drc_featureflags =
 1255             DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
 1256 
 1257         uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
 1258         void *payload = NULL;
 1259 
 1260         /*
 1261          * Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace
 1262          * configurable via ZFS_SENDRECV_MAX_NVLIST. We enforce 256MB as a hard
 1263          * upper limit. Systems with less than 1GB of RAM will see a lower
 1264          * limit from `arc_all_memory() / 4`.
 1265          */
 1266         if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4)))
 1267                 return (E2BIG);
 1268 
 1269         if (payloadlen != 0)
 1270                 payload = vmem_alloc(payloadlen, KM_SLEEP);
 1271 
 1272         err = receive_read_payload_and_next_header(drc, payloadlen,
 1273             payload);
 1274         if (err != 0) {
 1275                 vmem_free(payload, payloadlen);
 1276                 return (err);
 1277         }
 1278         if (payloadlen != 0) {
 1279                 err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
 1280                     KM_SLEEP);
 1281                 vmem_free(payload, payloadlen);
 1282                 if (err != 0) {
 1283                         kmem_free(drc->drc_next_rrd,
 1284                             sizeof (*drc->drc_next_rrd));
 1285                         return (err);
 1286                 }
 1287         }
 1288 
 1289         if (drc->drc_drrb->drr_flags & DRR_FLAG_SPILL_BLOCK)
 1290                 drc->drc_spill = B_TRUE;
 1291 
 1292         drba.drba_origin = origin;
 1293         drba.drba_cookie = drc;
 1294         drba.drba_cred = CRED();
 1295         drba.drba_proc = curproc;
 1296 
 1297         if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
 1298                 err = dsl_sync_task(tofs,
 1299                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
 1300                     &drba, 5, ZFS_SPACE_CHECK_NORMAL);
 1301         } else {
 1302                 /*
 1303                  * For non-raw, non-incremental, non-resuming receives the
 1304                  * user can specify encryption parameters on the command line
 1305                  * with "zfs recv -o". For these receives we create a dcp and
 1306                  * pass it to the sync task. Creating the dcp will implicitly
 1307                  * remove the encryption params from the localprops nvlist,
 1308                  * which avoids errors when trying to set these normally
 1309                  * read-only properties. Any other kind of receive that
 1310                  * attempts to set these properties will fail as a result.
 1311                  */
 1312                 if ((DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
 1313                     DMU_BACKUP_FEATURE_RAW) == 0 &&
 1314                     origin == NULL && drc->drc_drrb->drr_fromguid == 0) {
 1315                         err = dsl_crypto_params_create_nvlist(DCP_CMD_NONE,
 1316                             localprops, hidden_args, &drba.drba_dcp);
 1317                 }
 1318 
 1319                 if (err == 0) {
 1320                         err = dsl_sync_task(tofs,
 1321                             dmu_recv_begin_check, dmu_recv_begin_sync,
 1322                             &drba, 5, ZFS_SPACE_CHECK_NORMAL);
 1323                         dsl_crypto_params_free(drba.drba_dcp, !!err);
 1324                 }
 1325         }
 1326 
 1327         if (err != 0) {
 1328                 kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
 1329                 nvlist_free(drc->drc_begin_nvl);
 1330         }
 1331         return (err);
 1332 }
 1333 
 1334 /*
 1335  * Holds data need for corrective recv callback
 1336  */
 1337 typedef struct cr_cb_data {
 1338         uint64_t size;
 1339         zbookmark_phys_t zb;
 1340         spa_t *spa;
 1341 } cr_cb_data_t;
 1342 
 1343 static void
 1344 corrective_read_done(zio_t *zio)
 1345 {
 1346         cr_cb_data_t *data = zio->io_private;
 1347         /* Corruption corrected; update error log if needed */
 1348         if (zio->io_error == 0)
 1349                 spa_remove_error(data->spa, &data->zb);
 1350         kmem_free(data, sizeof (cr_cb_data_t));
 1351         abd_free(zio->io_abd);
 1352 }
 1353 
 1354 /*
 1355  * zio_rewrite the data pointed to by bp with the data from the rrd's abd.
 1356  */
 1357 static int
 1358 do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
 1359     struct receive_record_arg *rrd, blkptr_t *bp)
 1360 {
 1361         int err;
 1362         zio_t *io;
 1363         zbookmark_phys_t zb;
 1364         dnode_t *dn;
 1365         abd_t *abd = rrd->abd;
 1366         zio_cksum_t bp_cksum = bp->blk_cksum;
 1367         zio_flag_t flags = ZIO_FLAG_SPECULATIVE |
 1368             ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_CANFAIL;
 1369 
 1370         if (rwa->raw)
 1371                 flags |= ZIO_FLAG_RAW;
 1372 
 1373         err = dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn);
 1374         if (err != 0)
 1375                 return (err);
 1376         SET_BOOKMARK(&zb, dmu_objset_id(rwa->os), drrw->drr_object, 0,
 1377             dbuf_whichblock(dn, 0, drrw->drr_offset));
 1378         dnode_rele(dn, FTAG);
 1379 
 1380         if (!rwa->raw && DRR_WRITE_COMPRESSED(drrw)) {
 1381                 /* Decompress the stream data */
 1382                 abd_t *dabd = abd_alloc_linear(
 1383                     drrw->drr_logical_size, B_FALSE);
 1384                 err = zio_decompress_data(drrw->drr_compressiontype,
 1385                     abd, abd_to_buf(dabd), abd_get_size(abd),
 1386                     abd_get_size(dabd), NULL);
 1387 
 1388                 if (err != 0) {
 1389                         abd_free(dabd);
 1390                         return (err);
 1391                 }
 1392                 /* Swap in the newly decompressed data into the abd */
 1393                 abd_free(abd);
 1394                 abd = dabd;
 1395         }
 1396 
 1397         if (!rwa->raw && BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) {
 1398                 /* Recompress the data */
 1399                 abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp),
 1400                     B_FALSE);
 1401                 uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp),
 1402                     abd, abd_to_buf(cabd), abd_get_size(abd),
 1403                     rwa->os->os_complevel);
 1404                 abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize);
 1405                 /* Swap in newly compressed data into the abd */
 1406                 abd_free(abd);
 1407                 abd = cabd;
 1408                 flags |= ZIO_FLAG_RAW_COMPRESS;
 1409         }
 1410 
 1411         /*
 1412          * The stream is not encrypted but the data on-disk is.
 1413          * We need to re-encrypt the buf using the same
 1414          * encryption type, salt, iv, and mac that was used to encrypt
 1415          * the block previosly.
 1416          */
 1417         if (!rwa->raw && BP_USES_CRYPT(bp)) {
 1418                 dsl_dataset_t *ds;
 1419                 dsl_crypto_key_t *dck = NULL;
 1420                 uint8_t salt[ZIO_DATA_SALT_LEN];
 1421                 uint8_t iv[ZIO_DATA_IV_LEN];
 1422                 uint8_t mac[ZIO_DATA_MAC_LEN];
 1423                 boolean_t no_crypt = B_FALSE;
 1424                 dsl_pool_t *dp = dmu_objset_pool(rwa->os);
 1425                 abd_t *eabd = abd_alloc_linear(BP_GET_PSIZE(bp), B_FALSE);
 1426 
 1427                 zio_crypt_decode_params_bp(bp, salt, iv);
 1428                 zio_crypt_decode_mac_bp(bp, mac);
 1429 
 1430                 dsl_pool_config_enter(dp, FTAG);
 1431                 err = dsl_dataset_hold_flags(dp, rwa->tofs,
 1432                     DS_HOLD_FLAG_DECRYPT, FTAG, &ds);
 1433                 if (err != 0) {
 1434                         dsl_pool_config_exit(dp, FTAG);
 1435                         abd_free(eabd);
 1436                         return (SET_ERROR(EACCES));
 1437                 }
 1438 
 1439                 /* Look up the key from the spa's keystore */
 1440                 err = spa_keystore_lookup_key(rwa->os->os_spa,
 1441                     zb.zb_objset, FTAG, &dck);
 1442                 if (err != 0) {
 1443                         dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT,
 1444                             FTAG);
 1445                         dsl_pool_config_exit(dp, FTAG);
 1446                         abd_free(eabd);
 1447                         return (SET_ERROR(EACCES));
 1448                 }
 1449 
 1450                 err = zio_do_crypt_abd(B_TRUE, &dck->dck_key,
 1451                     BP_GET_TYPE(bp), BP_SHOULD_BYTESWAP(bp), salt, iv,
 1452                     mac, abd_get_size(abd), abd, eabd, &no_crypt);
 1453 
 1454                 spa_keystore_dsl_key_rele(rwa->os->os_spa, dck, FTAG);
 1455                 dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT, FTAG);
 1456                 dsl_pool_config_exit(dp, FTAG);
 1457 
 1458                 ASSERT0(no_crypt);
 1459                 if (err != 0) {
 1460                         abd_free(eabd);
 1461                         return (err);
 1462                 }
 1463                 /* Swap in the newly encrypted data into the abd */
 1464                 abd_free(abd);
 1465                 abd = eabd;
 1466 
 1467                 /*
 1468                  * We want to prevent zio_rewrite() from trying to
 1469                  * encrypt the data again
 1470                  */
 1471                 flags |= ZIO_FLAG_RAW_ENCRYPT;
 1472         }
 1473         rrd->abd = abd;
 1474 
 1475         io = zio_rewrite(NULL, rwa->os->os_spa, bp->blk_birth, bp, abd,
 1476             BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags, &zb);
 1477 
 1478         ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) ||
 1479             abd_get_size(abd) == BP_GET_PSIZE(bp));
 1480 
 1481         /* compute new bp checksum value and make sure it matches the old one */
 1482         zio_checksum_compute(io, BP_GET_CHECKSUM(bp), abd, abd_get_size(abd));
 1483         if (!ZIO_CHECKSUM_EQUAL(bp_cksum, io->io_bp->blk_cksum)) {
 1484                 zio_destroy(io);
 1485                 if (zfs_recv_best_effort_corrective != 0)
 1486                         return (0);
 1487                 return (SET_ERROR(ECKSUM));
 1488         }
 1489 
 1490         /* Correct the corruption in place */
 1491         err = zio_wait(io);
 1492         if (err == 0) {
 1493                 cr_cb_data_t *cb_data =
 1494                     kmem_alloc(sizeof (cr_cb_data_t), KM_SLEEP);
 1495                 cb_data->spa = rwa->os->os_spa;
 1496                 cb_data->size = drrw->drr_logical_size;
 1497                 cb_data->zb = zb;
 1498                 /* Test if healing worked by re-reading the bp */
 1499                 err = zio_wait(zio_read(rwa->heal_pio, rwa->os->os_spa, bp,
 1500                     abd_alloc_for_io(drrw->drr_logical_size, B_FALSE),
 1501                     drrw->drr_logical_size, corrective_read_done,
 1502                     cb_data, ZIO_PRIORITY_ASYNC_READ, flags, NULL));
 1503         }
 1504         if (err != 0 && zfs_recv_best_effort_corrective != 0)
 1505                 err = 0;
 1506 
 1507         return (err);
 1508 }
 1509 
 1510 static int
 1511 receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
 1512 {
 1513         int done = 0;
 1514 
 1515         /*
 1516          * The code doesn't rely on this (lengths being multiples of 8).  See
 1517          * comment in dump_bytes.
 1518          */
 1519         ASSERT(len % 8 == 0 ||
 1520             (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
 1521 
 1522         while (done < len) {
 1523                 ssize_t resid = len - done;
 1524                 zfs_file_t *fp = drc->drc_fp;
 1525                 int err = zfs_file_read(fp, (char *)buf + done,
 1526                     len - done, &resid);
 1527                 if (err == 0 && resid == len - done) {
 1528                         /*
 1529                          * Note: ECKSUM or ZFS_ERR_STREAM_TRUNCATED indicates
 1530                          * that the receive was interrupted and can
 1531                          * potentially be resumed.
 1532                          */
 1533                         err = SET_ERROR(ZFS_ERR_STREAM_TRUNCATED);
 1534                 }
 1535                 drc->drc_voff += len - done - resid;
 1536                 done = len - resid;
 1537                 if (err != 0)
 1538                         return (err);
 1539         }
 1540 
 1541         drc->drc_bytes_read += len;
 1542 
 1543         ASSERT3U(done, ==, len);
 1544         return (0);
 1545 }
 1546 
 1547 static inline uint8_t
 1548 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
 1549 {
 1550         if (bonus_type == DMU_OT_SA) {
 1551                 return (1);
 1552         } else {
 1553                 return (1 +
 1554                     ((DN_OLD_MAX_BONUSLEN -
 1555                     MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
 1556         }
 1557 }
 1558 
 1559 static void
 1560 save_resume_state(struct receive_writer_arg *rwa,
 1561     uint64_t object, uint64_t offset, dmu_tx_t *tx)
 1562 {
 1563         int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
 1564 
 1565         if (!rwa->resumable)
 1566                 return;
 1567 
 1568         /*
 1569          * We use ds_resume_bytes[] != 0 to indicate that we need to
 1570          * update this on disk, so it must not be 0.
 1571          */
 1572         ASSERT(rwa->bytes_read != 0);
 1573 
 1574         /*
 1575          * We only resume from write records, which have a valid
 1576          * (non-meta-dnode) object number.
 1577          */
 1578         ASSERT(object != 0);
 1579 
 1580         /*
 1581          * For resuming to work correctly, we must receive records in order,
 1582          * sorted by object,offset.  This is checked by the callers, but
 1583          * assert it here for good measure.
 1584          */
 1585         ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
 1586         ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
 1587             offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
 1588         ASSERT3U(rwa->bytes_read, >=,
 1589             rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
 1590 
 1591         rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
 1592         rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
 1593         rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
 1594 }
 1595 
 1596 static int
 1597 receive_object_is_same_generation(objset_t *os, uint64_t object,
 1598     dmu_object_type_t old_bonus_type, dmu_object_type_t new_bonus_type,
 1599     const void *new_bonus, boolean_t *samegenp)
 1600 {
 1601         zfs_file_info_t zoi;
 1602         int err;
 1603 
 1604         dmu_buf_t *old_bonus_dbuf;
 1605         err = dmu_bonus_hold(os, object, FTAG, &old_bonus_dbuf);
 1606         if (err != 0)
 1607                 return (err);
 1608         err = dmu_get_file_info(os, old_bonus_type, old_bonus_dbuf->db_data,
 1609             &zoi);
 1610         dmu_buf_rele(old_bonus_dbuf, FTAG);
 1611         if (err != 0)
 1612                 return (err);
 1613         uint64_t old_gen = zoi.zfi_generation;
 1614 
 1615         err = dmu_get_file_info(os, new_bonus_type, new_bonus, &zoi);
 1616         if (err != 0)
 1617                 return (err);
 1618         uint64_t new_gen = zoi.zfi_generation;
 1619 
 1620         *samegenp = (old_gen == new_gen);
 1621         return (0);
 1622 }
 1623 
 1624 static int
 1625 receive_handle_existing_object(const struct receive_writer_arg *rwa,
 1626     const struct drr_object *drro, const dmu_object_info_t *doi,
 1627     const void *bonus_data,
 1628     uint64_t *object_to_hold, uint32_t *new_blksz)
 1629 {
 1630         uint32_t indblksz = drro->drr_indblkshift ?
 1631             1ULL << drro->drr_indblkshift : 0;
 1632         int nblkptr = deduce_nblkptr(drro->drr_bonustype,
 1633             drro->drr_bonuslen);
 1634         uint8_t dn_slots = drro->drr_dn_slots != 0 ?
 1635             drro->drr_dn_slots : DNODE_MIN_SLOTS;
 1636         boolean_t do_free_range = B_FALSE;
 1637         int err;
 1638 
 1639         *object_to_hold = drro->drr_object;
 1640 
 1641         /* nblkptr should be bounded by the bonus size and type */
 1642         if (rwa->raw && nblkptr != drro->drr_nblkptr)
 1643                 return (SET_ERROR(EINVAL));
 1644 
 1645         /*
 1646          * After the previous send stream, the sending system may
 1647          * have freed this object, and then happened to re-allocate
 1648          * this object number in a later txg. In this case, we are
 1649          * receiving a different logical file, and the block size may
 1650          * appear to be different.  i.e. we may have a different
 1651          * block size for this object than what the send stream says.
 1652          * In this case we need to remove the object's contents,
 1653          * so that its structure can be changed and then its contents
 1654          * entirely replaced by subsequent WRITE records.
 1655          *
 1656          * If this is a -L (--large-block) incremental stream, and
 1657          * the previous stream was not -L, the block size may appear
 1658          * to increase.  i.e. we may have a smaller block size for
 1659          * this object than what the send stream says.  In this case
 1660          * we need to keep the object's contents and block size
 1661          * intact, so that we don't lose parts of the object's
 1662          * contents that are not changed by this incremental send
 1663          * stream.
 1664          *
 1665          * We can distinguish between the two above cases by using
 1666          * the ZPL's generation number (see
 1667          * receive_object_is_same_generation()).  However, we only
 1668          * want to rely on the generation number when absolutely
 1669          * necessary, because with raw receives, the generation is
 1670          * encrypted.  We also want to minimize dependence on the
 1671          * ZPL, so that other types of datasets can also be received
 1672          * (e.g. ZVOLs, although note that ZVOLS currently do not
 1673          * reallocate their objects or change their structure).
 1674          * Therefore, we check a number of different cases where we
 1675          * know it is safe to discard the object's contents, before
 1676          * using the ZPL's generation number to make the above
 1677          * distinction.
 1678          */
 1679         if (drro->drr_blksz != doi->doi_data_block_size) {
 1680                 if (rwa->raw) {
 1681                         /*
 1682                          * RAW streams always have large blocks, so
 1683                          * we are sure that the data is not needed
 1684                          * due to changing --large-block to be on.
 1685                          * Which is fortunate since the bonus buffer
 1686                          * (which contains the ZPL generation) is
 1687                          * encrypted, and the key might not be
 1688                          * loaded.
 1689                          */
 1690                         do_free_range = B_TRUE;
 1691                 } else if (rwa->full) {
 1692                         /*
 1693                          * This is a full send stream, so it always
 1694                          * replaces what we have.  Even if the
 1695                          * generation numbers happen to match, this
 1696                          * can not actually be the same logical file.
 1697                          * This is relevant when receiving a full
 1698                          * send as a clone.
 1699                          */
 1700                         do_free_range = B_TRUE;
 1701                 } else if (drro->drr_type !=
 1702                     DMU_OT_PLAIN_FILE_CONTENTS ||
 1703                     doi->doi_type != DMU_OT_PLAIN_FILE_CONTENTS) {
 1704                         /*
 1705                          * PLAIN_FILE_CONTENTS are the only type of
 1706                          * objects that have ever been stored with
 1707                          * large blocks, so we don't need the special
 1708                          * logic below.  ZAP blocks can shrink (when
 1709                          * there's only one block), so we don't want
 1710                          * to hit the error below about block size
 1711                          * only increasing.
 1712                          */
 1713                         do_free_range = B_TRUE;
 1714                 } else if (doi->doi_max_offset <=
 1715                     doi->doi_data_block_size) {
 1716                         /*
 1717                          * There is only one block.  We can free it,
 1718                          * because its contents will be replaced by a
 1719                          * WRITE record.  This can not be the no-L ->
 1720                          * -L case, because the no-L case would have
 1721                          * resulted in multiple blocks.  If we
 1722                          * supported -L -> no-L, it would not be safe
 1723                          * to free the file's contents.  Fortunately,
 1724                          * that is not allowed (see
 1725                          * recv_check_large_blocks()).
 1726                          */
 1727                         do_free_range = B_TRUE;
 1728                 } else {
 1729                         boolean_t is_same_gen;
 1730                         err = receive_object_is_same_generation(rwa->os,
 1731                             drro->drr_object, doi->doi_bonus_type,
 1732                             drro->drr_bonustype, bonus_data, &is_same_gen);
 1733                         if (err != 0)
 1734                                 return (SET_ERROR(EINVAL));
 1735 
 1736                         if (is_same_gen) {
 1737                                 /*
 1738                                  * This is the same logical file, and
 1739                                  * the block size must be increasing.
 1740                                  * It could only decrease if
 1741                                  * --large-block was changed to be
 1742                                  * off, which is checked in
 1743                                  * recv_check_large_blocks().
 1744                                  */
 1745                                 if (drro->drr_blksz <=
 1746                                     doi->doi_data_block_size)
 1747                                         return (SET_ERROR(EINVAL));
 1748                                 /*
 1749                                  * We keep the existing blocksize and
 1750                                  * contents.
 1751                                  */
 1752                                 *new_blksz =
 1753                                     doi->doi_data_block_size;
 1754                         } else {
 1755                                 do_free_range = B_TRUE;
 1756                         }
 1757                 }
 1758         }
 1759 
 1760         /* nblkptr can only decrease if the object was reallocated */
 1761         if (nblkptr < doi->doi_nblkptr)
 1762                 do_free_range = B_TRUE;
 1763 
 1764         /* number of slots can only change on reallocation */
 1765         if (dn_slots != doi->doi_dnodesize >> DNODE_SHIFT)
 1766                 do_free_range = B_TRUE;
 1767 
 1768         /*
 1769          * For raw sends we also check a few other fields to
 1770          * ensure we are preserving the objset structure exactly
 1771          * as it was on the receive side:
 1772          *     - A changed indirect block size
 1773          *     - A smaller nlevels
 1774          */
 1775         if (rwa->raw) {
 1776                 if (indblksz != doi->doi_metadata_block_size)
 1777                         do_free_range = B_TRUE;
 1778                 if (drro->drr_nlevels < doi->doi_indirection)
 1779                         do_free_range = B_TRUE;
 1780         }
 1781 
 1782         if (do_free_range) {
 1783                 err = dmu_free_long_range(rwa->os, drro->drr_object,
 1784                     0, DMU_OBJECT_END);
 1785                 if (err != 0)
 1786                         return (SET_ERROR(EINVAL));
 1787         }
 1788 
 1789         /*
 1790          * The dmu does not currently support decreasing nlevels
 1791          * or changing the number of dnode slots on an object. For
 1792          * non-raw sends, this does not matter and the new object
 1793          * can just use the previous one's nlevels. For raw sends,
 1794          * however, the structure of the received dnode (including
 1795          * nlevels and dnode slots) must match that of the send
 1796          * side. Therefore, instead of using dmu_object_reclaim(),
 1797          * we must free the object completely and call
 1798          * dmu_object_claim_dnsize() instead.
 1799          */
 1800         if ((rwa->raw && drro->drr_nlevels < doi->doi_indirection) ||
 1801             dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) {
 1802                 err = dmu_free_long_object(rwa->os, drro->drr_object);
 1803                 if (err != 0)
 1804                         return (SET_ERROR(EINVAL));
 1805 
 1806                 txg_wait_synced(dmu_objset_pool(rwa->os), 0);
 1807                 *object_to_hold = DMU_NEW_OBJECT;
 1808         }
 1809 
 1810         /*
 1811          * For raw receives, free everything beyond the new incoming
 1812          * maxblkid. Normally this would be done with a DRR_FREE
 1813          * record that would come after this DRR_OBJECT record is
 1814          * processed. However, for raw receives we manually set the
 1815          * maxblkid from the drr_maxblkid and so we must first free
 1816          * everything above that blkid to ensure the DMU is always
 1817          * consistent with itself. We will never free the first block
 1818          * of the object here because a maxblkid of 0 could indicate
 1819          * an object with a single block or one with no blocks. This
 1820          * free may be skipped when dmu_free_long_range() was called
 1821          * above since it covers the entire object's contents.
 1822          */
 1823         if (rwa->raw && *object_to_hold != DMU_NEW_OBJECT && !do_free_range) {
 1824                 err = dmu_free_long_range(rwa->os, drro->drr_object,
 1825                     (drro->drr_maxblkid + 1) * doi->doi_data_block_size,
 1826                     DMU_OBJECT_END);
 1827                 if (err != 0)
 1828                         return (SET_ERROR(EINVAL));
 1829         }
 1830         return (0);
 1831 }
 1832 
 1833 noinline static int
 1834 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
 1835     void *data)
 1836 {
 1837         dmu_object_info_t doi;
 1838         dmu_tx_t *tx;
 1839         int err;
 1840         uint32_t new_blksz = drro->drr_blksz;
 1841         uint8_t dn_slots = drro->drr_dn_slots != 0 ?
 1842             drro->drr_dn_slots : DNODE_MIN_SLOTS;
 1843 
 1844         if (drro->drr_type == DMU_OT_NONE ||
 1845             !DMU_OT_IS_VALID(drro->drr_type) ||
 1846             !DMU_OT_IS_VALID(drro->drr_bonustype) ||
 1847             drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
 1848             drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
 1849             P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
 1850             drro->drr_blksz < SPA_MINBLOCKSIZE ||
 1851             drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
 1852             drro->drr_bonuslen >
 1853             DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) ||
 1854             dn_slots >
 1855             (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) {
 1856                 return (SET_ERROR(EINVAL));
 1857         }
 1858 
 1859         if (rwa->raw) {
 1860                 /*
 1861                  * We should have received a DRR_OBJECT_RANGE record
 1862                  * containing this block and stored it in rwa.
 1863                  */
 1864                 if (drro->drr_object < rwa->or_firstobj ||
 1865                     drro->drr_object >= rwa->or_firstobj + rwa->or_numslots ||
 1866                     drro->drr_raw_bonuslen < drro->drr_bonuslen ||
 1867                     drro->drr_indblkshift > SPA_MAXBLOCKSHIFT ||
 1868                     drro->drr_nlevels > DN_MAX_LEVELS ||
 1869                     drro->drr_nblkptr > DN_MAX_NBLKPTR ||
 1870                     DN_SLOTS_TO_BONUSLEN(dn_slots) <
 1871                     drro->drr_raw_bonuslen)
 1872                         return (SET_ERROR(EINVAL));
 1873         } else {
 1874                 /*
 1875                  * The DRR_OBJECT_SPILL flag is valid when the DRR_BEGIN
 1876                  * record indicates this by setting DRR_FLAG_SPILL_BLOCK.
 1877                  */
 1878                 if (((drro->drr_flags & ~(DRR_OBJECT_SPILL))) ||
 1879                     (!rwa->spill && DRR_OBJECT_HAS_SPILL(drro->drr_flags))) {
 1880                         return (SET_ERROR(EINVAL));
 1881                 }
 1882 
 1883                 if (drro->drr_raw_bonuslen != 0 || drro->drr_nblkptr != 0 ||
 1884                     drro->drr_indblkshift != 0 || drro->drr_nlevels != 0) {
 1885                         return (SET_ERROR(EINVAL));
 1886                 }
 1887         }
 1888 
 1889         err = dmu_object_info(rwa->os, drro->drr_object, &doi);
 1890 
 1891         if (err != 0 && err != ENOENT && err != EEXIST)
 1892                 return (SET_ERROR(EINVAL));
 1893 
 1894         if (drro->drr_object > rwa->max_object)
 1895                 rwa->max_object = drro->drr_object;
 1896 
 1897         /*
 1898          * If we are losing blkptrs or changing the block size this must
 1899          * be a new file instance.  We must clear out the previous file
 1900          * contents before we can change this type of metadata in the dnode.
 1901          * Raw receives will also check that the indirect structure of the
 1902          * dnode hasn't changed.
 1903          */
 1904         uint64_t object_to_hold;
 1905         if (err == 0) {
 1906                 err = receive_handle_existing_object(rwa, drro, &doi, data,
 1907                     &object_to_hold, &new_blksz);
 1908                 if (err != 0)
 1909                         return (err);
 1910         } else if (err == EEXIST) {
 1911                 /*
 1912                  * The object requested is currently an interior slot of a
 1913                  * multi-slot dnode. This will be resolved when the next txg
 1914                  * is synced out, since the send stream will have told us
 1915                  * to free this slot when we freed the associated dnode
 1916                  * earlier in the stream.
 1917                  */
 1918                 txg_wait_synced(dmu_objset_pool(rwa->os), 0);
 1919 
 1920                 if (dmu_object_info(rwa->os, drro->drr_object, NULL) != ENOENT)
 1921                         return (SET_ERROR(EINVAL));
 1922 
 1923                 /* object was freed and we are about to allocate a new one */
 1924                 object_to_hold = DMU_NEW_OBJECT;
 1925         } else {
 1926                 /*
 1927                  * If the only record in this range so far was DRR_FREEOBJECTS
 1928                  * with at least one actually freed object, it's possible that
 1929                  * the block will now be converted to a hole. We need to wait
 1930                  * for the txg to sync to prevent races.
 1931                  */
 1932                 if (rwa->or_need_sync == ORNS_YES)
 1933                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
 1934 
 1935                 /* object is free and we are about to allocate a new one */
 1936                 object_to_hold = DMU_NEW_OBJECT;
 1937         }
 1938 
 1939         /* Only relevant for the first object in the range */
 1940         rwa->or_need_sync = ORNS_NO;
 1941 
 1942         /*
 1943          * If this is a multi-slot dnode there is a chance that this
 1944          * object will expand into a slot that is already used by
 1945          * another object from the previous snapshot. We must free
 1946          * these objects before we attempt to allocate the new dnode.
 1947          */
 1948         if (dn_slots > 1) {
 1949                 boolean_t need_sync = B_FALSE;
 1950 
 1951                 for (uint64_t slot = drro->drr_object + 1;
 1952                     slot < drro->drr_object + dn_slots;
 1953                     slot++) {
 1954                         dmu_object_info_t slot_doi;
 1955 
 1956                         err = dmu_object_info(rwa->os, slot, &slot_doi);
 1957                         if (err == ENOENT || err == EEXIST)
 1958                                 continue;
 1959                         else if (err != 0)
 1960                                 return (err);
 1961 
 1962                         err = dmu_free_long_object(rwa->os, slot);
 1963                         if (err != 0)
 1964                                 return (err);
 1965 
 1966                         need_sync = B_TRUE;
 1967                 }
 1968 
 1969                 if (need_sync)
 1970                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
 1971         }
 1972 
 1973         tx = dmu_tx_create(rwa->os);
 1974         dmu_tx_hold_bonus(tx, object_to_hold);
 1975         dmu_tx_hold_write(tx, object_to_hold, 0, 0);
 1976         err = dmu_tx_assign(tx, TXG_WAIT);
 1977         if (err != 0) {
 1978                 dmu_tx_abort(tx);
 1979                 return (err);
 1980         }
 1981 
 1982         if (object_to_hold == DMU_NEW_OBJECT) {
 1983                 /* Currently free, wants to be allocated */
 1984                 err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
 1985                     drro->drr_type, new_blksz,
 1986                     drro->drr_bonustype, drro->drr_bonuslen,
 1987                     dn_slots << DNODE_SHIFT, tx);
 1988         } else if (drro->drr_type != doi.doi_type ||
 1989             new_blksz != doi.doi_data_block_size ||
 1990             drro->drr_bonustype != doi.doi_bonus_type ||
 1991             drro->drr_bonuslen != doi.doi_bonus_size) {
 1992                 /* Currently allocated, but with different properties */
 1993                 err = dmu_object_reclaim_dnsize(rwa->os, drro->drr_object,
 1994                     drro->drr_type, new_blksz,
 1995                     drro->drr_bonustype, drro->drr_bonuslen,
 1996                     dn_slots << DNODE_SHIFT, rwa->spill ?
 1997                     DRR_OBJECT_HAS_SPILL(drro->drr_flags) : B_FALSE, tx);
 1998         } else if (rwa->spill && !DRR_OBJECT_HAS_SPILL(drro->drr_flags)) {
 1999                 /*
 2000                  * Currently allocated, the existing version of this object
 2001                  * may reference a spill block that is no longer allocated
 2002                  * at the source and needs to be freed.
 2003                  */
 2004                 err = dmu_object_rm_spill(rwa->os, drro->drr_object, tx);
 2005         }
 2006 
 2007         if (err != 0) {
 2008                 dmu_tx_commit(tx);
 2009                 return (SET_ERROR(EINVAL));
 2010         }
 2011 
 2012         if (rwa->or_crypt_params_present) {
 2013                 /*
 2014                  * Set the crypt params for the buffer associated with this
 2015                  * range of dnodes.  This causes the blkptr_t to have the
 2016                  * same crypt params (byteorder, salt, iv, mac) as on the
 2017                  * sending side.
 2018                  *
 2019                  * Since we are committing this tx now, it is possible for
 2020                  * the dnode block to end up on-disk with the incorrect MAC,
 2021                  * if subsequent objects in this block are received in a
 2022                  * different txg.  However, since the dataset is marked as
 2023                  * inconsistent, no code paths will do a non-raw read (or
 2024                  * decrypt the block / verify the MAC). The receive code and
 2025                  * scrub code can safely do raw reads and verify the
 2026                  * checksum.  They don't need to verify the MAC.
 2027                  */
 2028                 dmu_buf_t *db = NULL;
 2029                 uint64_t offset = rwa->or_firstobj * DNODE_MIN_SIZE;
 2030 
 2031                 err = dmu_buf_hold_by_dnode(DMU_META_DNODE(rwa->os),
 2032                     offset, FTAG, &db, DMU_READ_PREFETCH | DMU_READ_NO_DECRYPT);
 2033                 if (err != 0) {
 2034                         dmu_tx_commit(tx);
 2035                         return (SET_ERROR(EINVAL));
 2036                 }
 2037 
 2038                 dmu_buf_set_crypt_params(db, rwa->or_byteorder,
 2039                     rwa->or_salt, rwa->or_iv, rwa->or_mac, tx);
 2040 
 2041                 dmu_buf_rele(db, FTAG);
 2042 
 2043                 rwa->or_crypt_params_present = B_FALSE;
 2044         }
 2045 
 2046         dmu_object_set_checksum(rwa->os, drro->drr_object,
 2047             drro->drr_checksumtype, tx);
 2048         dmu_object_set_compress(rwa->os, drro->drr_object,
 2049             drro->drr_compress, tx);
 2050 
 2051         /* handle more restrictive dnode structuring for raw recvs */
 2052         if (rwa->raw) {
 2053                 /*
 2054                  * Set the indirect block size, block shift, nlevels.
 2055                  * This will not fail because we ensured all of the
 2056                  * blocks were freed earlier if this is a new object.
 2057                  * For non-new objects block size and indirect block
 2058                  * shift cannot change and nlevels can only increase.
 2059                  */
 2060                 ASSERT3U(new_blksz, ==, drro->drr_blksz);
 2061                 VERIFY0(dmu_object_set_blocksize(rwa->os, drro->drr_object,
 2062                     drro->drr_blksz, drro->drr_indblkshift, tx));
 2063                 VERIFY0(dmu_object_set_nlevels(rwa->os, drro->drr_object,
 2064                     drro->drr_nlevels, tx));
 2065 
 2066                 /*
 2067                  * Set the maxblkid. This will always succeed because
 2068                  * we freed all blocks beyond the new maxblkid above.
 2069                  */
 2070                 VERIFY0(dmu_object_set_maxblkid(rwa->os, drro->drr_object,
 2071                     drro->drr_maxblkid, tx));
 2072         }
 2073 
 2074         if (data != NULL) {
 2075                 dmu_buf_t *db;
 2076                 dnode_t *dn;
 2077                 uint32_t flags = DMU_READ_NO_PREFETCH;
 2078 
 2079                 if (rwa->raw)
 2080                         flags |= DMU_READ_NO_DECRYPT;
 2081 
 2082                 VERIFY0(dnode_hold(rwa->os, drro->drr_object, FTAG, &dn));
 2083                 VERIFY0(dmu_bonus_hold_by_dnode(dn, FTAG, &db, flags));
 2084 
 2085                 dmu_buf_will_dirty(db, tx);
 2086 
 2087                 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
 2088                 memcpy(db->db_data, data, DRR_OBJECT_PAYLOAD_SIZE(drro));
 2089 
 2090                 /*
 2091                  * Raw bonus buffers have their byteorder determined by the
 2092                  * DRR_OBJECT_RANGE record.
 2093                  */
 2094                 if (rwa->byteswap && !rwa->raw) {
 2095                         dmu_object_byteswap_t byteswap =
 2096                             DMU_OT_BYTESWAP(drro->drr_bonustype);
 2097                         dmu_ot_byteswap[byteswap].ob_func(db->db_data,
 2098                             DRR_OBJECT_PAYLOAD_SIZE(drro));
 2099                 }
 2100                 dmu_buf_rele(db, FTAG);
 2101                 dnode_rele(dn, FTAG);
 2102         }
 2103         dmu_tx_commit(tx);
 2104 
 2105         return (0);
 2106 }
 2107 
 2108 noinline static int
 2109 receive_freeobjects(struct receive_writer_arg *rwa,
 2110     struct drr_freeobjects *drrfo)
 2111 {
 2112         uint64_t obj;
 2113         int next_err = 0;
 2114 
 2115         if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
 2116                 return (SET_ERROR(EINVAL));
 2117 
 2118         for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
 2119             obj < drrfo->drr_firstobj + drrfo->drr_numobjs &&
 2120             obj < DN_MAX_OBJECT && next_err == 0;
 2121             next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
 2122                 dmu_object_info_t doi;
 2123                 int err;
 2124 
 2125                 err = dmu_object_info(rwa->os, obj, &doi);
 2126                 if (err == ENOENT)
 2127                         continue;
 2128                 else if (err != 0)
 2129                         return (err);
 2130 
 2131                 err = dmu_free_long_object(rwa->os, obj);
 2132 
 2133                 if (err != 0)
 2134                         return (err);
 2135 
 2136                 if (rwa->or_need_sync == ORNS_MAYBE)
 2137                         rwa->or_need_sync = ORNS_YES;
 2138         }
 2139         if (next_err != ESRCH)
 2140                 return (next_err);
 2141         return (0);
 2142 }
 2143 
 2144 /*
 2145  * Note: if this fails, the caller will clean up any records left on the
 2146  * rwa->write_batch list.
 2147  */
 2148 static int
 2149 flush_write_batch_impl(struct receive_writer_arg *rwa)
 2150 {
 2151         dnode_t *dn;
 2152         int err;
 2153 
 2154         if (dnode_hold(rwa->os, rwa->last_object, FTAG, &dn) != 0)
 2155                 return (SET_ERROR(EINVAL));
 2156 
 2157         struct receive_record_arg *last_rrd = list_tail(&rwa->write_batch);
 2158         struct drr_write *last_drrw = &last_rrd->header.drr_u.drr_write;
 2159 
 2160         struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
 2161         struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;
 2162 
 2163         ASSERT3U(rwa->last_object, ==, last_drrw->drr_object);
 2164         ASSERT3U(rwa->last_offset, ==, last_drrw->drr_offset);
 2165 
 2166         dmu_tx_t *tx = dmu_tx_create(rwa->os);
 2167         dmu_tx_hold_write_by_dnode(tx, dn, first_drrw->drr_offset,
 2168             last_drrw->drr_offset - first_drrw->drr_offset +
 2169             last_drrw->drr_logical_size);
 2170         err = dmu_tx_assign(tx, TXG_WAIT);
 2171         if (err != 0) {
 2172                 dmu_tx_abort(tx);
 2173                 dnode_rele(dn, FTAG);
 2174                 return (err);
 2175         }
 2176 
 2177         struct receive_record_arg *rrd;
 2178         while ((rrd = list_head(&rwa->write_batch)) != NULL) {
 2179                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
 2180                 abd_t *abd = rrd->abd;
 2181 
 2182                 ASSERT3U(drrw->drr_object, ==, rwa->last_object);
 2183 
 2184                 if (drrw->drr_logical_size != dn->dn_datablksz) {
 2185                         /*
 2186                          * The WRITE record is larger than the object's block
 2187                          * size.  We must be receiving an incremental
 2188                          * large-block stream into a dataset that previously did
 2189                          * a non-large-block receive.  Lightweight writes must
 2190                          * be exactly one block, so we need to decompress the
 2191                          * data (if compressed) and do a normal dmu_write().
 2192                          */
 2193                         ASSERT3U(drrw->drr_logical_size, >, dn->dn_datablksz);
 2194                         if (DRR_WRITE_COMPRESSED(drrw)) {
 2195                                 abd_t *decomp_abd =
 2196                                     abd_alloc_linear(drrw->drr_logical_size,
 2197                                     B_FALSE);
 2198 
 2199                                 err = zio_decompress_data(
 2200                                     drrw->drr_compressiontype,
 2201                                     abd, abd_to_buf(decomp_abd),
 2202                                     abd_get_size(abd),
 2203                                     abd_get_size(decomp_abd), NULL);
 2204 
 2205                                 if (err == 0) {
 2206                                         dmu_write_by_dnode(dn,
 2207                                             drrw->drr_offset,
 2208                                             drrw->drr_logical_size,
 2209                                             abd_to_buf(decomp_abd), tx);
 2210                                 }
 2211                                 abd_free(decomp_abd);
 2212                         } else {
 2213                                 dmu_write_by_dnode(dn,
 2214                                     drrw->drr_offset,
 2215                                     drrw->drr_logical_size,
 2216                                     abd_to_buf(abd), tx);
 2217                         }
 2218                         if (err == 0)
 2219                                 abd_free(abd);
 2220                 } else {
 2221                         zio_prop_t zp;
 2222                         dmu_write_policy(rwa->os, dn, 0, 0, &zp);
 2223 
 2224                         zio_flag_t zio_flags = 0;
 2225 
 2226                         if (rwa->raw) {
 2227                                 zp.zp_encrypt = B_TRUE;
 2228                                 zp.zp_compress = drrw->drr_compressiontype;
 2229                                 zp.zp_byteorder = ZFS_HOST_BYTEORDER ^
 2230                                     !!DRR_IS_RAW_BYTESWAPPED(drrw->drr_flags) ^
 2231                                     rwa->byteswap;
 2232                                 memcpy(zp.zp_salt, drrw->drr_salt,
 2233                                     ZIO_DATA_SALT_LEN);
 2234                                 memcpy(zp.zp_iv, drrw->drr_iv,
 2235                                     ZIO_DATA_IV_LEN);
 2236                                 memcpy(zp.zp_mac, drrw->drr_mac,
 2237                                     ZIO_DATA_MAC_LEN);
 2238                                 if (DMU_OT_IS_ENCRYPTED(zp.zp_type)) {
 2239                                         zp.zp_nopwrite = B_FALSE;
 2240                                         zp.zp_copies = MIN(zp.zp_copies,
 2241                                             SPA_DVAS_PER_BP - 1);
 2242                                 }
 2243                                 zio_flags |= ZIO_FLAG_RAW;
 2244                         } else if (DRR_WRITE_COMPRESSED(drrw)) {
 2245                                 ASSERT3U(drrw->drr_compressed_size, >, 0);
 2246                                 ASSERT3U(drrw->drr_logical_size, >=,
 2247                                     drrw->drr_compressed_size);
 2248                                 zp.zp_compress = drrw->drr_compressiontype;
 2249                                 zio_flags |= ZIO_FLAG_RAW_COMPRESS;
 2250                         } else if (rwa->byteswap) {
 2251                                 /*
 2252                                  * Note: compressed blocks never need to be
 2253                                  * byteswapped, because WRITE records for
 2254                                  * metadata blocks are never compressed. The
 2255                                  * exception is raw streams, which are written
 2256                                  * in the original byteorder, and the byteorder
 2257                                  * bit is preserved in the BP by setting
 2258                                  * zp_byteorder above.
 2259                                  */
 2260                                 dmu_object_byteswap_t byteswap =
 2261                                     DMU_OT_BYTESWAP(drrw->drr_type);
 2262                                 dmu_ot_byteswap[byteswap].ob_func(
 2263                                     abd_to_buf(abd),
 2264                                     DRR_WRITE_PAYLOAD_SIZE(drrw));
 2265                         }
 2266 
 2267                         /*
 2268                          * Since this data can't be read until the receive
 2269                          * completes, we can do a "lightweight" write for
 2270                          * improved performance.
 2271                          */
 2272                         err = dmu_lightweight_write_by_dnode(dn,
 2273                             drrw->drr_offset, abd, &zp, zio_flags, tx);
 2274                 }
 2275 
 2276                 if (err != 0) {
 2277                         /*
 2278                          * This rrd is left on the list, so the caller will
 2279                          * free it (and the abd).
 2280                          */
 2281                         break;
 2282                 }
 2283 
 2284                 /*
 2285                  * Note: If the receive fails, we want the resume stream to
 2286                  * start with the same record that we last successfully
 2287                  * received (as opposed to the next record), so that we can
 2288                  * verify that we are resuming from the correct location.
 2289                  */
 2290                 save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
 2291 
 2292                 list_remove(&rwa->write_batch, rrd);
 2293                 kmem_free(rrd, sizeof (*rrd));
 2294         }
 2295 
 2296         dmu_tx_commit(tx);
 2297         dnode_rele(dn, FTAG);
 2298         return (err);
 2299 }
 2300 
 2301 noinline static int
 2302 flush_write_batch(struct receive_writer_arg *rwa)
 2303 {
 2304         if (list_is_empty(&rwa->write_batch))
 2305                 return (0);
 2306         int err = rwa->err;
 2307         if (err == 0)
 2308                 err = flush_write_batch_impl(rwa);
 2309         if (err != 0) {
 2310                 struct receive_record_arg *rrd;
 2311                 while ((rrd = list_remove_head(&rwa->write_batch)) != NULL) {
 2312                         abd_free(rrd->abd);
 2313                         kmem_free(rrd, sizeof (*rrd));
 2314                 }
 2315         }
 2316         ASSERT(list_is_empty(&rwa->write_batch));
 2317         return (err);
 2318 }
 2319 
 2320 noinline static int
 2321 receive_process_write_record(struct receive_writer_arg *rwa,
 2322     struct receive_record_arg *rrd)
 2323 {
 2324         int err = 0;
 2325 
 2326         ASSERT3U(rrd->header.drr_type, ==, DRR_WRITE);
 2327         struct drr_write *drrw = &rrd->header.drr_u.drr_write;
 2328 
 2329         if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
 2330             !DMU_OT_IS_VALID(drrw->drr_type))
 2331                 return (SET_ERROR(EINVAL));
 2332 
 2333         if (rwa->heal) {
 2334                 blkptr_t *bp;
 2335                 dmu_buf_t *dbp;
 2336                 dnode_t *dn;
 2337                 int flags = DB_RF_CANFAIL;
 2338 
 2339                 if (rwa->raw)
 2340                         flags |= DB_RF_NO_DECRYPT;
 2341 
 2342                 if (rwa->byteswap) {
 2343                         dmu_object_byteswap_t byteswap =
 2344                             DMU_OT_BYTESWAP(drrw->drr_type);
 2345                         dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(rrd->abd),
 2346                             DRR_WRITE_PAYLOAD_SIZE(drrw));
 2347                 }
 2348 
 2349                 err = dmu_buf_hold_noread(rwa->os, drrw->drr_object,
 2350                     drrw->drr_offset, FTAG, &dbp);
 2351                 if (err != 0)
 2352                         return (err);
 2353 
 2354                 /* Try to read the object to see if it needs healing */
 2355                 err = dbuf_read((dmu_buf_impl_t *)dbp, NULL, flags);
 2356                 /*
 2357                  * We only try to heal when dbuf_read() returns a ECKSUMs.
 2358                  * Other errors (even EIO) get returned to caller.
 2359                  * EIO indicates that the device is not present/accessible,
 2360                  * so writing to it will likely fail.
 2361                  * If the block is healthy, we don't want to overwrite it
 2362                  * unnecessarily.
 2363                  */
 2364                 if (err != ECKSUM) {
 2365                         dmu_buf_rele(dbp, FTAG);
 2366                         return (err);
 2367                 }
 2368                 dn = dmu_buf_dnode_enter(dbp);
 2369                 /* Make sure the on-disk block and recv record sizes match */
 2370                 if (drrw->drr_logical_size !=
 2371                     dn->dn_datablkszsec << SPA_MINBLOCKSHIFT) {
 2372                         err = ENOTSUP;
 2373                         dmu_buf_dnode_exit(dbp);
 2374                         dmu_buf_rele(dbp, FTAG);
 2375                         return (err);
 2376                 }
 2377                 /* Get the block pointer for the corrupted block */
 2378                 bp = dmu_buf_get_blkptr(dbp);
 2379                 err = do_corrective_recv(rwa, drrw, rrd, bp);
 2380                 dmu_buf_dnode_exit(dbp);
 2381                 dmu_buf_rele(dbp, FTAG);
 2382                 return (err);
 2383         }
 2384 
 2385         /*
 2386          * For resuming to work, records must be in increasing order
 2387          * by (object, offset).
 2388          */
 2389         if (drrw->drr_object < rwa->last_object ||
 2390             (drrw->drr_object == rwa->last_object &&
 2391             drrw->drr_offset < rwa->last_offset)) {
 2392                 return (SET_ERROR(EINVAL));
 2393         }
 2394 
 2395         struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
 2396         struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;
 2397         uint64_t batch_size =
 2398             MIN(zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2);
 2399         if (first_rrd != NULL &&
 2400             (drrw->drr_object != first_drrw->drr_object ||
 2401             drrw->drr_offset >= first_drrw->drr_offset + batch_size)) {
 2402                 err = flush_write_batch(rwa);
 2403                 if (err != 0)
 2404                         return (err);
 2405         }
 2406 
 2407         rwa->last_object = drrw->drr_object;
 2408         rwa->last_offset = drrw->drr_offset;
 2409 
 2410         if (rwa->last_object > rwa->max_object)
 2411                 rwa->max_object = rwa->last_object;
 2412 
 2413         list_insert_tail(&rwa->write_batch, rrd);
 2414         /*
 2415          * Return EAGAIN to indicate that we will use this rrd again,
 2416          * so the caller should not free it
 2417          */
 2418         return (EAGAIN);
 2419 }
 2420 
 2421 static int
 2422 receive_write_embedded(struct receive_writer_arg *rwa,
 2423     struct drr_write_embedded *drrwe, void *data)
 2424 {
 2425         dmu_tx_t *tx;
 2426         int err;
 2427 
 2428         if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
 2429                 return (SET_ERROR(EINVAL));
 2430 
 2431         if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
 2432                 return (SET_ERROR(EINVAL));
 2433 
 2434         if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
 2435                 return (SET_ERROR(EINVAL));
 2436         if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
 2437                 return (SET_ERROR(EINVAL));
 2438         if (rwa->raw)
 2439                 return (SET_ERROR(EINVAL));
 2440 
 2441         if (drrwe->drr_object > rwa->max_object)
 2442                 rwa->max_object = drrwe->drr_object;
 2443 
 2444         tx = dmu_tx_create(rwa->os);
 2445 
 2446         dmu_tx_hold_write(tx, drrwe->drr_object,
 2447             drrwe->drr_offset, drrwe->drr_length);
 2448         err = dmu_tx_assign(tx, TXG_WAIT);
 2449         if (err != 0) {
 2450                 dmu_tx_abort(tx);
 2451                 return (err);
 2452         }
 2453 
 2454         dmu_write_embedded(rwa->os, drrwe->drr_object,
 2455             drrwe->drr_offset, data, drrwe->drr_etype,
 2456             drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
 2457             rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
 2458 
 2459         /* See comment in restore_write. */
 2460         save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
 2461         dmu_tx_commit(tx);
 2462         return (0);
 2463 }
 2464 
 2465 static int
 2466 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
 2467     abd_t *abd)
 2468 {
 2469         dmu_buf_t *db, *db_spill;
 2470         int err;
 2471 
 2472         if (drrs->drr_length < SPA_MINBLOCKSIZE ||
 2473             drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
 2474                 return (SET_ERROR(EINVAL));
 2475 
 2476         /*
 2477          * This is an unmodified spill block which was added to the stream
 2478          * to resolve an issue with incorrectly removing spill blocks.  It
 2479          * should be ignored by current versions of the code which support
 2480          * the DRR_FLAG_SPILL_BLOCK flag.
 2481          */
 2482         if (rwa->spill && DRR_SPILL_IS_UNMODIFIED(drrs->drr_flags)) {
 2483                 abd_free(abd);
 2484                 return (0);
 2485         }
 2486 
 2487         if (rwa->raw) {
 2488                 if (!DMU_OT_IS_VALID(drrs->drr_type) ||
 2489                     drrs->drr_compressiontype >= ZIO_COMPRESS_FUNCTIONS ||
 2490                     drrs->drr_compressed_size == 0)
 2491                         return (SET_ERROR(EINVAL));
 2492         }
 2493 
 2494         if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
 2495                 return (SET_ERROR(EINVAL));
 2496 
 2497         if (drrs->drr_object > rwa->max_object)
 2498                 rwa->max_object = drrs->drr_object;
 2499 
 2500         VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
 2501         if ((err = dmu_spill_hold_by_bonus(db, DMU_READ_NO_DECRYPT, FTAG,
 2502             &db_spill)) != 0) {
 2503                 dmu_buf_rele(db, FTAG);
 2504                 return (err);
 2505         }
 2506 
 2507         dmu_tx_t *tx = dmu_tx_create(rwa->os);
 2508 
 2509         dmu_tx_hold_spill(tx, db->db_object);
 2510 
 2511         err = dmu_tx_assign(tx, TXG_WAIT);
 2512         if (err != 0) {
 2513                 dmu_buf_rele(db, FTAG);
 2514                 dmu_buf_rele(db_spill, FTAG);
 2515                 dmu_tx_abort(tx);
 2516                 return (err);
 2517         }
 2518 
 2519         /*
 2520          * Spill blocks may both grow and shrink.  When a change in size
 2521          * occurs any existing dbuf must be updated to match the logical
 2522          * size of the provided arc_buf_t.
 2523          */
 2524         if (db_spill->db_size != drrs->drr_length) {
 2525                 dmu_buf_will_fill(db_spill, tx);
 2526                 VERIFY0(dbuf_spill_set_blksz(db_spill,
 2527                     drrs->drr_length, tx));
 2528         }
 2529 
 2530         arc_buf_t *abuf;
 2531         if (rwa->raw) {
 2532                 boolean_t byteorder = ZFS_HOST_BYTEORDER ^
 2533                     !!DRR_IS_RAW_BYTESWAPPED(drrs->drr_flags) ^
 2534                     rwa->byteswap;
 2535 
 2536                 abuf = arc_loan_raw_buf(dmu_objset_spa(rwa->os),
 2537                     drrs->drr_object, byteorder, drrs->drr_salt,
 2538                     drrs->drr_iv, drrs->drr_mac, drrs->drr_type,
 2539                     drrs->drr_compressed_size, drrs->drr_length,
 2540                     drrs->drr_compressiontype, 0);
 2541         } else {
 2542                 abuf = arc_loan_buf(dmu_objset_spa(rwa->os),
 2543                     DMU_OT_IS_METADATA(drrs->drr_type),
 2544                     drrs->drr_length);
 2545                 if (rwa->byteswap) {
 2546                         dmu_object_byteswap_t byteswap =
 2547                             DMU_OT_BYTESWAP(drrs->drr_type);
 2548                         dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(abd),
 2549                             DRR_SPILL_PAYLOAD_SIZE(drrs));
 2550                 }
 2551         }
 2552 
 2553         memcpy(abuf->b_data, abd_to_buf(abd), DRR_SPILL_PAYLOAD_SIZE(drrs));
 2554         abd_free(abd);
 2555         dbuf_assign_arcbuf((dmu_buf_impl_t *)db_spill, abuf, tx);
 2556 
 2557         dmu_buf_rele(db, FTAG);
 2558         dmu_buf_rele(db_spill, FTAG);
 2559 
 2560         dmu_tx_commit(tx);
 2561         return (0);
 2562 }
 2563 
 2564 noinline static int
 2565 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
 2566 {
 2567         int err;
 2568 
 2569         if (drrf->drr_length != -1ULL &&
 2570             drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
 2571                 return (SET_ERROR(EINVAL));
 2572 
 2573         if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
 2574                 return (SET_ERROR(EINVAL));
 2575 
 2576         if (drrf->drr_object > rwa->max_object)
 2577                 rwa->max_object = drrf->drr_object;
 2578 
 2579         err = dmu_free_long_range(rwa->os, drrf->drr_object,
 2580             drrf->drr_offset, drrf->drr_length);
 2581 
 2582         return (err);
 2583 }
 2584 
 2585 static int
 2586 receive_object_range(struct receive_writer_arg *rwa,
 2587     struct drr_object_range *drror)
 2588 {
 2589         /*
 2590          * By default, we assume this block is in our native format
 2591          * (ZFS_HOST_BYTEORDER). We then take into account whether
 2592          * the send stream is byteswapped (rwa->byteswap). Finally,
 2593          * we need to byteswap again if this particular block was
 2594          * in non-native format on the send side.
 2595          */
 2596         boolean_t byteorder = ZFS_HOST_BYTEORDER ^ rwa->byteswap ^
 2597             !!DRR_IS_RAW_BYTESWAPPED(drror->drr_flags);
 2598 
 2599         /*
 2600          * Since dnode block sizes are constant, we should not need to worry
 2601          * about making sure that the dnode block size is the same on the
 2602          * sending and receiving sides for the time being. For non-raw sends,
 2603          * this does not matter (and in fact we do not send a DRR_OBJECT_RANGE
 2604          * record at all). Raw sends require this record type because the
 2605          * encryption parameters are used to protect an entire block of bonus
 2606          * buffers. If the size of dnode blocks ever becomes variable,
 2607          * handling will need to be added to ensure that dnode block sizes
 2608          * match on the sending and receiving side.
 2609          */
 2610         if (drror->drr_numslots != DNODES_PER_BLOCK ||
 2611             P2PHASE(drror->drr_firstobj, DNODES_PER_BLOCK) != 0 ||
 2612             !rwa->raw)
 2613                 return (SET_ERROR(EINVAL));
 2614 
 2615         if (drror->drr_firstobj > rwa->max_object)
 2616                 rwa->max_object = drror->drr_firstobj;
 2617 
 2618         /*
 2619          * The DRR_OBJECT_RANGE handling must be deferred to receive_object()
 2620          * so that the block of dnodes is not written out when it's empty,
 2621          * and converted to a HOLE BP.
 2622          */
 2623         rwa->or_crypt_params_present = B_TRUE;
 2624         rwa->or_firstobj = drror->drr_firstobj;
 2625         rwa->or_numslots = drror->drr_numslots;
 2626         memcpy(rwa->or_salt, drror->drr_salt, ZIO_DATA_SALT_LEN);
 2627         memcpy(rwa->or_iv, drror->drr_iv, ZIO_DATA_IV_LEN);
 2628         memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN);
 2629         rwa->or_byteorder = byteorder;
 2630 
 2631         rwa->or_need_sync = ORNS_MAYBE;
 2632 
 2633         return (0);
 2634 }
 2635 
 2636 /*
 2637  * Until we have the ability to redact large ranges of data efficiently, we
 2638  * process these records as frees.
 2639  */
 2640 noinline static int
 2641 receive_redact(struct receive_writer_arg *rwa, struct drr_redact *drrr)
 2642 {
 2643         struct drr_free drrf = {0};
 2644         drrf.drr_length = drrr->drr_length;
 2645         drrf.drr_object = drrr->drr_object;
 2646         drrf.drr_offset = drrr->drr_offset;
 2647         drrf.drr_toguid = drrr->drr_toguid;
 2648         return (receive_free(rwa, &drrf));
 2649 }
 2650 
 2651 /* used to destroy the drc_ds on error */
 2652 static void
 2653 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
 2654 {
 2655         dsl_dataset_t *ds = drc->drc_ds;
 2656         ds_hold_flags_t dsflags;
 2657 
 2658         dsflags = (drc->drc_raw) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
 2659         /*
 2660          * Wait for the txg sync before cleaning up the receive. For
 2661          * resumable receives, this ensures that our resume state has
 2662          * been written out to disk. For raw receives, this ensures
 2663          * that the user accounting code will not attempt to do anything
 2664          * after we stopped receiving the dataset.
 2665          */
 2666         txg_wait_synced(ds->ds_dir->dd_pool, 0);
 2667         ds->ds_objset->os_raw_receive = B_FALSE;
 2668 
 2669         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
 2670         if (drc->drc_resumable && drc->drc_should_save &&
 2671             !BP_IS_HOLE(dsl_dataset_get_blkptr(ds))) {
 2672                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
 2673                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
 2674         } else {
 2675                 char name[ZFS_MAX_DATASET_NAME_LEN];
 2676                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
 2677                 dsl_dataset_name(ds, name);
 2678                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
 2679                 if (!drc->drc_heal)
 2680                         (void) dsl_destroy_head(name);
 2681         }
 2682 }
 2683 
 2684 static void
 2685 receive_cksum(dmu_recv_cookie_t *drc, int len, void *buf)
 2686 {
 2687         if (drc->drc_byteswap) {
 2688                 (void) fletcher_4_incremental_byteswap(buf, len,
 2689                     &drc->drc_cksum);
 2690         } else {
 2691                 (void) fletcher_4_incremental_native(buf, len, &drc->drc_cksum);
 2692         }
 2693 }
 2694 
 2695 /*
 2696  * Read the payload into a buffer of size len, and update the current record's
 2697  * payload field.
 2698  * Allocate drc->drc_next_rrd and read the next record's header into
 2699  * drc->drc_next_rrd->header.
 2700  * Verify checksum of payload and next record.
 2701  */
 2702 static int
 2703 receive_read_payload_and_next_header(dmu_recv_cookie_t *drc, int len, void *buf)
 2704 {
 2705         int err;
 2706 
 2707         if (len != 0) {
 2708                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
 2709                 err = receive_read(drc, len, buf);
 2710                 if (err != 0)
 2711                         return (err);
 2712                 receive_cksum(drc, len, buf);
 2713 
 2714                 /* note: rrd is NULL when reading the begin record's payload */
 2715                 if (drc->drc_rrd != NULL) {
 2716                         drc->drc_rrd->payload = buf;
 2717                         drc->drc_rrd->payload_size = len;
 2718                         drc->drc_rrd->bytes_read = drc->drc_bytes_read;
 2719                 }
 2720         } else {
 2721                 ASSERT3P(buf, ==, NULL);
 2722         }
 2723 
 2724         drc->drc_prev_cksum = drc->drc_cksum;
 2725 
 2726         drc->drc_next_rrd = kmem_zalloc(sizeof (*drc->drc_next_rrd), KM_SLEEP);
 2727         err = receive_read(drc, sizeof (drc->drc_next_rrd->header),
 2728             &drc->drc_next_rrd->header);
 2729         drc->drc_next_rrd->bytes_read = drc->drc_bytes_read;
 2730 
 2731         if (err != 0) {
 2732                 kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
 2733                 drc->drc_next_rrd = NULL;
 2734                 return (err);
 2735         }
 2736         if (drc->drc_next_rrd->header.drr_type == DRR_BEGIN) {
 2737                 kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
 2738                 drc->drc_next_rrd = NULL;
 2739                 return (SET_ERROR(EINVAL));
 2740         }
 2741 
 2742         /*
 2743          * Note: checksum is of everything up to but not including the
 2744          * checksum itself.
 2745          */
 2746         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 2747             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
 2748         receive_cksum(drc,
 2749             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 2750             &drc->drc_next_rrd->header);
 2751 
 2752         zio_cksum_t cksum_orig =
 2753             drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
 2754         zio_cksum_t *cksump =
 2755             &drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
 2756 
 2757         if (drc->drc_byteswap)
 2758                 byteswap_record(&drc->drc_next_rrd->header);
 2759 
 2760         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
 2761             !ZIO_CHECKSUM_EQUAL(drc->drc_cksum, *cksump)) {
 2762                 kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
 2763                 drc->drc_next_rrd = NULL;
 2764                 return (SET_ERROR(ECKSUM));
 2765         }
 2766 
 2767         receive_cksum(drc, sizeof (cksum_orig), &cksum_orig);
 2768 
 2769         return (0);
 2770 }
 2771 
 2772 /*
 2773  * Issue the prefetch reads for any necessary indirect blocks.
 2774  *
 2775  * We use the object ignore list to tell us whether or not to issue prefetches
 2776  * for a given object.  We do this for both correctness (in case the blocksize
 2777  * of an object has changed) and performance (if the object doesn't exist, don't
 2778  * needlessly try to issue prefetches).  We also trim the list as we go through
 2779  * the stream to prevent it from growing to an unbounded size.
 2780  *
 2781  * The object numbers within will always be in sorted order, and any write
 2782  * records we see will also be in sorted order, but they're not sorted with
 2783  * respect to each other (i.e. we can get several object records before
 2784  * receiving each object's write records).  As a result, once we've reached a
 2785  * given object number, we can safely remove any reference to lower object
 2786  * numbers in the ignore list. In practice, we receive up to 32 object records
 2787  * before receiving write records, so the list can have up to 32 nodes in it.
 2788  */
 2789 static void
 2790 receive_read_prefetch(dmu_recv_cookie_t *drc, uint64_t object, uint64_t offset,
 2791     uint64_t length)
 2792 {
 2793         if (!objlist_exists(drc->drc_ignore_objlist, object)) {
 2794                 dmu_prefetch(drc->drc_os, object, 1, offset, length,
 2795                     ZIO_PRIORITY_SYNC_READ);
 2796         }
 2797 }
 2798 
 2799 /*
 2800  * Read records off the stream, issuing any necessary prefetches.
 2801  */
 2802 static int
 2803 receive_read_record(dmu_recv_cookie_t *drc)
 2804 {
 2805         int err;
 2806 
 2807         switch (drc->drc_rrd->header.drr_type) {
 2808         case DRR_OBJECT:
 2809         {
 2810                 struct drr_object *drro =
 2811                     &drc->drc_rrd->header.drr_u.drr_object;
 2812                 uint32_t size = DRR_OBJECT_PAYLOAD_SIZE(drro);
 2813                 void *buf = NULL;
 2814                 dmu_object_info_t doi;
 2815 
 2816                 if (size != 0)
 2817                         buf = kmem_zalloc(size, KM_SLEEP);
 2818 
 2819                 err = receive_read_payload_and_next_header(drc, size, buf);
 2820                 if (err != 0) {
 2821                         kmem_free(buf, size);
 2822                         return (err);
 2823                 }
 2824                 err = dmu_object_info(drc->drc_os, drro->drr_object, &doi);
 2825                 /*
 2826                  * See receive_read_prefetch for an explanation why we're
 2827                  * storing this object in the ignore_obj_list.
 2828                  */
 2829                 if (err == ENOENT || err == EEXIST ||
 2830                     (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
 2831                         objlist_insert(drc->drc_ignore_objlist,
 2832                             drro->drr_object);
 2833                         err = 0;
 2834                 }
 2835                 return (err);
 2836         }
 2837         case DRR_FREEOBJECTS:
 2838         {
 2839                 err = receive_read_payload_and_next_header(drc, 0, NULL);
 2840                 return (err);
 2841         }
 2842         case DRR_WRITE:
 2843         {
 2844                 struct drr_write *drrw = &drc->drc_rrd->header.drr_u.drr_write;
 2845                 int size = DRR_WRITE_PAYLOAD_SIZE(drrw);
 2846                 abd_t *abd = abd_alloc_linear(size, B_FALSE);
 2847                 err = receive_read_payload_and_next_header(drc, size,
 2848                     abd_to_buf(abd));
 2849                 if (err != 0) {
 2850                         abd_free(abd);
 2851                         return (err);
 2852                 }
 2853                 drc->drc_rrd->abd = abd;
 2854                 receive_read_prefetch(drc, drrw->drr_object, drrw->drr_offset,
 2855                     drrw->drr_logical_size);
 2856                 return (err);
 2857         }
 2858         case DRR_WRITE_EMBEDDED:
 2859         {
 2860                 struct drr_write_embedded *drrwe =
 2861                     &drc->drc_rrd->header.drr_u.drr_write_embedded;
 2862                 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
 2863                 void *buf = kmem_zalloc(size, KM_SLEEP);
 2864 
 2865                 err = receive_read_payload_and_next_header(drc, size, buf);
 2866                 if (err != 0) {
 2867                         kmem_free(buf, size);
 2868                         return (err);
 2869                 }
 2870 
 2871                 receive_read_prefetch(drc, drrwe->drr_object, drrwe->drr_offset,
 2872                     drrwe->drr_length);
 2873                 return (err);
 2874         }
 2875         case DRR_FREE:
 2876         case DRR_REDACT:
 2877         {
 2878                 /*
 2879                  * It might be beneficial to prefetch indirect blocks here, but
 2880                  * we don't really have the data to decide for sure.
 2881                  */
 2882                 err = receive_read_payload_and_next_header(drc, 0, NULL);
 2883                 return (err);
 2884         }
 2885         case DRR_END:
 2886         {
 2887                 struct drr_end *drre = &drc->drc_rrd->header.drr_u.drr_end;
 2888                 if (!ZIO_CHECKSUM_EQUAL(drc->drc_prev_cksum,
 2889                     drre->drr_checksum))
 2890                         return (SET_ERROR(ECKSUM));
 2891                 return (0);
 2892         }
 2893         case DRR_SPILL:
 2894         {
 2895                 struct drr_spill *drrs = &drc->drc_rrd->header.drr_u.drr_spill;
 2896                 int size = DRR_SPILL_PAYLOAD_SIZE(drrs);
 2897                 abd_t *abd = abd_alloc_linear(size, B_FALSE);
 2898                 err = receive_read_payload_and_next_header(drc, size,
 2899                     abd_to_buf(abd));
 2900                 if (err != 0)
 2901                         abd_free(abd);
 2902                 else
 2903                         drc->drc_rrd->abd = abd;
 2904                 return (err);
 2905         }
 2906         case DRR_OBJECT_RANGE:
 2907         {
 2908                 err = receive_read_payload_and_next_header(drc, 0, NULL);
 2909                 return (err);
 2910 
 2911         }
 2912         default:
 2913                 return (SET_ERROR(EINVAL));
 2914         }
 2915 }
 2916 
 2917 
 2918 
 2919 static void
 2920 dprintf_drr(struct receive_record_arg *rrd, int err)
 2921 {
 2922 #ifdef ZFS_DEBUG
 2923         switch (rrd->header.drr_type) {
 2924         case DRR_OBJECT:
 2925         {
 2926                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
 2927                 dprintf("drr_type = OBJECT obj = %llu type = %u "
 2928                     "bonustype = %u blksz = %u bonuslen = %u cksumtype = %u "
 2929                     "compress = %u dn_slots = %u err = %d\n",
 2930                     (u_longlong_t)drro->drr_object, drro->drr_type,
 2931                     drro->drr_bonustype, drro->drr_blksz, drro->drr_bonuslen,
 2932                     drro->drr_checksumtype, drro->drr_compress,
 2933                     drro->drr_dn_slots, err);
 2934                 break;
 2935         }
 2936         case DRR_FREEOBJECTS:
 2937         {
 2938                 struct drr_freeobjects *drrfo =
 2939                     &rrd->header.drr_u.drr_freeobjects;
 2940                 dprintf("drr_type = FREEOBJECTS firstobj = %llu "
 2941                     "numobjs = %llu err = %d\n",
 2942                     (u_longlong_t)drrfo->drr_firstobj,
 2943                     (u_longlong_t)drrfo->drr_numobjs, err);
 2944                 break;
 2945         }
 2946         case DRR_WRITE:
 2947         {
 2948                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
 2949                 dprintf("drr_type = WRITE obj = %llu type = %u offset = %llu "
 2950                     "lsize = %llu cksumtype = %u flags = %u "
 2951                     "compress = %u psize = %llu err = %d\n",
 2952                     (u_longlong_t)drrw->drr_object, drrw->drr_type,
 2953                     (u_longlong_t)drrw->drr_offset,
 2954                     (u_longlong_t)drrw->drr_logical_size,
 2955                     drrw->drr_checksumtype, drrw->drr_flags,
 2956                     drrw->drr_compressiontype,
 2957                     (u_longlong_t)drrw->drr_compressed_size, err);
 2958                 break;
 2959         }
 2960         case DRR_WRITE_BYREF:
 2961         {
 2962                 struct drr_write_byref *drrwbr =
 2963                     &rrd->header.drr_u.drr_write_byref;
 2964                 dprintf("drr_type = WRITE_BYREF obj = %llu offset = %llu "
 2965                     "length = %llu toguid = %llx refguid = %llx "
 2966                     "refobject = %llu refoffset = %llu cksumtype = %u "
 2967                     "flags = %u err = %d\n",
 2968                     (u_longlong_t)drrwbr->drr_object,
 2969                     (u_longlong_t)drrwbr->drr_offset,
 2970                     (u_longlong_t)drrwbr->drr_length,
 2971                     (u_longlong_t)drrwbr->drr_toguid,
 2972                     (u_longlong_t)drrwbr->drr_refguid,
 2973                     (u_longlong_t)drrwbr->drr_refobject,
 2974                     (u_longlong_t)drrwbr->drr_refoffset,
 2975                     drrwbr->drr_checksumtype, drrwbr->drr_flags, err);
 2976                 break;
 2977         }
 2978         case DRR_WRITE_EMBEDDED:
 2979         {
 2980                 struct drr_write_embedded *drrwe =
 2981                     &rrd->header.drr_u.drr_write_embedded;
 2982                 dprintf("drr_type = WRITE_EMBEDDED obj = %llu offset = %llu "
 2983                     "length = %llu compress = %u etype = %u lsize = %u "
 2984                     "psize = %u err = %d\n",
 2985                     (u_longlong_t)drrwe->drr_object,
 2986                     (u_longlong_t)drrwe->drr_offset,
 2987                     (u_longlong_t)drrwe->drr_length,
 2988                     drrwe->drr_compression, drrwe->drr_etype,
 2989                     drrwe->drr_lsize, drrwe->drr_psize, err);
 2990                 break;
 2991         }
 2992         case DRR_FREE:
 2993         {
 2994                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
 2995                 dprintf("drr_type = FREE obj = %llu offset = %llu "
 2996                     "length = %lld err = %d\n",
 2997                     (u_longlong_t)drrf->drr_object,
 2998                     (u_longlong_t)drrf->drr_offset,
 2999                     (longlong_t)drrf->drr_length,
 3000                     err);
 3001                 break;
 3002         }
 3003         case DRR_SPILL:
 3004         {
 3005                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
 3006                 dprintf("drr_type = SPILL obj = %llu length = %llu "
 3007                     "err = %d\n", (u_longlong_t)drrs->drr_object,
 3008                     (u_longlong_t)drrs->drr_length, err);
 3009                 break;
 3010         }
 3011         case DRR_OBJECT_RANGE:
 3012         {
 3013                 struct drr_object_range *drror =
 3014                     &rrd->header.drr_u.drr_object_range;
 3015                 dprintf("drr_type = OBJECT_RANGE firstobj = %llu "
 3016                     "numslots = %llu flags = %u err = %d\n",
 3017                     (u_longlong_t)drror->drr_firstobj,
 3018                     (u_longlong_t)drror->drr_numslots,
 3019                     drror->drr_flags, err);
 3020                 break;
 3021         }
 3022         default:
 3023                 return;
 3024         }
 3025 #endif
 3026 }
 3027 
 3028 /*
 3029  * Commit the records to the pool.
 3030  */
 3031 static int
 3032 receive_process_record(struct receive_writer_arg *rwa,
 3033     struct receive_record_arg *rrd)
 3034 {
 3035         int err;
 3036 
 3037         /* Processing in order, therefore bytes_read should be increasing. */
 3038         ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
 3039         rwa->bytes_read = rrd->bytes_read;
 3040 
 3041         /* We can only heal write records; other ones get ignored */
 3042         if (rwa->heal && rrd->header.drr_type != DRR_WRITE) {
 3043                 if (rrd->abd != NULL) {
 3044                         abd_free(rrd->abd);
 3045                         rrd->abd = NULL;
 3046                 } else if (rrd->payload != NULL) {
 3047                         kmem_free(rrd->payload, rrd->payload_size);
 3048                         rrd->payload = NULL;
 3049                 }
 3050                 return (0);
 3051         }
 3052 
 3053         if (!rwa->heal && rrd->header.drr_type != DRR_WRITE) {
 3054                 err = flush_write_batch(rwa);
 3055                 if (err != 0) {
 3056                         if (rrd->abd != NULL) {
 3057                                 abd_free(rrd->abd);
 3058                                 rrd->abd = NULL;
 3059                                 rrd->payload = NULL;
 3060                         } else if (rrd->payload != NULL) {
 3061                                 kmem_free(rrd->payload, rrd->payload_size);
 3062                                 rrd->payload = NULL;
 3063                         }
 3064 
 3065                         return (err);
 3066                 }
 3067         }
 3068 
 3069         switch (rrd->header.drr_type) {
 3070         case DRR_OBJECT:
 3071         {
 3072                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
 3073                 err = receive_object(rwa, drro, rrd->payload);
 3074                 kmem_free(rrd->payload, rrd->payload_size);
 3075                 rrd->payload = NULL;
 3076                 break;
 3077         }
 3078         case DRR_FREEOBJECTS:
 3079         {
 3080                 struct drr_freeobjects *drrfo =
 3081                     &rrd->header.drr_u.drr_freeobjects;
 3082                 err = receive_freeobjects(rwa, drrfo);
 3083                 break;
 3084         }
 3085         case DRR_WRITE:
 3086         {
 3087                 err = receive_process_write_record(rwa, rrd);
 3088                 if (rwa->heal) {
 3089                         /*
 3090                          * If healing - always free the abd after processing
 3091                          */
 3092                         abd_free(rrd->abd);
 3093                         rrd->abd = NULL;
 3094                 } else if (err != EAGAIN) {
 3095                         /*
 3096                          * On success, a non-healing
 3097                          * receive_process_write_record() returns
 3098                          * EAGAIN to indicate that we do not want to free
 3099                          * the rrd or arc_buf.
 3100                          */
 3101                         ASSERT(err != 0);
 3102                         abd_free(rrd->abd);
 3103                         rrd->abd = NULL;
 3104                 }
 3105                 break;
 3106         }
 3107         case DRR_WRITE_EMBEDDED:
 3108         {
 3109                 struct drr_write_embedded *drrwe =
 3110                     &rrd->header.drr_u.drr_write_embedded;
 3111                 err = receive_write_embedded(rwa, drrwe, rrd->payload);
 3112                 kmem_free(rrd->payload, rrd->payload_size);
 3113                 rrd->payload = NULL;
 3114                 break;
 3115         }
 3116         case DRR_FREE:
 3117         {
 3118                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
 3119                 err = receive_free(rwa, drrf);
 3120                 break;
 3121         }
 3122         case DRR_SPILL:
 3123         {
 3124                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
 3125                 err = receive_spill(rwa, drrs, rrd->abd);
 3126                 if (err != 0)
 3127                         abd_free(rrd->abd);
 3128                 rrd->abd = NULL;
 3129                 rrd->payload = NULL;
 3130                 break;
 3131         }
 3132         case DRR_OBJECT_RANGE:
 3133         {
 3134                 struct drr_object_range *drror =
 3135                     &rrd->header.drr_u.drr_object_range;
 3136                 err = receive_object_range(rwa, drror);
 3137                 break;
 3138         }
 3139         case DRR_REDACT:
 3140         {
 3141                 struct drr_redact *drrr = &rrd->header.drr_u.drr_redact;
 3142                 err = receive_redact(rwa, drrr);
 3143                 break;
 3144         }
 3145         default:
 3146                 err = (SET_ERROR(EINVAL));
 3147         }
 3148 
 3149         if (err != 0)
 3150                 dprintf_drr(rrd, err);
 3151 
 3152         return (err);
 3153 }
 3154 
 3155 /*
 3156  * dmu_recv_stream's worker thread; pull records off the queue, and then call
 3157  * receive_process_record  When we're done, signal the main thread and exit.
 3158  */
 3159 static __attribute__((noreturn)) void
 3160 receive_writer_thread(void *arg)
 3161 {
 3162         struct receive_writer_arg *rwa = arg;
 3163         struct receive_record_arg *rrd;
 3164         fstrans_cookie_t cookie = spl_fstrans_mark();
 3165 
 3166         for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
 3167             rrd = bqueue_dequeue(&rwa->q)) {
 3168                 /*
 3169                  * If there's an error, the main thread will stop putting things
 3170                  * on the queue, but we need to clear everything in it before we
 3171                  * can exit.
 3172                  */
 3173                 int err = 0;
 3174                 if (rwa->err == 0) {
 3175                         err = receive_process_record(rwa, rrd);
 3176                 } else if (rrd->abd != NULL) {
 3177                         abd_free(rrd->abd);
 3178                         rrd->abd = NULL;
 3179                         rrd->payload = NULL;
 3180                 } else if (rrd->payload != NULL) {
 3181                         kmem_free(rrd->payload, rrd->payload_size);
 3182                         rrd->payload = NULL;
 3183                 }
 3184                 /*
 3185                  * EAGAIN indicates that this record has been saved (on
 3186                  * raw->write_batch), and will be used again, so we don't
 3187                  * free it.
 3188                  * When healing data we always need to free the record.
 3189                  */
 3190                 if (err != EAGAIN || rwa->heal) {
 3191                         if (rwa->err == 0)
 3192                                 rwa->err = err;
 3193                         kmem_free(rrd, sizeof (*rrd));
 3194                 }
 3195         }
 3196         kmem_free(rrd, sizeof (*rrd));
 3197 
 3198         if (rwa->heal) {
 3199                 zio_wait(rwa->heal_pio);
 3200         } else {
 3201                 int err = flush_write_batch(rwa);
 3202                 if (rwa->err == 0)
 3203                         rwa->err = err;
 3204         }
 3205         mutex_enter(&rwa->mutex);
 3206         rwa->done = B_TRUE;
 3207         cv_signal(&rwa->cv);
 3208         mutex_exit(&rwa->mutex);
 3209         spl_fstrans_unmark(cookie);
 3210         thread_exit();
 3211 }
 3212 
 3213 static int
 3214 resume_check(dmu_recv_cookie_t *drc, nvlist_t *begin_nvl)
 3215 {
 3216         uint64_t val;
 3217         objset_t *mos = dmu_objset_pool(drc->drc_os)->dp_meta_objset;
 3218         uint64_t dsobj = dmu_objset_id(drc->drc_os);
 3219         uint64_t resume_obj, resume_off;
 3220 
 3221         if (nvlist_lookup_uint64(begin_nvl,
 3222             "resume_object", &resume_obj) != 0 ||
 3223             nvlist_lookup_uint64(begin_nvl,
 3224             "resume_offset", &resume_off) != 0) {
 3225                 return (SET_ERROR(EINVAL));
 3226         }
 3227         VERIFY0(zap_lookup(mos, dsobj,
 3228             DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
 3229         if (resume_obj != val)
 3230                 return (SET_ERROR(EINVAL));
 3231         VERIFY0(zap_lookup(mos, dsobj,
 3232             DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
 3233         if (resume_off != val)
 3234                 return (SET_ERROR(EINVAL));
 3235 
 3236         return (0);
 3237 }
 3238 
 3239 /*
 3240  * Read in the stream's records, one by one, and apply them to the pool.  There
 3241  * are two threads involved; the thread that calls this function will spin up a
 3242  * worker thread, read the records off the stream one by one, and issue
 3243  * prefetches for any necessary indirect blocks.  It will then push the records
 3244  * onto an internal blocking queue.  The worker thread will pull the records off
 3245  * the queue, and actually write the data into the DMU.  This way, the worker
 3246  * thread doesn't have to wait for reads to complete, since everything it needs
 3247  * (the indirect blocks) will be prefetched.
 3248  *
 3249  * NB: callers *must* call dmu_recv_end() if this succeeds.
 3250  */
 3251 int
 3252 dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
 3253 {
 3254         int err = 0;
 3255         struct receive_writer_arg *rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
 3256 
 3257         if (dsl_dataset_has_resume_receive_state(drc->drc_ds)) {
 3258                 uint64_t bytes = 0;
 3259                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
 3260                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
 3261                     sizeof (bytes), 1, &bytes);
 3262                 drc->drc_bytes_read += bytes;
 3263         }
 3264 
 3265         drc->drc_ignore_objlist = objlist_create();
 3266 
 3267         /* these were verified in dmu_recv_begin */
 3268         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
 3269             DMU_SUBSTREAM);
 3270         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
 3271 
 3272         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
 3273         ASSERT0(drc->drc_os->os_encrypted &&
 3274             (drc->drc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
 3275 
 3276         /* handle DSL encryption key payload */
 3277         if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) {
 3278                 nvlist_t *keynvl = NULL;
 3279 
 3280                 ASSERT(drc->drc_os->os_encrypted);
 3281                 ASSERT(drc->drc_raw);
 3282 
 3283                 err = nvlist_lookup_nvlist(drc->drc_begin_nvl, "crypt_keydata",
 3284                     &keynvl);
 3285                 if (err != 0)
 3286                         goto out;
 3287 
 3288                 if (!drc->drc_heal) {
 3289                         /*
 3290                          * If this is a new dataset we set the key immediately.
 3291                          * Otherwise we don't want to change the key until we
 3292                          * are sure the rest of the receive succeeded so we
 3293                          * stash the keynvl away until then.
 3294                          */
 3295                         err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa),
 3296                             drc->drc_ds->ds_object, drc->drc_fromsnapobj,
 3297                             drc->drc_drrb->drr_type, keynvl, drc->drc_newfs);
 3298                         if (err != 0)
 3299                                 goto out;
 3300                 }
 3301 
 3302                 /* see comment in dmu_recv_end_sync() */
 3303                 drc->drc_ivset_guid = 0;
 3304                 (void) nvlist_lookup_uint64(keynvl, "to_ivset_guid",
 3305                     &drc->drc_ivset_guid);
 3306 
 3307                 if (!drc->drc_newfs)
 3308                         drc->drc_keynvl = fnvlist_dup(keynvl);
 3309         }
 3310 
 3311         if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
 3312                 err = resume_check(drc, drc->drc_begin_nvl);
 3313                 if (err != 0)
 3314                         goto out;
 3315         }
 3316 
 3317         /*
 3318          * If we failed before this point we will clean up any new resume
 3319          * state that was created. Now that we've gotten past the initial
 3320          * checks we are ok to retain that resume state.
 3321          */
 3322         drc->drc_should_save = B_TRUE;
 3323 
 3324         (void) bqueue_init(&rwa->q, zfs_recv_queue_ff,
 3325             MAX(zfs_recv_queue_length, 2 * zfs_max_recordsize),
 3326             offsetof(struct receive_record_arg, node));
 3327         cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
 3328         mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
 3329         rwa->os = drc->drc_os;
 3330         rwa->byteswap = drc->drc_byteswap;
 3331         rwa->heal = drc->drc_heal;
 3332         rwa->tofs = drc->drc_tofs;
 3333         rwa->resumable = drc->drc_resumable;
 3334         rwa->raw = drc->drc_raw;
 3335         rwa->spill = drc->drc_spill;
 3336         rwa->full = (drc->drc_drr_begin->drr_u.drr_begin.drr_fromguid == 0);
 3337         rwa->os->os_raw_receive = drc->drc_raw;
 3338         if (drc->drc_heal) {
 3339                 rwa->heal_pio = zio_root(drc->drc_os->os_spa, NULL, NULL,
 3340                     ZIO_FLAG_GODFATHER);
 3341         }
 3342         list_create(&rwa->write_batch, sizeof (struct receive_record_arg),
 3343             offsetof(struct receive_record_arg, node.bqn_node));
 3344 
 3345         (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
 3346             TS_RUN, minclsyspri);
 3347         /*
 3348          * We're reading rwa->err without locks, which is safe since we are the
 3349          * only reader, and the worker thread is the only writer.  It's ok if we
 3350          * miss a write for an iteration or two of the loop, since the writer
 3351          * thread will keep freeing records we send it until we send it an eos
 3352          * marker.
 3353          *
 3354          * We can leave this loop in 3 ways:  First, if rwa->err is
 3355          * non-zero.  In that case, the writer thread will free the rrd we just
 3356          * pushed.  Second, if  we're interrupted; in that case, either it's the
 3357          * first loop and drc->drc_rrd was never allocated, or it's later, and
 3358          * drc->drc_rrd has been handed off to the writer thread who will free
 3359          * it.  Finally, if receive_read_record fails or we're at the end of the
 3360          * stream, then we free drc->drc_rrd and exit.
 3361          */
 3362         while (rwa->err == 0) {
 3363                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
 3364                         err = SET_ERROR(EINTR);
 3365                         break;
 3366                 }
 3367 
 3368                 ASSERT3P(drc->drc_rrd, ==, NULL);
 3369                 drc->drc_rrd = drc->drc_next_rrd;
 3370                 drc->drc_next_rrd = NULL;
 3371                 /* Allocates and loads header into drc->drc_next_rrd */
 3372                 err = receive_read_record(drc);
 3373 
 3374                 if (drc->drc_rrd->header.drr_type == DRR_END || err != 0) {
 3375                         kmem_free(drc->drc_rrd, sizeof (*drc->drc_rrd));
 3376                         drc->drc_rrd = NULL;
 3377                         break;
 3378                 }
 3379 
 3380                 bqueue_enqueue(&rwa->q, drc->drc_rrd,
 3381                     sizeof (struct receive_record_arg) +
 3382                     drc->drc_rrd->payload_size);
 3383                 drc->drc_rrd = NULL;
 3384         }
 3385 
 3386         ASSERT3P(drc->drc_rrd, ==, NULL);
 3387         drc->drc_rrd = kmem_zalloc(sizeof (*drc->drc_rrd), KM_SLEEP);
 3388         drc->drc_rrd->eos_marker = B_TRUE;
 3389         bqueue_enqueue_flush(&rwa->q, drc->drc_rrd, 1);
 3390 
 3391         mutex_enter(&rwa->mutex);
 3392         while (!rwa->done) {
 3393                 /*
 3394                  * We need to use cv_wait_sig() so that any process that may
 3395                  * be sleeping here can still fork.
 3396                  */
 3397                 (void) cv_wait_sig(&rwa->cv, &rwa->mutex);
 3398         }
 3399         mutex_exit(&rwa->mutex);
 3400 
 3401         /*
 3402          * If we are receiving a full stream as a clone, all object IDs which
 3403          * are greater than the maximum ID referenced in the stream are
 3404          * by definition unused and must be freed.
 3405          */
 3406         if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
 3407                 uint64_t obj = rwa->max_object + 1;
 3408                 int free_err = 0;
 3409                 int next_err = 0;
 3410 
 3411                 while (next_err == 0) {
 3412                         free_err = dmu_free_long_object(rwa->os, obj);
 3413                         if (free_err != 0 && free_err != ENOENT)
 3414                                 break;
 3415 
 3416                         next_err = dmu_object_next(rwa->os, &obj, FALSE, 0);
 3417                 }
 3418 
 3419                 if (err == 0) {
 3420                         if (free_err != 0 && free_err != ENOENT)
 3421                                 err = free_err;
 3422                         else if (next_err != ESRCH)
 3423                                 err = next_err;
 3424                 }
 3425         }
 3426 
 3427         cv_destroy(&rwa->cv);
 3428         mutex_destroy(&rwa->mutex);
 3429         bqueue_destroy(&rwa->q);
 3430         list_destroy(&rwa->write_batch);
 3431         if (err == 0)
 3432                 err = rwa->err;
 3433 
 3434 out:
 3435         /*
 3436          * If we hit an error before we started the receive_writer_thread
 3437          * we need to clean up the next_rrd we create by processing the
 3438          * DRR_BEGIN record.
 3439          */
 3440         if (drc->drc_next_rrd != NULL)
 3441                 kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
 3442 
 3443         /*
 3444          * The objset will be invalidated by dmu_recv_end() when we do
 3445          * dsl_dataset_clone_swap_sync_impl().
 3446          */
 3447         drc->drc_os = NULL;
 3448 
 3449         kmem_free(rwa, sizeof (*rwa));
 3450         nvlist_free(drc->drc_begin_nvl);
 3451 
 3452         if (err != 0) {
 3453                 /*
 3454                  * Clean up references. If receive is not resumable,
 3455                  * destroy what we created, so we don't leave it in
 3456                  * the inconsistent state.
 3457                  */
 3458                 dmu_recv_cleanup_ds(drc);
 3459                 nvlist_free(drc->drc_keynvl);
 3460         }
 3461 
 3462         objlist_destroy(drc->drc_ignore_objlist);
 3463         drc->drc_ignore_objlist = NULL;
 3464         *voffp = drc->drc_voff;
 3465         return (err);
 3466 }
 3467 
 3468 static int
 3469 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
 3470 {
 3471         dmu_recv_cookie_t *drc = arg;
 3472         dsl_pool_t *dp = dmu_tx_pool(tx);
 3473         int error;
 3474 
 3475         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
 3476 
 3477         if (drc->drc_heal) {
 3478                 error = 0;
 3479         } else if (!drc->drc_newfs) {
 3480                 dsl_dataset_t *origin_head;
 3481 
 3482                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
 3483                 if (error != 0)
 3484                         return (error);
 3485                 if (drc->drc_force) {
 3486                         /*
 3487                          * We will destroy any snapshots in tofs (i.e. before
 3488                          * origin_head) that are after the origin (which is
 3489                          * the snap before drc_ds, because drc_ds can not
 3490                          * have any snaps of its own).
 3491                          */
 3492                         uint64_t obj;
 3493 
 3494                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
 3495                         while (obj !=
 3496                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
 3497                                 dsl_dataset_t *snap;
 3498                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,
 3499                                     &snap);
 3500                                 if (error != 0)
 3501                                         break;
 3502                                 if (snap->ds_dir != origin_head->ds_dir)
 3503                                         error = SET_ERROR(EINVAL);
 3504                                 if (error == 0)  {
 3505                                         error = dsl_destroy_snapshot_check_impl(
 3506                                             snap, B_FALSE);
 3507                                 }
 3508                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
 3509                                 dsl_dataset_rele(snap, FTAG);
 3510                                 if (error != 0)
 3511                                         break;
 3512                         }
 3513                         if (error != 0) {
 3514                                 dsl_dataset_rele(origin_head, FTAG);
 3515                                 return (error);
 3516                         }
 3517                 }
 3518                 if (drc->drc_keynvl != NULL) {
 3519                         error = dsl_crypto_recv_raw_key_check(drc->drc_ds,
 3520                             drc->drc_keynvl, tx);
 3521                         if (error != 0) {
 3522                                 dsl_dataset_rele(origin_head, FTAG);
 3523                                 return (error);
 3524                         }
 3525                 }
 3526 
 3527                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
 3528                     origin_head, drc->drc_force, drc->drc_owner, tx);
 3529                 if (error != 0) {
 3530                         dsl_dataset_rele(origin_head, FTAG);
 3531                         return (error);
 3532                 }
 3533                 error = dsl_dataset_snapshot_check_impl(origin_head,
 3534                     drc->drc_tosnap, tx, B_TRUE, 1,
 3535                     drc->drc_cred, drc->drc_proc);
 3536                 dsl_dataset_rele(origin_head, FTAG);
 3537                 if (error != 0)
 3538                         return (error);
 3539 
 3540                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
 3541         } else {
 3542                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
 3543                     drc->drc_tosnap, tx, B_TRUE, 1,
 3544                     drc->drc_cred, drc->drc_proc);
 3545         }
 3546         return (error);
 3547 }
 3548 
 3549 static void
 3550 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
 3551 {
 3552         dmu_recv_cookie_t *drc = arg;
 3553         dsl_pool_t *dp = dmu_tx_pool(tx);
 3554         boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0;
 3555         uint64_t newsnapobj = 0;
 3556 
 3557         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
 3558             tx, "snap=%s", drc->drc_tosnap);
 3559         drc->drc_ds->ds_objset->os_raw_receive = B_FALSE;
 3560 
 3561         if (drc->drc_heal) {
 3562                 if (drc->drc_keynvl != NULL) {
 3563                         nvlist_free(drc->drc_keynvl);
 3564                         drc->drc_keynvl = NULL;
 3565                 }
 3566         } else if (!drc->drc_newfs) {
 3567                 dsl_dataset_t *origin_head;
 3568 
 3569                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
 3570                     &origin_head));
 3571 
 3572                 if (drc->drc_force) {
 3573                         /*
 3574                          * Destroy any snapshots of drc_tofs (origin_head)
 3575                          * after the origin (the snap before drc_ds).
 3576                          */
 3577                         uint64_t obj;
 3578 
 3579                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
 3580                         while (obj !=
 3581                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
 3582                                 dsl_dataset_t *snap;
 3583                                 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
 3584                                     &snap));
 3585                                 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
 3586                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
 3587                                 dsl_destroy_snapshot_sync_impl(snap,
 3588                                     B_FALSE, tx);
 3589                                 dsl_dataset_rele(snap, FTAG);
 3590                         }
 3591                 }
 3592                 if (drc->drc_keynvl != NULL) {
 3593                         dsl_crypto_recv_raw_key_sync(drc->drc_ds,
 3594                             drc->drc_keynvl, tx);
 3595                         nvlist_free(drc->drc_keynvl);
 3596                         drc->drc_keynvl = NULL;
 3597                 }
 3598 
 3599                 VERIFY3P(drc->drc_ds->ds_prev, ==,
 3600                     origin_head->ds_prev);
 3601 
 3602                 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
 3603                     origin_head, tx);
 3604                 /*
 3605                  * The objset was evicted by dsl_dataset_clone_swap_sync_impl,
 3606                  * so drc_os is no longer valid.
 3607                  */
 3608                 drc->drc_os = NULL;
 3609 
 3610                 dsl_dataset_snapshot_sync_impl(origin_head,
 3611                     drc->drc_tosnap, tx);
 3612 
 3613                 /* set snapshot's creation time and guid */
 3614                 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
 3615                 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
 3616                     drc->drc_drrb->drr_creation_time;
 3617                 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
 3618                     drc->drc_drrb->drr_toguid;
 3619                 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
 3620                     ~DS_FLAG_INCONSISTENT;
 3621 
 3622                 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
 3623                 dsl_dataset_phys(origin_head)->ds_flags &=
 3624                     ~DS_FLAG_INCONSISTENT;
 3625 
 3626                 newsnapobj =
 3627                     dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
 3628 
 3629                 dsl_dataset_rele(origin_head, FTAG);
 3630                 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
 3631 
 3632                 if (drc->drc_owner != NULL)
 3633                         VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
 3634         } else {
 3635                 dsl_dataset_t *ds = drc->drc_ds;
 3636 
 3637                 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
 3638 
 3639                 /* set snapshot's creation time and guid */
 3640                 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 3641                 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
 3642                     drc->drc_drrb->drr_creation_time;
 3643                 dsl_dataset_phys(ds->ds_prev)->ds_guid =
 3644                     drc->drc_drrb->drr_toguid;
 3645                 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
 3646                     ~DS_FLAG_INCONSISTENT;
 3647 
 3648                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
 3649                 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
 3650                 if (dsl_dataset_has_resume_receive_state(ds)) {
 3651                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3652                             DS_FIELD_RESUME_FROMGUID, tx);
 3653                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3654                             DS_FIELD_RESUME_OBJECT, tx);
 3655                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3656                             DS_FIELD_RESUME_OFFSET, tx);
 3657                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3658                             DS_FIELD_RESUME_BYTES, tx);
 3659                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3660                             DS_FIELD_RESUME_TOGUID, tx);
 3661                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3662                             DS_FIELD_RESUME_TONAME, tx);
 3663                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
 3664                             DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS, tx);
 3665                 }
 3666                 newsnapobj =
 3667                     dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
 3668         }
 3669 
 3670         /*
 3671          * If this is a raw receive, the crypt_keydata nvlist will include
 3672          * a to_ivset_guid for us to set on the new snapshot. This value
 3673          * will override the value generated by the snapshot code. However,
 3674          * this value may not be present, because older implementations of
 3675          * the raw send code did not include this value, and we are still
 3676          * allowed to receive them if the zfs_disable_ivset_guid_check
 3677          * tunable is set, in which case we will leave the newly-generated
 3678          * value.
 3679          */
 3680         if (!drc->drc_heal && drc->drc_raw && drc->drc_ivset_guid != 0) {
 3681                 dmu_object_zapify(dp->dp_meta_objset, newsnapobj,
 3682                     DMU_OT_DSL_DATASET, tx);
 3683                 VERIFY0(zap_update(dp->dp_meta_objset, newsnapobj,
 3684                     DS_FIELD_IVSET_GUID, sizeof (uint64_t), 1,
 3685                     &drc->drc_ivset_guid, tx));
 3686         }
 3687 
 3688         /*
 3689          * Release the hold from dmu_recv_begin.  This must be done before
 3690          * we return to open context, so that when we free the dataset's dnode
 3691          * we can evict its bonus buffer. Since the dataset may be destroyed
 3692          * at this point (and therefore won't have a valid pointer to the spa)
 3693          * we release the key mapping manually here while we do have a valid
 3694          * pointer, if it exists.
 3695          */
 3696         if (!drc->drc_raw && encrypted) {
 3697                 (void) spa_keystore_remove_mapping(dmu_tx_pool(tx)->dp_spa,
 3698                     drc->drc_ds->ds_object, drc->drc_ds);
 3699         }
 3700         dsl_dataset_disown(drc->drc_ds, 0, dmu_recv_tag);
 3701         drc->drc_ds = NULL;
 3702 }
 3703 
 3704 static int dmu_recv_end_modified_blocks = 3;
 3705 
 3706 static int
 3707 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
 3708 {
 3709 #ifdef _KERNEL
 3710         /*
 3711          * We will be destroying the ds; make sure its origin is unmounted if
 3712          * necessary.
 3713          */
 3714         char name[ZFS_MAX_DATASET_NAME_LEN];
 3715         dsl_dataset_name(drc->drc_ds, name);
 3716         zfs_destroy_unmount_origin(name);
 3717 #endif
 3718 
 3719         return (dsl_sync_task(drc->drc_tofs,
 3720             dmu_recv_end_check, dmu_recv_end_sync, drc,
 3721             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
 3722 }
 3723 
 3724 static int
 3725 dmu_recv_new_end(dmu_recv_cookie_t *drc)
 3726 {
 3727         return (dsl_sync_task(drc->drc_tofs,
 3728             dmu_recv_end_check, dmu_recv_end_sync, drc,
 3729             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
 3730 }
 3731 
 3732 int
 3733 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
 3734 {
 3735         int error;
 3736 
 3737         drc->drc_owner = owner;
 3738 
 3739         if (drc->drc_newfs)
 3740                 error = dmu_recv_new_end(drc);
 3741         else
 3742                 error = dmu_recv_existing_end(drc);
 3743 
 3744         if (error != 0) {
 3745                 dmu_recv_cleanup_ds(drc);
 3746                 nvlist_free(drc->drc_keynvl);
 3747         } else if (!drc->drc_heal) {
 3748                 if (drc->drc_newfs) {
 3749                         zvol_create_minor(drc->drc_tofs);
 3750                 }
 3751                 char *snapname = kmem_asprintf("%s@%s",
 3752                     drc->drc_tofs, drc->drc_tosnap);
 3753                 zvol_create_minor(snapname);
 3754                 kmem_strfree(snapname);
 3755         }
 3756         return (error);
 3757 }
 3758 
 3759 /*
 3760  * Return TRUE if this objset is currently being received into.
 3761  */
 3762 boolean_t
 3763 dmu_objset_is_receiving(objset_t *os)
 3764 {
 3765         return (os->os_dsl_dataset != NULL &&
 3766             os->os_dsl_dataset->ds_owner == dmu_recv_tag);
 3767 }
 3768 
 3769 ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, UINT, ZMOD_RW,
 3770         "Maximum receive queue length");
 3771 
 3772 ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, UINT, ZMOD_RW,
 3773         "Receive queue fill fraction");
 3774 
 3775 ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, UINT, ZMOD_RW,
 3776         "Maximum amount of writes to batch into one transaction");
 3777 
 3778 ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, best_effort_corrective, INT, ZMOD_RW,
 3779         "Ignore errors during corrective receive");
 3780 /* END CSTYLED */
Cache object: 9c2a7f76ab21906f881eb84d0ee4dfe0 
 
 |