1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
  24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
  26  * Copyright 2014 HybridCluster. All rights reserved.
  27  * Copyright 2015 RackTop Systems.
  28  */
  29 
  30 #include <sys/dmu.h>
  31 #include <sys/dmu_impl.h>
  32 #include <sys/dmu_tx.h>
  33 #include <sys/dbuf.h>
  34 #include <sys/dnode.h>
  35 #include <sys/zfs_context.h>
  36 #include <sys/dmu_objset.h>
  37 #include <sys/dmu_traverse.h>
  38 #include <sys/dsl_dataset.h>
  39 #include <sys/dsl_dir.h>
  40 #include <sys/dsl_prop.h>
  41 #include <sys/dsl_pool.h>
  42 #include <sys/dsl_synctask.h>
  43 #include <sys/zfs_ioctl.h>
  44 #include <sys/zap.h>
  45 #include <sys/zio_checksum.h>
  46 #include <sys/zfs_znode.h>
  47 #include <zfs_fletcher.h>
  48 #include <sys/avl.h>
  49 #include <sys/ddt.h>
  50 #include <sys/zfs_onexit.h>
  51 #include <sys/dmu_send.h>
  52 #include <sys/dsl_destroy.h>
  53 #include <sys/blkptr.h>
  54 #include <sys/dsl_bookmark.h>
  55 #include <sys/zfeature.h>
  56 #include <sys/bqueue.h>
  57 
  58 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
  59 int zfs_send_corrupt_data = B_FALSE;
  60 int zfs_send_queue_length = 16 * 1024 * 1024;
  61 int zfs_recv_queue_length = 16 * 1024 * 1024;
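/*
 * These queue-length tunables bound, in bytes, the prefetch queues used by
 * the send and receive pipelines (see the bqueue_init() call in
 * dmu_send_impl() below); both default to 16 MB.
 */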
  62 
  63 static char *dmu_recv_tag = "dmu_recv_tag";
  64 const char *recv_clone_name = "%recv";
  65 
  66 #define BP_SPAN(datablkszsec, indblkshift, level) \
  67         (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
  68         (level) * (indblkshift - SPA_BLKPTRSHIFT)))
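/*
 * BP_SPAN() is the number of bytes of object data covered by a single block
 * pointer at the given indirection level: the data block size multiplied by
 * (block pointers per indirect block) raised to the level.  As a worked
 * example, with the common 128K data and indirect block sizes
 * (datablkszsec == 256, indblkshift == 17), an indirect block holds
 * 1 << (17 - SPA_BLKPTRSHIFT) == 1024 block pointers, so a level-1 pointer
 * spans 256 << (9 + 10) == 128M of file data.
 */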
  69 
  70 static void byteswap_record(dmu_replay_record_t *drr);
  71 
  72 struct send_thread_arg {
  73         bqueue_t        q;
  74         dsl_dataset_t   *ds;            /* Dataset to traverse */
  75         uint64_t        fromtxg;        /* Traverse from this txg */
  76         int             flags;          /* flags to pass to traverse_dataset */
  77         int             error_code;
  78         boolean_t       cancel;
  79         zbookmark_phys_t resume;
  80 };
  81 
  82 struct send_block_record {
  83         boolean_t               eos_marker; /* Marks the end of the stream */
  84         blkptr_t                bp;
  85         zbookmark_phys_t        zb;
  86         uint8_t                 indblkshift;
  87         uint16_t                datablkszsec;
  88         bqueue_node_t           ln;
  89 };
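/*
 * The send code is structured as a two-stage pipeline: a traversal thread
 * (send_traverse_thread() driving send_cb(), below) walks the dataset and
 * enqueues one send_block_record per block pointer onto the bqueue embedded
 * in send_thread_arg, while the original thread dequeues those records and
 * converts each one into stream records in do_dump().  A record with
 * eos_marker set terminates the queue.
 */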
  90 
  91 static int
  92 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
  93 {
  94         dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
  95         ssize_t resid; /* have to get resid to get detailed errno */
  96         ASSERT0(len % 8);
  97 
  98         dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
  99             (caddr_t)buf, len,
 100             0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
 101 
 102         mutex_enter(&ds->ds_sendstream_lock);
 103         *dsp->dsa_off += len;
 104         mutex_exit(&ds->ds_sendstream_lock);
 105 
 106         return (dsp->dsa_err);
 107 }
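/*
 * Because dump_bytes() insists on 8-byte multiples (the ASSERT0 above),
 * callers round variable-length payloads, such as bonus buffers and
 * embedded-block data, up with P2ROUNDUP(len, 8) before writing them.
 */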
 108 
 109 /*
 110  * For all record types except BEGIN, fill in the checksum (overlaid in
 111  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 112  * up to the start of the checksum itself.
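 *
 * In other words, dsa_zc is a running fletcher-4 checksum of every byte
 * written to the stream so far: the header is folded in up to the checksum
 * field, the running value is copied into that field, the field itself is
 * then folded in, and any payload is folded in as it is written.  The
 * DRR_END record carries the final cumulative checksum of the stream.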
 113  */
 114 static int
 115 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
 116 {
 117         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 118             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
 119         fletcher_4_incremental_native(dsp->dsa_drr,
 120             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 121             &dsp->dsa_zc);
 122         if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
 123                 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
 124                     drr_checksum.drr_checksum));
 125                 dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
 126         }
 127         fletcher_4_incremental_native(&dsp->dsa_drr->
 128             drr_u.drr_checksum.drr_checksum,
 129             sizeof (zio_cksum_t), &dsp->dsa_zc);
 130         if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
 131                 return (SET_ERROR(EINTR));
 132         if (payload_len != 0) {
 133                 fletcher_4_incremental_native(payload, payload_len,
 134                     &dsp->dsa_zc);
 135                 if (dump_bytes(dsp, payload, payload_len) != 0)
 136                         return (SET_ERROR(EINTR));
 137         }
 138         return (0);
 139 }
 140 
 141 /*
  142  * Fill in the drr_free struct, or perform aggregation if the previous
  143  * record is also a free record, and the two are adjacent.
  144  *
  145  * Note that we send free records even for a full send, because we want to be
  146  * able to receive a full send as a clone, which requires a list of all the
  147  * free and freeobject records that were generated on the source.
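 *
 * As a sketch of the aggregation done below: two adjacent holes in object 5,
 * one at offset 0 and one at offset 128K, each 128K long, are coalesced into
 * a single pending DRR_FREE record covering (object 5, offset 0, length
 * 256K); the pending record is only pushed to the stream when a record that
 * cannot be aggregated with it arrives, or when the send finishes.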
 148  */
 149 static int
 150 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 151     uint64_t length)
 152 {
 153         struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
 154 
 155         /*
 156          * Skip free records if asked not to send them.  The resulting
 157          * stream cannot be received as a clone.
 158          */
 159         if (dsp->dsa_skip_free)
 160                 return (0);
 161 
 162         /*
 163          * When we receive a free record, dbuf_free_range() assumes
 164          * that the receiving system doesn't have any dbufs in the range
 165          * being freed.  This is always true because there is a one-record
 166          * constraint: we only send one WRITE record for any given
 167          * object,offset.  We know that the one-record constraint is
 168          * true because we always send data in increasing order by
 169          * object,offset.
 170          *
 171          * If the increasing-order constraint ever changes, we should find
 172          * another way to assert that the one-record constraint is still
 173          * satisfied.
 174          */
 175         ASSERT(object > dsp->dsa_last_data_object ||
 176             (object == dsp->dsa_last_data_object &&
 177             offset > dsp->dsa_last_data_offset));
 178 
 179         if (length != -1ULL && offset + length < offset)
 180                 length = -1ULL;
 181 
 182         /*
 183          * If there is a pending op, but it's not PENDING_FREE, push it out,
 184          * since free block aggregation can only be done for blocks of the
  185          * same type (i.e., DRR_FREE records can only be aggregated with
  186          * other DRR_FREE records, and DRR_FREEOBJECTS records can only
  187          * be aggregated with other DRR_FREEOBJECTS records).
 188          */
 189         if (dsp->dsa_pending_op != PENDING_NONE &&
 190             dsp->dsa_pending_op != PENDING_FREE) {
 191                 if (dump_record(dsp, NULL, 0) != 0)
 192                         return (SET_ERROR(EINTR));
 193                 dsp->dsa_pending_op = PENDING_NONE;
 194         }
 195 
 196         if (dsp->dsa_pending_op == PENDING_FREE) {
 197                 /*
 198                  * There should never be a PENDING_FREE if length is -1
 199                  * (because dump_dnode is the only place where this
 200                  * function is called with a -1, and only after flushing
 201                  * any pending record).
 202                  */
 203                 ASSERT(length != -1ULL);
 204                 /*
  205          * Check to see whether this free block can be aggregated
  206          * with the pending one.
 207                  */
 208                 if (drrf->drr_object == object && drrf->drr_offset +
 209                     drrf->drr_length == offset) {
 210                         drrf->drr_length += length;
 211                         return (0);
 212                 } else {
 213                         /* not a continuation.  Push out pending record */
 214                         if (dump_record(dsp, NULL, 0) != 0)
 215                                 return (SET_ERROR(EINTR));
 216                         dsp->dsa_pending_op = PENDING_NONE;
 217                 }
 218         }
 219         /* create a FREE record and make it pending */
 220         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 221         dsp->dsa_drr->drr_type = DRR_FREE;
 222         drrf->drr_object = object;
 223         drrf->drr_offset = offset;
 224         drrf->drr_length = length;
 225         drrf->drr_toguid = dsp->dsa_toguid;
 226         if (length == -1ULL) {
 227                 if (dump_record(dsp, NULL, 0) != 0)
 228                         return (SET_ERROR(EINTR));
 229         } else {
 230                 dsp->dsa_pending_op = PENDING_FREE;
 231         }
 232 
 233         return (0);
 234 }
 235 
 236 static int
 237 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
 238     uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
 239 {
 240         struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
 241 
 242         /*
 243          * We send data in increasing object, offset order.
 244          * See comment in dump_free() for details.
 245          */
 246         ASSERT(object > dsp->dsa_last_data_object ||
 247             (object == dsp->dsa_last_data_object &&
 248             offset > dsp->dsa_last_data_offset));
 249         dsp->dsa_last_data_object = object;
 250         dsp->dsa_last_data_offset = offset + blksz - 1;
 251 
 252         /*
 253          * If there is any kind of pending aggregation (currently either
 254          * a grouping of free objects or free blocks), push it out to
 255          * the stream, since aggregation can't be done across operations
 256          * of different types.
 257          */
 258         if (dsp->dsa_pending_op != PENDING_NONE) {
 259                 if (dump_record(dsp, NULL, 0) != 0)
 260                         return (SET_ERROR(EINTR));
 261                 dsp->dsa_pending_op = PENDING_NONE;
 262         }
 263         /* write a WRITE record */
 264         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 265         dsp->dsa_drr->drr_type = DRR_WRITE;
 266         drrw->drr_object = object;
 267         drrw->drr_type = type;
 268         drrw->drr_offset = offset;
 269         drrw->drr_length = blksz;
 270         drrw->drr_toguid = dsp->dsa_toguid;
 271         if (bp == NULL || BP_IS_EMBEDDED(bp)) {
 272                 /*
 273                  * There's no pre-computed checksum for partial-block
  274                  * writes or embedded BPs, so (as with
  275                  * fletcher4-checksummed blocks) userland will have to
  276                  * compute a dedup-capable checksum itself.
 277                  */
 278                 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
 279         } else {
 280                 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
 281                 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
 282                     ZCHECKSUM_FLAG_DEDUP)
 283                         drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
 284                 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
 285                 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
 286                 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
 287                 drrw->drr_key.ddk_cksum = bp->blk_cksum;
 288         }
 289 
 290         if (dump_record(dsp, data, blksz) != 0)
 291                 return (SET_ERROR(EINTR));
 292         return (0);
 293 }
 294 
 295 static int
 296 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 297     int blksz, const blkptr_t *bp)
 298 {
 299         char buf[BPE_PAYLOAD_SIZE];
 300         struct drr_write_embedded *drrw =
 301             &(dsp->dsa_drr->drr_u.drr_write_embedded);
 302 
 303         if (dsp->dsa_pending_op != PENDING_NONE) {
 304                 if (dump_record(dsp, NULL, 0) != 0)
  305                         return (SET_ERROR(EINTR));
 306                 dsp->dsa_pending_op = PENDING_NONE;
 307         }
 308 
 309         ASSERT(BP_IS_EMBEDDED(bp));
 310 
 311         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 312         dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
 313         drrw->drr_object = object;
 314         drrw->drr_offset = offset;
 315         drrw->drr_length = blksz;
 316         drrw->drr_toguid = dsp->dsa_toguid;
 317         drrw->drr_compression = BP_GET_COMPRESS(bp);
 318         drrw->drr_etype = BPE_GET_ETYPE(bp);
 319         drrw->drr_lsize = BPE_GET_LSIZE(bp);
 320         drrw->drr_psize = BPE_GET_PSIZE(bp);
 321 
 322         decode_embedded_bp_compressed(bp, buf);
 323 
 324         if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
  325                 return (SET_ERROR(EINTR));
 326         return (0);
 327 }
 328 
 329 static int
 330 dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
 331 {
 332         struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
 333 
 334         if (dsp->dsa_pending_op != PENDING_NONE) {
 335                 if (dump_record(dsp, NULL, 0) != 0)
 336                         return (SET_ERROR(EINTR));
 337                 dsp->dsa_pending_op = PENDING_NONE;
 338         }
 339 
 340         /* write a SPILL record */
 341         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 342         dsp->dsa_drr->drr_type = DRR_SPILL;
 343         drrs->drr_object = object;
 344         drrs->drr_length = blksz;
 345         drrs->drr_toguid = dsp->dsa_toguid;
 346 
 347         if (dump_record(dsp, data, blksz) != 0)
 348                 return (SET_ERROR(EINTR));
 349         return (0);
 350 }
 351 
 352 static int
 353 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
 354 {
 355         struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
 356 
 357         /* See comment in dump_free(). */
 358         if (dsp->dsa_skip_free)
 359                 return (0);
 360 
 361         /*
 362          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
 363          * push it out, since free block aggregation can only be done for
  364          * blocks of the same type (i.e., DRR_FREE records can only be
  365          * aggregated with other DRR_FREE records, and DRR_FREEOBJECTS
  366          * records can only be aggregated with other DRR_FREEOBJECTS records).
 367          */
 368         if (dsp->dsa_pending_op != PENDING_NONE &&
 369             dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
 370                 if (dump_record(dsp, NULL, 0) != 0)
 371                         return (SET_ERROR(EINTR));
 372                 dsp->dsa_pending_op = PENDING_NONE;
 373         }
 374         if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
 375                 /*
  376                  * See whether this free object array can be aggregated
  377                  * with the pending one.
 378                  */
 379                 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
 380                         drrfo->drr_numobjs += numobjs;
 381                         return (0);
 382                 } else {
 383                         /* can't be aggregated.  Push out pending record */
 384                         if (dump_record(dsp, NULL, 0) != 0)
 385                                 return (SET_ERROR(EINTR));
 386                         dsp->dsa_pending_op = PENDING_NONE;
 387                 }
 388         }
 389 
 390         /* write a FREEOBJECTS record */
 391         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 392         dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
 393         drrfo->drr_firstobj = firstobj;
 394         drrfo->drr_numobjs = numobjs;
 395         drrfo->drr_toguid = dsp->dsa_toguid;
 396 
 397         dsp->dsa_pending_op = PENDING_FREEOBJECTS;
 398 
 399         return (0);
 400 }
 401 
 402 static int
 403 dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
 404 {
 405         struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
 406 
 407         if (object < dsp->dsa_resume_object) {
 408                 /*
 409                  * Note: when resuming, we will visit all the dnodes in
 410                  * the block of dnodes that we are resuming from.  In
 411                  * this case it's unnecessary to send the dnodes prior to
 412                  * the one we are resuming from.  We should be at most one
 413                  * block's worth of dnodes behind the resume point.
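                 * With 512-byte dnodes and 16K dnode blocks, that is at most
                 * 1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT) == 32 dnodes.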
 414                  */
 415                 ASSERT3U(dsp->dsa_resume_object - object, <,
 416                     1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
 417                 return (0);
 418         }
 419 
 420         if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
 421                 return (dump_freeobjects(dsp, object, 1));
 422 
 423         if (dsp->dsa_pending_op != PENDING_NONE) {
 424                 if (dump_record(dsp, NULL, 0) != 0)
 425                         return (SET_ERROR(EINTR));
 426                 dsp->dsa_pending_op = PENDING_NONE;
 427         }
 428 
 429         /* write an OBJECT record */
 430         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 431         dsp->dsa_drr->drr_type = DRR_OBJECT;
 432         drro->drr_object = object;
 433         drro->drr_type = dnp->dn_type;
 434         drro->drr_bonustype = dnp->dn_bonustype;
 435         drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
 436         drro->drr_bonuslen = dnp->dn_bonuslen;
 437         drro->drr_checksumtype = dnp->dn_checksum;
 438         drro->drr_compress = dnp->dn_compress;
 439         drro->drr_toguid = dsp->dsa_toguid;
 440 
 441         if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
 442             drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
 443                 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
 444 
 445         if (dump_record(dsp, DN_BONUS(dnp),
 446             P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
 447                 return (SET_ERROR(EINTR));
 448         }
 449 
 450         /* Free anything past the end of the file. */
 451         if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
 452             (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
 453                 return (SET_ERROR(EINTR));
 454         if (dsp->dsa_err != 0)
 455                 return (SET_ERROR(EINTR));
 456         return (0);
 457 }
 458 
 459 static boolean_t
 460 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
 461 {
 462         if (!BP_IS_EMBEDDED(bp))
 463                 return (B_FALSE);
 464 
 465         /*
 466          * Compression function must be legacy, or explicitly enabled.
 467          */
 468         if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
 469             !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
 470                 return (B_FALSE);
 471 
 472         /*
 473          * Embed type must be explicitly enabled.
 474          */
 475         switch (BPE_GET_ETYPE(bp)) {
 476         case BP_EMBEDDED_TYPE_DATA:
 477                 if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
 478                         return (B_TRUE);
 479                 break;
 480         default:
 481                 return (B_FALSE);
 482         }
 483         return (B_FALSE);
 484 }
 485 
 486 /*
  487  * This is the callback function for traverse_dataset, invoked from the
  488  * traversal (worker) thread spawned by dmu_send_impl.
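 *
 * Each record is enqueued with a weight equal to the object's data block
 * size, so the zfs_send_queue_length tunable effectively bounds how many
 * bytes of traversed-but-not-yet-dumped (i.e. prefetched) data can be
 * outstanding in the queue at once.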
 489  */
 490 /*ARGSUSED*/
 491 static int
 492 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
 493     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
 494 {
 495         struct send_thread_arg *sta = arg;
 496         struct send_block_record *record;
 497         uint64_t record_size;
 498         int err = 0;
 499 
 500         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
 501             zb->zb_object >= sta->resume.zb_object);
 502 
 503         if (sta->cancel)
 504                 return (SET_ERROR(EINTR));
 505 
 506         if (bp == NULL) {
 507                 ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
 508                 return (0);
 509         } else if (zb->zb_level < 0) {
 510                 return (0);
 511         }
 512 
 513         record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
 514         record->eos_marker = B_FALSE;
 515         record->bp = *bp;
 516         record->zb = *zb;
 517         record->indblkshift = dnp->dn_indblkshift;
 518         record->datablkszsec = dnp->dn_datablkszsec;
 519         record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
 520         bqueue_enqueue(&sta->q, record, record_size);
 521 
 522         return (err);
 523 }
 524 
 525 /*
  526  * This function kicks off the traverse_dataset call.  It also records the
  527  * thread's error code if something goes wrong, and pushes the End of Stream
  528  * record when the traverse_dataset call has finished.  If there is no dataset
  529  * to traverse, the thread immediately pushes the End of Stream marker.
 530  */
 531 static void
 532 send_traverse_thread(void *arg)
 533 {
 534         struct send_thread_arg *st_arg = arg;
 535         int err;
 536         struct send_block_record *data;
 537 
 538         if (st_arg->ds != NULL) {
 539                 err = traverse_dataset_resume(st_arg->ds,
 540                     st_arg->fromtxg, &st_arg->resume,
 541                     st_arg->flags, send_cb, st_arg);
 542 
 543                 if (err != EINTR)
 544                         st_arg->error_code = err;
 545         }
 546         data = kmem_zalloc(sizeof (*data), KM_SLEEP);
 547         data->eos_marker = B_TRUE;
 548         bqueue_enqueue(&st_arg->q, data, 1);
 549 }
 550 
 551 /*
 552  * This function actually handles figuring out what kind of record needs to be
 553  * dumped, reading the data (which has hopefully been prefetched), and calling
 554  * the appropriate helper function.
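 *
 * In brief: a hole in the meta-dnode becomes a DRR_FREEOBJECTS record, a hole
 * in an ordinary object becomes a DRR_FREE record, a block of dnodes is
 * expanded into one DRR_OBJECT record per dnode, a spill (SA) block becomes a
 * DRR_SPILL record, an embeddable block pointer becomes DRR_WRITE_EMBEDDED,
 * and any other level-0 data block becomes one or more DRR_WRITE records
 * (split into SPA_OLD_MAXBLOCKSIZE pieces when the stream does not carry the
 * large-blocks feature flag).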
 555  */
 556 static int
 557 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
 558 {
 559         dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
 560         const blkptr_t *bp = &data->bp;
 561         const zbookmark_phys_t *zb = &data->zb;
 562         uint8_t indblkshift = data->indblkshift;
 563         uint16_t dblkszsec = data->datablkszsec;
 564         spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
 565         dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
 566         int err = 0;
 567 
 568         ASSERT3U(zb->zb_level, >=, 0);
 569 
 570         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
 571             zb->zb_object >= dsa->dsa_resume_object);
 572 
 573         if (zb->zb_object != DMU_META_DNODE_OBJECT &&
 574             DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
 575                 return (0);
 576         } else if (BP_IS_HOLE(bp) &&
 577             zb->zb_object == DMU_META_DNODE_OBJECT) {
 578                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
 579                 uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
 580                 err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
 581         } else if (BP_IS_HOLE(bp)) {
 582                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
 583                 uint64_t offset = zb->zb_blkid * span;
 584                 err = dump_free(dsa, zb->zb_object, offset, span);
 585         } else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
 586                 return (0);
 587         } else if (type == DMU_OT_DNODE) {
 588                 int blksz = BP_GET_LSIZE(bp);
 589                 arc_flags_t aflags = ARC_FLAG_WAIT;
 590                 arc_buf_t *abuf;
 591 
 592                 ASSERT0(zb->zb_level);
 593 
 594                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 595                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 596                     &aflags, zb) != 0)
 597                         return (SET_ERROR(EIO));
 598 
 599                 dnode_phys_t *blk = abuf->b_data;
 600                 uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
 601                 for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
 602                         err = dump_dnode(dsa, dnobj + i, blk + i);
 603                         if (err != 0)
 604                                 break;
 605                 }
 606                 (void) arc_buf_remove_ref(abuf, &abuf);
 607         } else if (type == DMU_OT_SA) {
 608                 arc_flags_t aflags = ARC_FLAG_WAIT;
 609                 arc_buf_t *abuf;
 610                 int blksz = BP_GET_LSIZE(bp);
 611 
 612                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 613                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 614                     &aflags, zb) != 0)
 615                         return (SET_ERROR(EIO));
 616 
 617                 err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
 618                 (void) arc_buf_remove_ref(abuf, &abuf);
 619         } else if (backup_do_embed(dsa, bp)) {
 620                 /* it's an embedded level-0 block of a regular object */
 621                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 622                 ASSERT0(zb->zb_level);
 623                 err = dump_write_embedded(dsa, zb->zb_object,
 624                     zb->zb_blkid * blksz, blksz, bp);
 625         } else {
 626                 /* it's a level-0 block of a regular object */
 627                 arc_flags_t aflags = ARC_FLAG_WAIT;
 628                 arc_buf_t *abuf;
 629                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 630                 uint64_t offset;
 631 
 632                 ASSERT0(zb->zb_level);
 633                 ASSERT(zb->zb_object > dsa->dsa_resume_object ||
 634                     (zb->zb_object == dsa->dsa_resume_object &&
 635                     zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
 636 
 637                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 638                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 639                     &aflags, zb) != 0) {
 640                         if (zfs_send_corrupt_data) {
 641                                 /* Send a block filled with 0x"zfs badd bloc" */
 642                                 abuf = arc_buf_alloc(spa, blksz, &abuf,
 643                                     ARC_BUFC_DATA);
 644                                 uint64_t *ptr;
 645                                 for (ptr = abuf->b_data;
 646                                     (char *)ptr < (char *)abuf->b_data + blksz;
 647                                     ptr++)
 648                                         *ptr = 0x2f5baddb10cULL;
 649                         } else {
 650                                 return (SET_ERROR(EIO));
 651                         }
 652                 }
 653 
 654                 offset = zb->zb_blkid * blksz;
 655 
 656                 if (!(dsa->dsa_featureflags &
 657                     DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
 658                     blksz > SPA_OLD_MAXBLOCKSIZE) {
 659                         char *buf = abuf->b_data;
 660                         while (blksz > 0 && err == 0) {
 661                                 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
 662                                 err = dump_write(dsa, type, zb->zb_object,
 663                                     offset, n, NULL, buf);
 664                                 offset += n;
 665                                 buf += n;
 666                                 blksz -= n;
 667                         }
 668                 } else {
 669                         err = dump_write(dsa, type, zb->zb_object,
 670                             offset, blksz, bp, abuf->b_data);
 671                 }
 672                 (void) arc_buf_remove_ref(abuf, &abuf);
 673         }
 674 
 675         ASSERT(err == 0 || err == EINTR);
 676         return (err);
 677 }
 678 
 679 /*
 680  * Pop the new data off the queue, and free the old data.
 681  */
 682 static struct send_block_record *
 683 get_next_record(bqueue_t *bq, struct send_block_record *data)
 684 {
 685         struct send_block_record *tmp = bqueue_dequeue(bq);
 686         kmem_free(data, sizeof (*data));
 687         return (tmp);
 688 }
 689 
 690 /*
 691  * Actually do the bulk of the work in a zfs send.
 692  *
 693  * Note: Releases dp using the specified tag.
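 *
 * The sequence is roughly: emit the DRR_BEGIN record (with a resume nvlist
 * payload if this is a resumed send), start the traversal thread, consume
 * block records from the queue and convert them via do_dump(), flush any
 * pending aggregated record, and finish with a DRR_END record carrying the
 * cumulative stream checksum.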
 694  */
 695 static int
 696 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
 697     zfs_bookmark_phys_t *ancestor_zb,
 698     boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
 699     uint64_t resumeobj, uint64_t resumeoff, boolean_t skip_free,
 700     vnode_t *vp, offset_t *off)
 701 {
 702         objset_t *os;
 703         dmu_replay_record_t *drr;
 704         dmu_sendarg_t *dsp;
 705         int err;
 706         uint64_t fromtxg = 0;
 707         uint64_t featureflags = 0;
 708         struct send_thread_arg to_arg = { 0 };
 709 
 710         err = dmu_objset_from_ds(to_ds, &os);
 711         if (err != 0) {
 712                 dsl_pool_rele(dp, tag);
 713                 return (err);
 714         }
 715 
 716         drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 717         drr->drr_type = DRR_BEGIN;
 718         drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
 719         DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
 720             DMU_SUBSTREAM);
 721 
 722 #ifdef _KERNEL
 723         if (dmu_objset_type(os) == DMU_OST_ZFS) {
 724                 uint64_t version;
 725                 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
 726                         kmem_free(drr, sizeof (dmu_replay_record_t));
 727                         dsl_pool_rele(dp, tag);
 728                         return (SET_ERROR(EINVAL));
 729                 }
 730                 if (version >= ZPL_VERSION_SA) {
 731                         featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
 732                 }
 733         }
 734 #endif
 735 
 736         if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
 737                 featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
 738         if (embedok &&
 739             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
 740                 featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
 741                 if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
 742                         featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
 743         }
 744 
 745         if (resumeobj != 0 || resumeoff != 0) {
 746                 featureflags |= DMU_BACKUP_FEATURE_RESUMING;
 747         }
 748 
 749         DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
 750             featureflags);
 751 
 752         drr->drr_u.drr_begin.drr_creation_time =
 753             dsl_dataset_phys(to_ds)->ds_creation_time;
 754         drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
 755         if (is_clone)
 756                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
 757         drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
 758         if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
 759                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
 760         if (!skip_free)
 761                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
 762 
 763         if (ancestor_zb != NULL) {
 764                 drr->drr_u.drr_begin.drr_fromguid =
 765                     ancestor_zb->zbm_guid;
 766                 fromtxg = ancestor_zb->zbm_creation_txg;
 767         }
 768         dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
 769         if (!to_ds->ds_is_snapshot) {
 770                 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
 771                     sizeof (drr->drr_u.drr_begin.drr_toname));
 772         }
 773 
 774         dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
 775 
 776         dsp->dsa_drr = drr;
 777         dsp->dsa_vp = vp;
 778         dsp->dsa_outfd = outfd;
 779         dsp->dsa_proc = curproc;
 780         dsp->dsa_os = os;
 781         dsp->dsa_off = off;
 782         dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
 783         dsp->dsa_pending_op = PENDING_NONE;
 784         dsp->dsa_featureflags = featureflags;
 785         dsp->dsa_resume_object = resumeobj;
 786         dsp->dsa_resume_offset = resumeoff;
 787         dsp->dsa_skip_free = skip_free;
 788 
 789         mutex_enter(&to_ds->ds_sendstream_lock);
 790         list_insert_head(&to_ds->ds_sendstreams, dsp);
 791         mutex_exit(&to_ds->ds_sendstream_lock);
 792 
 793         dsl_dataset_long_hold(to_ds, FTAG);
 794         dsl_pool_rele(dp, tag);
 795 
 796         void *payload = NULL;
 797         size_t payload_len = 0;
 798         if (resumeobj != 0 || resumeoff != 0) {
 799                 dmu_object_info_t to_doi;
 800                 err = dmu_object_info(os, resumeobj, &to_doi);
 801                 if (err != 0)
 802                         goto out;
 803                 SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
 804                     resumeoff / to_doi.doi_data_block_size);
 805 
 806                 nvlist_t *nvl = fnvlist_alloc();
 807                 fnvlist_add_uint64(nvl, "resume_object", resumeobj);
 808                 fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
 809                 payload = fnvlist_pack(nvl, &payload_len);
 810                 drr->drr_payloadlen = payload_len;
 811                 fnvlist_free(nvl);
 812         }
 813 
 814         err = dump_record(dsp, payload, payload_len);
 815         fnvlist_pack_free(payload, payload_len);
 816         if (err != 0) {
 817                 err = dsp->dsa_err;
 818                 goto out;
 819         }
 820 
 821         err = bqueue_init(&to_arg.q, zfs_send_queue_length,
 822             offsetof(struct send_block_record, ln));
 823         to_arg.error_code = 0;
 824         to_arg.cancel = B_FALSE;
 825         to_arg.ds = to_ds;
 826         to_arg.fromtxg = fromtxg;
 827         to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
 828         (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
 829             TS_RUN, minclsyspri);
 830 
 831         struct send_block_record *to_data;
 832         to_data = bqueue_dequeue(&to_arg.q);
 833 
 834         while (!to_data->eos_marker && err == 0) {
 835                 err = do_dump(dsp, to_data);
 836                 to_data = get_next_record(&to_arg.q, to_data);
 837                 if (issig(JUSTLOOKING) && issig(FORREAL))
 838                         err = EINTR;
 839         }
 840 
 841         if (err != 0) {
 842                 to_arg.cancel = B_TRUE;
 843                 while (!to_data->eos_marker) {
 844                         to_data = get_next_record(&to_arg.q, to_data);
 845                 }
 846         }
 847         kmem_free(to_data, sizeof (*to_data));
 848 
 849         bqueue_destroy(&to_arg.q);
 850 
 851         if (err == 0 && to_arg.error_code != 0)
 852                 err = to_arg.error_code;
 853 
 854         if (err != 0)
 855                 goto out;
 856 
 857         if (dsp->dsa_pending_op != PENDING_NONE)
 858                 if (dump_record(dsp, NULL, 0) != 0)
 859                         err = SET_ERROR(EINTR);
 860 
 861         if (err != 0) {
 862                 if (err == EINTR && dsp->dsa_err != 0)
 863                         err = dsp->dsa_err;
 864                 goto out;
 865         }
 866 
 867         bzero(drr, sizeof (dmu_replay_record_t));
 868         drr->drr_type = DRR_END;
 869         drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
 870         drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
 871 
 872         if (dump_record(dsp, NULL, 0) != 0)
 873                 err = dsp->dsa_err;
 874 
 875 out:
 876         mutex_enter(&to_ds->ds_sendstream_lock);
 877         list_remove(&to_ds->ds_sendstreams, dsp);
 878         mutex_exit(&to_ds->ds_sendstream_lock);
 879 
 880         kmem_free(drr, sizeof (dmu_replay_record_t));
 881         kmem_free(dsp, sizeof (dmu_sendarg_t));
 882 
 883         dsl_dataset_long_rele(to_ds, FTAG);
 884 
 885         return (err);
 886 }
 887 
 888 int
 889 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
 890     boolean_t embedok, boolean_t large_block_ok,
 891     int outfd, boolean_t skip_free, vnode_t *vp, offset_t *off)
 892 {
 893         dsl_pool_t *dp;
 894         dsl_dataset_t *ds;
 895         dsl_dataset_t *fromds = NULL;
 896         int err;
 897 
 898         err = dsl_pool_hold(pool, FTAG, &dp);
 899         if (err != 0)
 900                 return (err);
 901 
 902         err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
 903         if (err != 0) {
 904                 dsl_pool_rele(dp, FTAG);
 905                 return (err);
 906         }
 907 
 908         if (fromsnap != 0) {
 909                 zfs_bookmark_phys_t zb;
 910                 boolean_t is_clone;
 911 
 912                 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
 913                 if (err != 0) {
 914                         dsl_dataset_rele(ds, FTAG);
 915                         dsl_pool_rele(dp, FTAG);
 916                         return (err);
 917                 }
 918                 if (!dsl_dataset_is_before(ds, fromds, 0))
 919                         err = SET_ERROR(EXDEV);
 920                 zb.zbm_creation_time =
 921                     dsl_dataset_phys(fromds)->ds_creation_time;
 922                 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
 923                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
 924                 is_clone = (fromds->ds_dir != ds->ds_dir);
 925                 dsl_dataset_rele(fromds, FTAG);
 926                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
 927                     embedok, large_block_ok, outfd, 0, 0, skip_free, vp, off);
 928         } else {
 929                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
 930                     embedok, large_block_ok, outfd, 0, 0, skip_free, vp, off);
 931         }
 932         dsl_dataset_rele(ds, FTAG);
 933         return (err);
 934 }
 935 
 936 int
 937 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
 938     boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
 939     boolean_t skip_free, vnode_t *vp, offset_t *off)
 940 {
 941         dsl_pool_t *dp;
 942         dsl_dataset_t *ds;
 943         int err;
 944         boolean_t owned = B_FALSE;
 945 
 946         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
 947                 return (SET_ERROR(EINVAL));
 948 
 949         err = dsl_pool_hold(tosnap, FTAG, &dp);
 950         if (err != 0)
 951                 return (err);
 952 
 953         if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
 954                 /*
 955                  * We are sending a filesystem or volume.  Ensure
 956                  * that it doesn't change by owning the dataset.
 957                  */
 958                 err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
 959                 owned = B_TRUE;
 960         } else {
 961                 err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
 962         }
 963         if (err != 0) {
 964                 dsl_pool_rele(dp, FTAG);
 965                 return (err);
 966         }
 967 
 968         if (fromsnap != NULL) {
 969                 zfs_bookmark_phys_t zb;
 970                 boolean_t is_clone = B_FALSE;
 971                 int fsnamelen = strchr(tosnap, '@') - tosnap;
 972 
 973                 /*
 974                  * If the fromsnap is in a different filesystem, then
 975                  * mark the send stream as a clone.
 976                  */
 977                 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
 978                     (fromsnap[fsnamelen] != '@' &&
 979                     fromsnap[fsnamelen] != '#')) {
 980                         is_clone = B_TRUE;
 981                 }
 982 
 983                 if (strchr(fromsnap, '@')) {
 984                         dsl_dataset_t *fromds;
 985                         err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
 986                         if (err == 0) {
 987                                 if (!dsl_dataset_is_before(ds, fromds, 0))
 988                                         err = SET_ERROR(EXDEV);
 989                                 zb.zbm_creation_time =
 990                                     dsl_dataset_phys(fromds)->ds_creation_time;
 991                                 zb.zbm_creation_txg =
 992                                     dsl_dataset_phys(fromds)->ds_creation_txg;
 993                                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
 994                                 is_clone = (ds->ds_dir != fromds->ds_dir);
 995                                 dsl_dataset_rele(fromds, FTAG);
 996                         }
 997                 } else {
 998                         err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
 999                 }
1000                 if (err != 0) {
1001                         dsl_dataset_rele(ds, FTAG);
1002                         dsl_pool_rele(dp, FTAG);
1003                         return (err);
1004                 }
1005                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1006                     embedok, large_block_ok,
1007                     outfd, resumeobj, resumeoff, skip_free, vp, off);
1008         } else {
1009                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1010                     embedok, large_block_ok,
1011                     outfd, resumeobj, resumeoff, skip_free, vp, off);
1012         }
1013         if (owned)
1014                 dsl_dataset_disown(ds, FTAG);
1015         else
1016                 dsl_dataset_rele(ds, FTAG);
1017         return (err);
1018 }
1019 
1020 static int
1021 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
1022     uint64_t *sizep)
1023 {
1024         int err;
1025         /*
1026          * Assume that space (both on-disk and in-stream) is dominated by
1027          * data.  We will adjust for indirect blocks and the copies property,
 1028          * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
1029          */
1030 
1031         /*
1032          * Subtract out approximate space used by indirect blocks.
1033          * Assume most space is used by data blocks (non-indirect, non-dnode).
 1034          * Assume all blocks are recordsize.  Assume ditto blocks and
 1035          * internal fragmentation cancel out compression.
 1036          *
 1037          * Therefore, space used by indirect blocks is sizeof (blkptr_t) per
 1038          * data block, which is what we observe in practice.
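         *
         * As a worked example (assuming a 128K recordsize): a 1 GiB
         * uncompressed-data estimate covers 8192 blocks, so we subtract
         * 8192 * sizeof (blkptr_t) = 1 MiB for indirect blocks and add
         * 8192 * sizeof (dmu_replay_record_t) for the per-block record
         * headers.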
1039          */
1040         uint64_t recordsize;
1041         err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
1042         if (err != 0)
1043                 return (err);
1044         size -= size / recordsize * sizeof (blkptr_t);
1045 
1046         /* Add in the space for the record associated with each block. */
1047         size += size / recordsize * sizeof (dmu_replay_record_t);
1048 
1049         *sizep = size;
1050 
1051         return (0);
1052 }
1053 
1054 int
1055 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
1056 {
1057         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1058         int err;
1059         uint64_t size;
1060 
1061         ASSERT(dsl_pool_config_held(dp));
1062 
1063         /* tosnap must be a snapshot */
1064         if (!ds->ds_is_snapshot)
1065                 return (SET_ERROR(EINVAL));
1066 
1067         /* fromsnap, if provided, must be a snapshot */
1068         if (fromds != NULL && !fromds->ds_is_snapshot)
1069                 return (SET_ERROR(EINVAL));
1070 
1071         /*
1072          * fromsnap must be an earlier snapshot from the same fs as tosnap,
1073          * or the origin's fs.
1074          */
1075         if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1076                 return (SET_ERROR(EXDEV));
1077 
1078         /* Get uncompressed size estimate of changed data. */
1079         if (fromds == NULL) {
1080                 size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1081         } else {
1082                 uint64_t used, comp;
1083                 err = dsl_dataset_space_written(fromds, ds,
1084                     &used, &comp, &size);
1085                 if (err != 0)
1086                         return (err);
1087         }
1088 
1089         err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1090         return (err);
1091 }
1092 
1093 /*
1094  * Simple callback used to traverse the blocks of a snapshot and sum their
 1095  * uncompressed size.
1096  */
1097 /* ARGSUSED */
1098 static int
1099 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1100     const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1101 {
1102         uint64_t *spaceptr = arg;
1103         if (bp != NULL && !BP_IS_HOLE(bp)) {
1104                 *spaceptr += BP_GET_UCSIZE(bp);
1105         }
1106         return (0);
1107 }
1108 
1109 /*
 1110  * Given a destination snapshot and a TXG, calculate the approximate size of a
1111  * send stream sent from that TXG. from_txg may be zero, indicating that the
1112  * whole snapshot will be sent.
1113  */
1114 int
1115 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1116     uint64_t *sizep)
1117 {
1118         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1119         int err;
1120         uint64_t size = 0;
1121 
1122         ASSERT(dsl_pool_config_held(dp));
1123 
1124         /* tosnap must be a snapshot */
1125         if (!dsl_dataset_is_snapshot(ds))
1126                 return (SET_ERROR(EINVAL));
1127 
1128         /* verify that from_txg is before the provided snapshot was taken */
1129         if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1130                 return (SET_ERROR(EXDEV));
1131         }
1132 
1133         /*
1134          * traverse the blocks of the snapshot with birth times after
1135          * from_txg, summing their uncompressed size
1136          */
1137         err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1138             dmu_calculate_send_traversal, &size);
1139         if (err)
1140                 return (err);
1141 
1142         err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1143         return (err);
1144 }
1145 
1146 typedef struct dmu_recv_begin_arg {
1147         const char *drba_origin;
1148         dmu_recv_cookie_t *drba_cookie;
1149         cred_t *drba_cred;
1150         uint64_t drba_snapobj;
1151 } dmu_recv_begin_arg_t;
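/*
 * dmu_recv_begin_arg_t is the argument to the dmu_recv_begin_check() /
 * dmu_recv_begin_sync() pair below, which is run as a DSL sync task: the
 * check function validates the incoming stream against the existing pool and
 * dataset state, and the sync function then creates either a temporary
 * %recv clone (when the target filesystem already exists) or a new dataset
 * to receive into.
 */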
1152 
1153 static int
1154 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1155     uint64_t fromguid)
1156 {
1157         uint64_t val;
1158         int error;
1159         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1160 
1161         /* temporary clone name must not exist */
1162         error = zap_lookup(dp->dp_meta_objset,
1163             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1164             8, 1, &val);
1165         if (error != ENOENT)
1166                 return (error == 0 ? EBUSY : error);
1167 
1168         /* new snapshot name must not exist */
1169         error = zap_lookup(dp->dp_meta_objset,
1170             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1171             drba->drba_cookie->drc_tosnap, 8, 1, &val);
1172         if (error != ENOENT)
1173                 return (error == 0 ? EEXIST : error);
1174 
1175         /*
1176          * Check snapshot limit before receiving. We'll recheck again at the
1177          * end, but might as well abort before receiving if we're already over
1178          * the limit.
1179          *
1180          * Note that we do not check the file system limit with
1181          * dsl_dir_fscount_check because the temporary %clones don't count
1182          * against that limit.
1183          */
1184         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1185             NULL, drba->drba_cred);
1186         if (error != 0)
1187                 return (error);
1188 
1189         if (fromguid != 0) {
1190                 dsl_dataset_t *snap;
1191                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1192 
1193                 /* Find snapshot in this dir that matches fromguid. */
1194                 while (obj != 0) {
1195                         error = dsl_dataset_hold_obj(dp, obj, FTAG,
1196                             &snap);
1197                         if (error != 0)
1198                                 return (SET_ERROR(ENODEV));
1199                         if (snap->ds_dir != ds->ds_dir) {
1200                                 dsl_dataset_rele(snap, FTAG);
1201                                 return (SET_ERROR(ENODEV));
1202                         }
1203                         if (dsl_dataset_phys(snap)->ds_guid == fromguid)
1204                                 break;
1205                         obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
1206                         dsl_dataset_rele(snap, FTAG);
1207                 }
1208                 if (obj == 0)
1209                         return (SET_ERROR(ENODEV));
1210 
1211                 if (drba->drba_cookie->drc_force) {
1212                         drba->drba_snapobj = obj;
1213                 } else {
1214                         /*
1215                          * If we are not forcing, there must be no
1216                          * changes since fromsnap.
1217                          */
1218                         if (dsl_dataset_modified_since_snap(ds, snap)) {
1219                                 dsl_dataset_rele(snap, FTAG);
1220                                 return (SET_ERROR(ETXTBSY));
1221                         }
1222                         drba->drba_snapobj = ds->ds_prev->ds_object;
1223                 }
1224 
1225                 dsl_dataset_rele(snap, FTAG);
1226         } else {
1227                 /* if full, then must be forced */
1228                 if (!drba->drba_cookie->drc_force)
1229                         return (SET_ERROR(EEXIST));
1230                 /* start from $ORIGIN@$ORIGIN, if supported */
1231                 drba->drba_snapobj = dp->dp_origin_snap != NULL ?
1232                     dp->dp_origin_snap->ds_object : 0;
1233         }
1234 
1235         return (0);
1236 
1237 }
1238 
1239 static int
1240 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
1241 {
1242         dmu_recv_begin_arg_t *drba = arg;
1243         dsl_pool_t *dp = dmu_tx_pool(tx);
1244         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1245         uint64_t fromguid = drrb->drr_fromguid;
1246         int flags = drrb->drr_flags;
1247         int error;
1248         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1249         dsl_dataset_t *ds;
1250         const char *tofs = drba->drba_cookie->drc_tofs;
1251 
1252         /* already checked */
1253         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1254         ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
1255 
1256         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1257             DMU_COMPOUNDSTREAM ||
1258             drrb->drr_type >= DMU_OST_NUMTYPES ||
1259             ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
1260                 return (SET_ERROR(EINVAL));
1261 
1262         /* Verify pool version supports SA if SA_SPILL feature set */
1263         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1264             spa_version(dp->dp_spa) < SPA_VERSION_SA)
1265                 return (SET_ERROR(ENOTSUP));
1266 
1267         if (drba->drba_cookie->drc_resumable &&
1268             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
1269                 return (SET_ERROR(ENOTSUP));
1270 
1271         /*
1272          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
 1273  * record to a plain WRITE record, so the pool must have the
1274          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1275          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1276          */
1277         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1278             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1279                 return (SET_ERROR(ENOTSUP));
1280         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1281             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1282                 return (SET_ERROR(ENOTSUP));
1283 
1284         /*
1285          * The receiving code doesn't know how to translate large blocks
1286          * to smaller ones, so the pool must have the LARGE_BLOCKS
1287          * feature enabled if the stream has LARGE_BLOCKS.
1288          */
1289         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1290             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1291                 return (SET_ERROR(ENOTSUP));
1292 
1293         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1294         if (error == 0) {
1295                 /* target fs already exists; recv into temp clone */
1296 
1297                 /* Can't recv a clone into an existing fs */
1298                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1299                         dsl_dataset_rele(ds, FTAG);
1300                         return (SET_ERROR(EINVAL));
1301                 }
1302 
1303                 error = recv_begin_check_existing_impl(drba, ds, fromguid);
1304                 dsl_dataset_rele(ds, FTAG);
1305         } else if (error == ENOENT) {
1306                 /* target fs does not exist; must be a full backup or clone */
1307                 char buf[MAXNAMELEN];
1308 
1309                 /*
1310                  * If it's a non-clone incremental, we are missing the
1311                  * target fs, so fail the recv.
1312                  */
1313                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1314                     drba->drba_origin))
1315                         return (SET_ERROR(ENOENT));
1316 
1317                 /*
1318                  * If we're receiving a full send as a clone, and it doesn't
1319                  * contain all the necessary free records and freeobject
1320                  * records, reject it.
1321                  */
1322                 if (fromguid == 0 && drba->drba_origin &&
1323                     !(flags & DRR_FLAG_FREERECORDS))
1324                         return (SET_ERROR(EINVAL));
1325 
1326                 /* Open the parent of tofs */
1327                 ASSERT3U(strlen(tofs), <, MAXNAMELEN);
1328                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1329                 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1330                 if (error != 0)
1331                         return (error);
1332 
1333                 /*
1334                  * Check filesystem and snapshot limits before receiving. We'll
1335                  * recheck snapshot limits again at the end (we create the
1336                  * filesystems and increment those counts during begin_sync).
1337                  */
1338                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1339                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1340                 if (error != 0) {
1341                         dsl_dataset_rele(ds, FTAG);
1342                         return (error);
1343                 }
1344 
1345                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1346                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1347                 if (error != 0) {
1348                         dsl_dataset_rele(ds, FTAG);
1349                         return (error);
1350                 }
1351 
1352                 if (drba->drba_origin != NULL) {
1353                         dsl_dataset_t *origin;
1354                         error = dsl_dataset_hold(dp, drba->drba_origin,
1355                             FTAG, &origin);
1356                         if (error != 0) {
1357                                 dsl_dataset_rele(ds, FTAG);
1358                                 return (error);
1359                         }
1360                         if (!origin->ds_is_snapshot) {
1361                                 dsl_dataset_rele(origin, FTAG);
1362                                 dsl_dataset_rele(ds, FTAG);
1363                                 return (SET_ERROR(EINVAL));
1364                         }
1365                         if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
1366                             fromguid != 0) {
1367                                 dsl_dataset_rele(origin, FTAG);
1368                                 dsl_dataset_rele(ds, FTAG);
1369                                 return (SET_ERROR(ENODEV));
1370                         }
1371                         dsl_dataset_rele(origin, FTAG);
1372                 }
1373                 dsl_dataset_rele(ds, FTAG);
1374                 error = 0;
1375         }
1376         return (error);
1377 }
1378 
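     /*
      * Syncing counterpart of dmu_recv_begin_check().  If the target fs
      * already exists, create a temporary %recv clone under it; otherwise
      * create a brand-new dataset (possibly cloned from drba_origin).  The
      * new dataset is owned with dmu_recv_tag, marked inconsistent, and,
      * for resumable receives, zapified with the initial resume state.
      */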
1379 static void
1380 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
1381 {
1382         dmu_recv_begin_arg_t *drba = arg;
1383         dsl_pool_t *dp = dmu_tx_pool(tx);
1384         objset_t *mos = dp->dp_meta_objset;
1385         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1386         const char *tofs = drba->drba_cookie->drc_tofs;
1387         dsl_dataset_t *ds, *newds;
1388         uint64_t dsobj;
1389         int error;
1390         uint64_t crflags = 0;
1391 
1392         if (drrb->drr_flags & DRR_FLAG_CI_DATA)
1393                 crflags |= DS_FLAG_CI_DATASET;
1394 
1395         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1396         if (error == 0) {
1397                 /* create temporary clone */
1398                 dsl_dataset_t *snap = NULL;
1399                 if (drba->drba_snapobj != 0) {
1400                         VERIFY0(dsl_dataset_hold_obj(dp,
1401                             drba->drba_snapobj, FTAG, &snap));
1402                 }
1403                 dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
1404                     snap, crflags, drba->drba_cred, tx);
1405                 if (drba->drba_snapobj != 0)
1406                         dsl_dataset_rele(snap, FTAG);
1407                 dsl_dataset_rele(ds, FTAG);
1408         } else {
1409                 dsl_dir_t *dd;
1410                 const char *tail;
1411                 dsl_dataset_t *origin = NULL;
1412 
1413                 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
1414 
1415                 if (drba->drba_origin != NULL) {
1416                         VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
1417                             FTAG, &origin));
1418                 }
1419 
1420                 /* Create new dataset. */
1421                 dsobj = dsl_dataset_create_sync(dd,
1422                     strrchr(tofs, '/') + 1,
1423                     origin, crflags, drba->drba_cred, tx);
1424                 if (origin != NULL)
1425                         dsl_dataset_rele(origin, FTAG);
1426                 dsl_dir_rele(dd, FTAG);
1427                 drba->drba_cookie->drc_newfs = B_TRUE;
1428         }
1429         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
1430 
1431         if (drba->drba_cookie->drc_resumable) {
1432                 dsl_dataset_zapify(newds, tx);
1433                 if (drrb->drr_fromguid != 0) {
1434                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
1435                             8, 1, &drrb->drr_fromguid, tx));
1436                 }
1437                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
1438                     8, 1, &drrb->drr_toguid, tx));
1439                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
1440                     1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
1441                 uint64_t one = 1;
1442                 uint64_t zero = 0;
1443                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
1444                     8, 1, &one, tx));
1445                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
1446                     8, 1, &zero, tx));
1447                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
1448                     8, 1, &zero, tx));
1449                 if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1450                     DMU_BACKUP_FEATURE_EMBED_DATA) {
1451                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
1452                             8, 1, &one, tx));
1453                 }
1454         }
1455 
1456         dmu_buf_will_dirty(newds->ds_dbuf, tx);
1457         dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1458 
1459         /*
1460          * If we actually created a non-clone, we need to create the
1461          * objset in our new dataset.
1462          */
1463         if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
1464                 (void) dmu_objset_create_impl(dp->dp_spa,
1465                     newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1466         }
1467 
1468         drba->drba_cookie->drc_ds = newds;
1469 
1470         spa_history_log_internal_ds(newds, "receive", tx, "");
1471 }
1472 
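     /*
      * Check phase for resuming an interrupted receive: verify that the
      * pool supports the stream's feature flags, find the partially
      * received dataset (the %recv clone if it exists, otherwise tofs
      * itself), and verify that it is inconsistent, unowned, has no
      * snapshots, and that its saved toguid and fromguid match the stream.
      */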
1473 static int
1474 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
1475 {
1476         dmu_recv_begin_arg_t *drba = arg;
1477         dsl_pool_t *dp = dmu_tx_pool(tx);
1478         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1479         int error;
1480         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1481         dsl_dataset_t *ds;
1482         const char *tofs = drba->drba_cookie->drc_tofs;
1483 
1484         /* already checked */
1485         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1486         ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
1487 
1488         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1489             DMU_COMPOUNDSTREAM ||
1490             drrb->drr_type >= DMU_OST_NUMTYPES)
1491                 return (SET_ERROR(EINVAL));
1492 
1493         /* Verify pool version supports SA if SA_SPILL feature set */
1494         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1495             spa_version(dp->dp_spa) < SPA_VERSION_SA)
1496                 return (SET_ERROR(ENOTSUP));
1497 
1498         /*
1499          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1500          * record to a plain WRITE record, so the pool must have the
1501          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1502          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1503          */
1504         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1505             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1506                 return (SET_ERROR(ENOTSUP));
1507         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1508             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1509                 return (SET_ERROR(ENOTSUP));
1510 
1511         char recvname[ZFS_MAXNAMELEN];
1512 
1513         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
1514             tofs, recv_clone_name);
1515 
1516         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1517                 /* %recv does not exist; continue in tofs */
1518                 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1519                 if (error != 0)
1520                         return (error);
1521         }
1522 
1523         /* check that ds is marked inconsistent */
1524         if (!DS_IS_INCONSISTENT(ds)) {
1525                 dsl_dataset_rele(ds, FTAG);
1526                 return (SET_ERROR(EINVAL));
1527         }
1528 
1529         /* check that there is resuming data, and that the toguid matches */
1530         if (!dsl_dataset_is_zapified(ds)) {
1531                 dsl_dataset_rele(ds, FTAG);
1532                 return (SET_ERROR(EINVAL));
1533         }
1534         uint64_t val;
1535         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1536             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1537         if (error != 0 || drrb->drr_toguid != val) {
1538                 dsl_dataset_rele(ds, FTAG);
1539                 return (SET_ERROR(EINVAL));
1540         }
1541 
1542         /*
1543          * Check if the receive is still running.  If so, it will be owned.
1544          * Note that nothing else can own the dataset (e.g. after the receive
1545          * fails) because it will be marked inconsistent.
1546          */
1547         if (dsl_dataset_has_owner(ds)) {
1548                 dsl_dataset_rele(ds, FTAG);
1549                 return (SET_ERROR(EBUSY));
1550         }
1551 
1552         /* There should not be any snapshots of this fs yet. */
1553         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1554                 dsl_dataset_rele(ds, FTAG);
1555                 return (SET_ERROR(EINVAL));
1556         }
1557 
1558         /*
1559          * Note: resume point will be checked when we process the first WRITE
1560          * record.
1561          */
1562 
1563         /* check that the origin matches */
1564         val = 0;
1565         (void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
1566             DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
1567         if (drrb->drr_fromguid != val) {
1568                 dsl_dataset_rele(ds, FTAG);
1569                 return (SET_ERROR(EINVAL));
1570         }
1571 
1572         dsl_dataset_rele(ds, FTAG);
1573         return (0);
1574 }
1575 
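     /*
      * Syncing counterpart of dmu_recv_resume_begin_check(): clear the
      * inconsistent flag long enough to take ownership of the partially
      * received dataset, then set it again for the duration of the receive.
      */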
1576 static void
1577 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
1578 {
1579         dmu_recv_begin_arg_t *drba = arg;
1580         dsl_pool_t *dp = dmu_tx_pool(tx);
1581         const char *tofs = drba->drba_cookie->drc_tofs;
1582         dsl_dataset_t *ds;
1583         uint64_t dsobj;
1584         char recvname[ZFS_MAXNAMELEN];
1585 
1586         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
1587             tofs, recv_clone_name);
1588 
1589         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1590                 /* %recv does not exist; continue in tofs */
1591                 VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
1592                 drba->drba_cookie->drc_newfs = B_TRUE;
1593         }
1594 
1595         /* clear the inconsistent flag so that we can own it */
1596         ASSERT(DS_IS_INCONSISTENT(ds));
1597         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1598         dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
1599         dsobj = ds->ds_object;
1600         dsl_dataset_rele(ds, FTAG);
1601 
1602         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1603 
1604         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1605         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1606 
1607         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1608 
1609         drba->drba_cookie->drc_ds = ds;
1610 
1611         spa_history_log_internal_ds(ds, "resume receive", tx, "");
1612 }
1613 
1614 /*
1615  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1616  * succeeds; otherwise we will leak the holds on the datasets.
1617  */
1618 int
1619 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1620     boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
1621 {
1622         dmu_recv_begin_arg_t drba = { 0 };
1623 
1624         bzero(drc, sizeof (dmu_recv_cookie_t));
1625         drc->drc_drr_begin = drr_begin;
1626         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1627         drc->drc_tosnap = tosnap;
1628         drc->drc_tofs = tofs;
1629         drc->drc_force = force;
1630         drc->drc_resumable = resumable;
1631         drc->drc_cred = CRED();
1632 
1633         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1634                 drc->drc_byteswap = B_TRUE;
1635                 fletcher_4_incremental_byteswap(drr_begin,
1636                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
1637                 byteswap_record(drr_begin);
1638         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1639                 fletcher_4_incremental_native(drr_begin,
1640                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
1641         } else {
1642                 return (SET_ERROR(EINVAL));
1643         }
1644 
1645         drba.drba_origin = origin;
1646         drba.drba_cookie = drc;
1647         drba.drba_cred = CRED();
1648 
1649         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1650             DMU_BACKUP_FEATURE_RESUMING) {
1651                 return (dsl_sync_task(tofs,
1652                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1653                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1654         } else {
1655                 return (dsl_sync_task(tofs,
1656                     dmu_recv_begin_check, dmu_recv_begin_sync,
1657                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1658         }
1659 }
1660 
1661 struct receive_record_arg {
1662         dmu_replay_record_t header;
1663         void *payload; /* Pointer to a buffer containing the payload */
1664         /*
1665          * If the record is a write, pointer to the arc_buf_t containing the
1666          * payload.
1667          */
1668         arc_buf_t *write_buf;
1669         int payload_size;
1670         uint64_t bytes_read; /* bytes read from stream when record created */
1671         boolean_t eos_marker; /* Marks the end of the stream */
1672         bqueue_node_t node;
1673 };
1674 
1675 struct receive_writer_arg {
1676         objset_t *os;
1677         boolean_t byteswap;
1678         bqueue_t q;
1679 
1680         /*
1681          * These three args are used to signal to the main thread that we're
1682          * done.
1683          */
1684         kmutex_t mutex;
1685         kcondvar_t cv;
1686         boolean_t done;
1687 
1688         int err;
1689         /* A map from guid to dataset to help handle dedup'd streams. */
1690         avl_tree_t *guid_to_ds_map;
1691         boolean_t resumable;
1692         uint64_t last_object, last_offset;
1693         uint64_t bytes_read; /* bytes read when current record created */
1694 };
1695 
1696 struct objlist {
1697         list_t list; /* List of struct receive_objnode. */
1698         /*
1699          * Last object looked up. Used to assert that objects are being looked
1700          * up in ascending order.
1701          */
1702         uint64_t last_lookup;
1703 };
1704 
1705 struct receive_objnode {
1706         list_node_t node;
1707         uint64_t object;
1708 };
1709 
1710 struct receive_arg {
1711         objset_t *os;
1712         vnode_t *vp; /* The vnode to read the stream from */
1713         uint64_t voff; /* The current offset in the stream */
1714         uint64_t bytes_read;
1715         /*
1716          * A record that has had its payload read in, but hasn't yet been handed
1717          * off to the worker thread.
1718          */
1719         struct receive_record_arg *rrd;
1720         /* A record that has had its header read in, but not its payload. */
1721         struct receive_record_arg *next_rrd;
1722         zio_cksum_t cksum;
1723         zio_cksum_t prev_cksum;
1724         int err;
1725         boolean_t byteswap;
1726         /* Sorted list of objects not to issue prefetches for. */
1727         struct objlist ignore_objlist;
1728 };
1729 
1730 typedef struct guid_map_entry {
1731         uint64_t        guid;
1732         dsl_dataset_t   *gme_ds;
1733         avl_node_t      avlnode;
1734 } guid_map_entry_t;
1735 
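     /* AVL comparator for guid_map_entry_t nodes, ordered by guid. */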
1736 static int
1737 guid_compare(const void *arg1, const void *arg2)
1738 {
1739         const guid_map_entry_t *gmep1 = arg1;
1740         const guid_map_entry_t *gmep2 = arg2;
1741 
1742         if (gmep1->guid < gmep2->guid)
1743                 return (-1);
1744         else if (gmep1->guid > gmep2->guid)
1745                 return (1);
1746         return (0);
1747 }
1748 
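     /*
      * zfs_onexit callback: tear down the guid map when the cleanup fd is
      * closed, dropping the long holds taken on each mapped dataset.
      */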
1749 static void
1750 free_guid_map_onexit(void *arg)
1751 {
1752         avl_tree_t *ca = arg;
1753         void *cookie = NULL;
1754         guid_map_entry_t *gmep;
1755 
1756         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1757                 dsl_dataset_long_rele(gmep->gme_ds, gmep);
1758                 dsl_dataset_rele(gmep->gme_ds, gmep);
1759                 kmem_free(gmep, sizeof (guid_map_entry_t));
1760         }
1761         avl_destroy(ca);
1762         kmem_free(ca, sizeof (avl_tree_t));
1763 }
1764 
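     /*
      * Read exactly "len" bytes from the stream's vnode into "buf",
      * advancing ra->voff and ra->bytes_read.  A truncated stream is
      * reported as ECKSUM so that the receive can potentially be resumed.
      */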
1765 static int
1766 receive_read(struct receive_arg *ra, int len, void *buf)
1767 {
1768         int done = 0;
1769 
1770         /* some things will require 8-byte alignment, so everything must */
1771         ASSERT0(len % 8);
1772 
1773         while (done < len) {
1774                 ssize_t resid;
1775 
1776                 ra->err = vn_rdwr(UIO_READ, ra->vp,
1777                     (char *)buf + done, len - done,
1778                     ra->voff, UIO_SYSSPACE, FAPPEND,
1779                     RLIM64_INFINITY, CRED(), &resid);
1780 
1781                 if (resid == len - done) {
1782                         /*
1783                          * Note: ECKSUM indicates that the receive
1784                          * was interrupted and can potentially be resumed.
1785                          */
1786                         ra->err = SET_ERROR(ECKSUM);
1787                 }
1788                 ra->voff += len - done - resid;
1789                 done = len - resid;
1790                 if (ra->err != 0)
1791                         return (ra->err);
1792         }
1793 
1794         ra->bytes_read += len;
1795 
1796         ASSERT3U(done, ==, len);
1797         return (0);
1798 }
1799 
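     /*
      * Byteswap every field of a replay record that was read from a stream
      * whose byte order differs from the host's.
      */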
1800 static void
1801 byteswap_record(dmu_replay_record_t *drr)
1802 {
1803 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1804 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1805         drr->drr_type = BSWAP_32(drr->drr_type);
1806         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1807 
1808         switch (drr->drr_type) {
1809         case DRR_BEGIN:
1810                 DO64(drr_begin.drr_magic);
1811                 DO64(drr_begin.drr_versioninfo);
1812                 DO64(drr_begin.drr_creation_time);
1813                 DO32(drr_begin.drr_type);
1814                 DO32(drr_begin.drr_flags);
1815                 DO64(drr_begin.drr_toguid);
1816                 DO64(drr_begin.drr_fromguid);
1817                 break;
1818         case DRR_OBJECT:
1819                 DO64(drr_object.drr_object);
1820                 DO32(drr_object.drr_type);
1821                 DO32(drr_object.drr_bonustype);
1822                 DO32(drr_object.drr_blksz);
1823                 DO32(drr_object.drr_bonuslen);
1824                 DO64(drr_object.drr_toguid);
1825                 break;
1826         case DRR_FREEOBJECTS:
1827                 DO64(drr_freeobjects.drr_firstobj);
1828                 DO64(drr_freeobjects.drr_numobjs);
1829                 DO64(drr_freeobjects.drr_toguid);
1830                 break;
1831         case DRR_WRITE:
1832                 DO64(drr_write.drr_object);
1833                 DO32(drr_write.drr_type);
1834                 DO64(drr_write.drr_offset);
1835                 DO64(drr_write.drr_length);
1836                 DO64(drr_write.drr_toguid);
1837                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
1838                 DO64(drr_write.drr_key.ddk_prop);
1839                 break;
1840         case DRR_WRITE_BYREF:
1841                 DO64(drr_write_byref.drr_object);
1842                 DO64(drr_write_byref.drr_offset);
1843                 DO64(drr_write_byref.drr_length);
1844                 DO64(drr_write_byref.drr_toguid);
1845                 DO64(drr_write_byref.drr_refguid);
1846                 DO64(drr_write_byref.drr_refobject);
1847                 DO64(drr_write_byref.drr_refoffset);
1848                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
1849                     drr_key.ddk_cksum);
1850                 DO64(drr_write_byref.drr_key.ddk_prop);
1851                 break;
1852         case DRR_WRITE_EMBEDDED:
1853                 DO64(drr_write_embedded.drr_object);
1854                 DO64(drr_write_embedded.drr_offset);
1855                 DO64(drr_write_embedded.drr_length);
1856                 DO64(drr_write_embedded.drr_toguid);
1857                 DO32(drr_write_embedded.drr_lsize);
1858                 DO32(drr_write_embedded.drr_psize);
1859                 break;
1860         case DRR_FREE:
1861                 DO64(drr_free.drr_object);
1862                 DO64(drr_free.drr_offset);
1863                 DO64(drr_free.drr_length);
1864                 DO64(drr_free.drr_toguid);
1865                 break;
1866         case DRR_SPILL:
1867                 DO64(drr_spill.drr_object);
1868                 DO64(drr_spill.drr_length);
1869                 DO64(drr_spill.drr_toguid);
1870                 break;
1871         case DRR_END:
1872                 DO64(drr_end.drr_toguid);
1873                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1874                 break;
1875         }
1876 
1877         if (drr->drr_type != DRR_BEGIN) {
1878                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1879         }
1880 
1881 #undef DO64
1882 #undef DO32
1883 }
1884 
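     /*
      * Derive the number of block pointers a dnode will have from its bonus
      * type and size: an SA bonus buffer always leaves room for exactly one
      * blkptr; otherwise the dnode gets one blkptr plus one more for each
      * unused blkptr_t-sized chunk of the maximum bonus area.
      */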
1885 static inline uint8_t
1886 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1887 {
1888         if (bonus_type == DMU_OT_SA) {
1889                 return (1);
1890         } else {
1891                 return (1 +
1892                     ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
1893         }
1894 }
1895 
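     /*
      * Record the (object, offset, bytes_read) position of the last
      * successfully received write in the dataset's in-core resume state,
      * from which it is synced out with this txg.
      */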
1896 static void
1897 save_resume_state(struct receive_writer_arg *rwa,
1898     uint64_t object, uint64_t offset, dmu_tx_t *tx)
1899 {
1900         int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1901 
1902         if (!rwa->resumable)
1903                 return;
1904 
1905         /*
1906          * We use ds_resume_bytes[] != 0 to indicate that we need to
1907          * update this on disk, so it must not be 0.
1908          */
1909         ASSERT(rwa->bytes_read != 0);
1910 
1911         /*
1912          * We only resume from write records, which have a valid
1913          * (non-meta-dnode) object number.
1914          */
1915         ASSERT(object != 0);
1916 
1917         /*
1918          * For resuming to work correctly, we must receive records in order,
1919          * sorted by object,offset.  This is checked by the callers, but
1920          * assert it here for good measure.
1921          */
1922         ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1923         ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1924             offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1925         ASSERT3U(rwa->bytes_read, >=,
1926             rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
1927 
1928         rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
1929         rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
1930         rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
1931 }
1932 
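     /*
      * Apply a DRR_OBJECT record: validate its fields, free the old object
      * contents if the block size is changing or the number of block
      * pointers is shrinking, then claim or reclaim the object and install
      * its bonus buffer.
      */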
1933 static int
1934 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1935     void *data)
1936 {
1937         dmu_object_info_t doi;
1938         dmu_tx_t *tx;
1939         uint64_t object;
1940         int err;
1941 
1942         if (drro->drr_type == DMU_OT_NONE ||
1943             !DMU_OT_IS_VALID(drro->drr_type) ||
1944             !DMU_OT_IS_VALID(drro->drr_bonustype) ||
1945             drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
1946             drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1947             P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1948             drro->drr_blksz < SPA_MINBLOCKSIZE ||
1949             drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
1950             drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1951                 return (SET_ERROR(EINVAL));
1952         }
1953 
1954         err = dmu_object_info(rwa->os, drro->drr_object, &doi);
1955 
1956         if (err != 0 && err != ENOENT)
1957                 return (SET_ERROR(EINVAL));
1958         object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;
1959 
1960         /*
1961          * If we are losing blkptrs or changing the block size this must
1962          * be a new file instance.  We must clear out the previous file
1963          * contents before we can change this type of metadata in the dnode.
1964          */
1965         if (err == 0) {
1966                 int nblkptr;
1967 
1968                 nblkptr = deduce_nblkptr(drro->drr_bonustype,
1969                     drro->drr_bonuslen);
1970 
1971                 if (drro->drr_blksz != doi.doi_data_block_size ||
1972                     nblkptr < doi.doi_nblkptr) {
1973                         err = dmu_free_long_range(rwa->os, drro->drr_object,
1974                             0, DMU_OBJECT_END);
1975                         if (err != 0)
1976                                 return (SET_ERROR(EINVAL));
1977                 }
1978         }
1979 
1980         tx = dmu_tx_create(rwa->os);
1981         dmu_tx_hold_bonus(tx, object);
1982         err = dmu_tx_assign(tx, TXG_WAIT);
1983         if (err != 0) {
1984                 dmu_tx_abort(tx);
1985                 return (err);
1986         }
1987 
1988         if (object == DMU_NEW_OBJECT) {
1989                 /* currently free, want to be allocated */
1990                 err = dmu_object_claim(rwa->os, drro->drr_object,
1991                     drro->drr_type, drro->drr_blksz,
1992                     drro->drr_bonustype, drro->drr_bonuslen, tx);
1993         } else if (drro->drr_type != doi.doi_type ||
1994             drro->drr_blksz != doi.doi_data_block_size ||
1995             drro->drr_bonustype != doi.doi_bonus_type ||
1996             drro->drr_bonuslen != doi.doi_bonus_size) {
1997                 /* currently allocated, but with different properties */
1998                 err = dmu_object_reclaim(rwa->os, drro->drr_object,
1999                     drro->drr_type, drro->drr_blksz,
2000                     drro->drr_bonustype, drro->drr_bonuslen, tx);
2001         }
2002         if (err != 0) {
2003                 dmu_tx_commit(tx);
2004                 return (SET_ERROR(EINVAL));
2005         }
2006 
2007         dmu_object_set_checksum(rwa->os, drro->drr_object,
2008             drro->drr_checksumtype, tx);
2009         dmu_object_set_compress(rwa->os, drro->drr_object,
2010             drro->drr_compress, tx);
2011 
2012         if (data != NULL) {
2013                 dmu_buf_t *db;
2014 
2015                 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
2016                 dmu_buf_will_dirty(db, tx);
2017 
2018                 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
2019                 bcopy(data, db->db_data, drro->drr_bonuslen);
2020                 if (rwa->byteswap) {
2021                         dmu_object_byteswap_t byteswap =
2022                             DMU_OT_BYTESWAP(drro->drr_bonustype);
2023                         dmu_ot_byteswap[byteswap].ob_func(db->db_data,
2024                             drro->drr_bonuslen);
2025                 }
2026                 dmu_buf_rele(db, FTAG);
2027         }
2028         dmu_tx_commit(tx);
2029 
2030         return (0);
2031 }
2032 
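     /*
      * Apply a DRR_FREEOBJECTS record by freeing each allocated object in
      * the range [drr_firstobj, drr_firstobj + drr_numobjs).
      */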
2033 /* ARGSUSED */
2034 static int
2035 receive_freeobjects(struct receive_writer_arg *rwa,
2036     struct drr_freeobjects *drrfo)
2037 {
2038         uint64_t obj;
2039         int next_err = 0;
2040 
2041         if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
2042                 return (SET_ERROR(EINVAL));
2043 
2044         for (obj = drrfo->drr_firstobj;
2045             obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
2046             next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
2047                 int err;
2048 
2049                 if (dmu_object_info(rwa->os, obj, NULL) != 0)
2050                         continue;
2051 
2052                 err = dmu_free_long_object(rwa->os, obj);
2053                 if (err != 0)
2054                         return (err);
2055         }
2056         if (next_err != ESRCH)
2057                 return (next_err);
2058         return (0);
2059 }
2060 
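     /*
      * Apply a DRR_WRITE record by assigning the loaned arc_buf directly to
      * the destination object.  Records must arrive in increasing
      * (object, offset) order for the resume state to remain valid.
      */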
2061 static int
2062 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
2063     arc_buf_t *abuf)
2064 {
2065         dmu_tx_t *tx;
2066         int err;
2067 
2068         if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
2069             !DMU_OT_IS_VALID(drrw->drr_type))
2070                 return (SET_ERROR(EINVAL));
2071 
2072         /*
2073          * For resuming to work, records must be in increasing order
2074          * by (object, offset).
2075          */
2076         if (drrw->drr_object < rwa->last_object ||
2077             (drrw->drr_object == rwa->last_object &&
2078             drrw->drr_offset < rwa->last_offset)) {
2079                 return (SET_ERROR(EINVAL));
2080         }
2081         rwa->last_object = drrw->drr_object;
2082         rwa->last_offset = drrw->drr_offset;
2083 
2084         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2085                 return (SET_ERROR(EINVAL));
2086 
2087         tx = dmu_tx_create(rwa->os);
2088 
2089         dmu_tx_hold_write(tx, drrw->drr_object,
2090             drrw->drr_offset, drrw->drr_length);
2091         err = dmu_tx_assign(tx, TXG_WAIT);
2092         if (err != 0) {
2093                 dmu_tx_abort(tx);
2094                 return (err);
2095         }
2096         if (rwa->byteswap) {
2097                 dmu_object_byteswap_t byteswap =
2098                     DMU_OT_BYTESWAP(drrw->drr_type);
2099                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2100                     drrw->drr_length);
2101         }
2102 
2103         dmu_buf_t *bonus;
2104         if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
2105                 return (SET_ERROR(EINVAL));
2106         dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2107 
2108         /*
2109          * Note: If the receive fails, we want the resume stream to start
2110          * with the same record that we last successfully received (as opposed
2111          * to the next record), so that we can verify that we are
2112          * resuming from the correct location.
2113          */
2114         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2115         dmu_tx_commit(tx);
2116         dmu_buf_rele(bonus, FTAG);
2117 
2118         return (0);
2119 }
2120 
2121 /*
2122  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
2123  * streams to refer to a copy of the data that is already on the
2124  * system because it came in earlier in the stream.  This function
2125  * finds the earlier copy of the data, and uses that copy instead of
2126  * data from the stream to fulfill this write.
2127  */
2128 static int
2129 receive_write_byref(struct receive_writer_arg *rwa,
2130     struct drr_write_byref *drrwbr)
2131 {
2132         dmu_tx_t *tx;
2133         int err;
2134         guid_map_entry_t gmesrch;
2135         guid_map_entry_t *gmep;
2136         avl_index_t where;
2137         objset_t *ref_os = NULL;
2138         dmu_buf_t *dbp;
2139 
2140         if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
2141                 return (SET_ERROR(EINVAL));
2142 
2143         /*
2144          * If the GUID of the referenced dataset is different from the
2145          * GUID of the target dataset, find the referenced dataset.
2146          */
2147         if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
2148                 gmesrch.guid = drrwbr->drr_refguid;
2149                 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
2150                     &where)) == NULL) {
2151                         return (SET_ERROR(EINVAL));
2152                 }
2153                 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
2154                         return (SET_ERROR(EINVAL));
2155         } else {
2156                 ref_os = rwa->os;
2157         }
2158 
2159         err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
2160             drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
2161         if (err != 0)
2162                 return (err);
2163 
2164         tx = dmu_tx_create(rwa->os);
2165 
2166         dmu_tx_hold_write(tx, drrwbr->drr_object,
2167             drrwbr->drr_offset, drrwbr->drr_length);
2168         err = dmu_tx_assign(tx, TXG_WAIT);
2169         if (err != 0) {
2170                 dmu_tx_abort(tx);
2171                 return (err);
2172         }
2173         dmu_write(rwa->os, drrwbr->drr_object,
2174             drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
2175         dmu_buf_rele(dbp, FTAG);
2176 
2177         /* See comment in receive_write. */
2178         save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
2179         dmu_tx_commit(tx);
2180         return (0);
2181 }
2182 
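     /*
      * Apply a DRR_WRITE_EMBEDDED record by writing its (possibly
      * compressed) payload directly into an embedded block pointer.
      */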
2183 static int
2184 receive_write_embedded(struct receive_writer_arg *rwa,
2185     struct drr_write_embedded *drrwe, void *data)
2186 {
2187         dmu_tx_t *tx;
2188         int err;
2189 
2190         if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
2191                 return (SET_ERROR(EINVAL));
2192 
2193         if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
2194                 return (SET_ERROR(EINVAL));
2195 
2196         if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
2197                 return (SET_ERROR(EINVAL));
2198         if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
2199                 return (SET_ERROR(EINVAL));
2200 
2201         tx = dmu_tx_create(rwa->os);
2202 
2203         dmu_tx_hold_write(tx, drrwe->drr_object,
2204             drrwe->drr_offset, drrwe->drr_length);
2205         err = dmu_tx_assign(tx, TXG_WAIT);
2206         if (err != 0) {
2207                 dmu_tx_abort(tx);
2208                 return (err);
2209         }
2210 
2211         dmu_write_embedded(rwa->os, drrwe->drr_object,
2212             drrwe->drr_offset, data, drrwe->drr_etype,
2213             drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2214             rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2215 
2216         /* See comment in receive_write. */
2217         save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2218         dmu_tx_commit(tx);
2219         return (0);
2220 }
2221 
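     /*
      * Apply a DRR_SPILL record: copy the payload into the object's spill
      * block, growing the spill block first if it is too small.
      */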
2222 static int
2223 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2224     void *data)
2225 {
2226         dmu_tx_t *tx;
2227         dmu_buf_t *db, *db_spill;
2228         int err;
2229 
2230         if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2231             drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2232                 return (SET_ERROR(EINVAL));
2233 
2234         if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2235                 return (SET_ERROR(EINVAL));
2236 
2237         VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2238         if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
2239                 dmu_buf_rele(db, FTAG);
2240                 return (err);
2241         }
2242 
2243         tx = dmu_tx_create(rwa->os);
2244 
2245         dmu_tx_hold_spill(tx, db->db_object);
2246 
2247         err = dmu_tx_assign(tx, TXG_WAIT);
2248         if (err != 0) {
2249                 dmu_buf_rele(db, FTAG);
2250                 dmu_buf_rele(db_spill, FTAG);
2251                 dmu_tx_abort(tx);
2252                 return (err);
2253         }
2254         dmu_buf_will_dirty(db_spill, tx);
2255 
2256         if (db_spill->db_size < drrs->drr_length)
2257                 VERIFY0(dbuf_spill_set_blksz(db_spill,
2258                     drrs->drr_length, tx));
2259         bcopy(data, db_spill->db_data, drrs->drr_length);
2260 
2261         dmu_buf_rele(db, FTAG);
2262         dmu_buf_rele(db_spill, FTAG);
2263 
2264         dmu_tx_commit(tx);
2265         return (0);
2266 }
2267 
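     /* Apply a DRR_FREE record by freeing the given range of the object. */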
2268 /* ARGSUSED */
2269 static int
2270 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2271 {
2272         int err;
2273 
2274         if (drrf->drr_length != -1ULL &&
2275             drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2276                 return (SET_ERROR(EINVAL));
2277 
2278         if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2279                 return (SET_ERROR(EINVAL));
2280 
2281         err = dmu_free_long_range(rwa->os, drrf->drr_object,
2282             drrf->drr_offset, drrf->drr_length);
2283 
2284         return (err);
2285 }
2286 
2287 /* used to destroy the drc_ds on error */
2288 static void
2289 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2290 {
2291         if (drc->drc_resumable) {
2292                 /* wait for our resume state to be written to disk */
2293                 txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
2294                 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2295         } else {
2296                 char name[MAXNAMELEN];
2297                 dsl_dataset_name(drc->drc_ds, name);
2298                 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2299                 (void) dsl_destroy_head(name);
2300         }
2301 }
2302 
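     /*
      * Fold "len" bytes at "buf" into the stream's running fletcher4
      * checksum, honoring the stream's byte order.
      */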
2303 static void
2304 receive_cksum(struct receive_arg *ra, int len, void *buf)
2305 {
2306         if (ra->byteswap) {
2307                 fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2308         } else {
2309                 fletcher_4_incremental_native(buf, len, &ra->cksum);
2310         }
2311 }
2312 
2313 /*
2314  * Read the payload into a buffer of size len, and update the current record's
2315  * payload field.
2316  * Allocate ra->next_rrd and read the next record's header into
2317  * ra->next_rrd->header.
2318  * Verify checksum of payload and next record.
2319  */
2320 static int
2321 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2322 {
2323         int err;
2324 
2325         if (len != 0) {
2326                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2327                 err = receive_read(ra, len, buf);
2328                 if (err != 0)
2329                         return (err);
2330                 receive_cksum(ra, len, buf);
2331 
2332                 /* note: rrd is NULL when reading the begin record's payload */
2333                 if (ra->rrd != NULL) {
2334                         ra->rrd->payload = buf;
2335                         ra->rrd->payload_size = len;
2336                         ra->rrd->bytes_read = ra->bytes_read;
2337                 }
2338         }
2339 
2340         ra->prev_cksum = ra->cksum;
2341 
2342         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2343         err = receive_read(ra, sizeof (ra->next_rrd->header),
2344             &ra->next_rrd->header);
2345         ra->next_rrd->bytes_read = ra->bytes_read;
2346         if (err != 0) {
2347                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2348                 ra->next_rrd = NULL;
2349                 return (err);
2350         }
2351         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2352                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2353                 ra->next_rrd = NULL;
2354                 return (SET_ERROR(EINVAL));
2355         }
2356 
2357         /*
2358          * Note: checksum is of everything up to but not including the
2359          * checksum itself.
2360          */
2361         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2362             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2363         receive_cksum(ra,
2364             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2365             &ra->next_rrd->header);
2366 
2367         zio_cksum_t cksum_orig =
2368             ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2369         zio_cksum_t *cksump =
2370             &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2371 
2372         if (ra->byteswap)
2373                 byteswap_record(&ra->next_rrd->header);
2374 
2375         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2376             !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2377                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2378                 ra->next_rrd = NULL;
2379                 return (SET_ERROR(ECKSUM));
2380         }
2381 
2382         receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2383 
2384         return (0);
2385 }
2386 
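     /* Initialize an empty objlist; see objlist_exists() below. */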
2387 static void
2388 objlist_create(struct objlist *list)
2389 {
2390         list_create(&list->list, sizeof (struct receive_objnode),
2391             offsetof(struct receive_objnode, node));
2392         list->last_lookup = 0;
2393 }
2394 
2395 static void
2396 objlist_destroy(struct objlist *list)
2397 {
2398         for (struct receive_objnode *n = list_remove_head(&list->list);
2399             n != NULL; n = list_remove_head(&list->list)) {
2400                 kmem_free(n, sizeof (*n));
2401         }
2402         list_destroy(&list->list);
2403 }
2404 
2405 /*
2406  * This function looks through the objlist to see if the specified object
2407  * number is contained in the objlist.  In the process, it will remove all
2408  * object numbers in the list that are smaller than the specified object
2409  * number.  Thus, any lookup of an object number smaller than a previously
2410  * looked up object number will always return false; therefore, all
2411  * lookups should be done in ascending order.
2412  */
2413 static boolean_t
2414 objlist_exists(struct objlist *list, uint64_t object)
2415 {
2416         struct receive_objnode *node = list_head(&list->list);
2417         ASSERT3U(object, >=, list->last_lookup);
2418         list->last_lookup = object;
2419         while (node != NULL && node->object < object) {
2420                 VERIFY3P(node, ==, list_remove_head(&list->list));
2421                 kmem_free(node, sizeof (*node));
2422                 node = list_head(&list->list);
2423         }
2424         return (node != NULL && node->object == object);
2425 }
2426 
2427 /*
2428  * The objlist is a list of object numbers stored in ascending order.
2429  * However, the insertion of new object numbers does not seek out the
2430  * correct location to store a new object number; instead, it appends it
2431  * to the list for simplicity.  Thus, any users must take care to only
2432  * insert new object numbers in ascending order.
2433  */
2434 static void
2435 objlist_insert(struct objlist *list, uint64_t object)
2436 {
2437         struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
2438         node->object = object;
2439 #ifdef ZFS_DEBUG
2440         struct receive_objnode *last_object = list_tail(&list->list);
2441         uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
2442         ASSERT3U(node->object, >, last_objnum);
2443 #endif
2444         list_insert_tail(&list->list, node);
2445 }
2446 
2447 /*
2448  * Issue the prefetch reads for any necessary indirect blocks.
2449  *
2450  * We use the object ignore list to tell us whether or not to issue prefetches
2451  * for a given object.  We do this for both correctness (in case the blocksize
2452  * of an object has changed) and performance (if the object doesn't exist,
2453  * don't needlessly try to issue prefetches).  We also trim the list as we
2454  * go through the stream to prevent it from growing to an unbounded size.
2455  *
2456  * The object numbers within will always be in sorted order, and any write
2457  * records we see will also be in sorted order, but they're not sorted with
2458  * respect to each other (i.e. we can get several object records before
2459  * receiving each object's write records).  As a result, once we've reached a
2460  * given object number, we can safely remove any reference to lower object
2461  * numbers in the ignore list. In practice, we receive up to 32 object records
2462  * before receiving write records, so the list can have up to 32 nodes in it.
2463  */
2464 /* ARGSUSED */
2465 static void
2466 receive_read_prefetch(struct receive_arg *ra,
2467     uint64_t object, uint64_t offset, uint64_t length)
2468 {
2469         if (!objlist_exists(&ra->ignore_objlist, object)) {
2470                 dmu_prefetch(ra->os, object, 1, offset, length,
2471                     ZIO_PRIORITY_SYNC_READ);
2472         }
2473 }
2474 
2475 /*
2476  * Read records off the stream, issuing any necessary prefetches.
2477  */
2478 static int
2479 receive_read_record(struct receive_arg *ra)
2480 {
2481         int err;
2482 
2483         switch (ra->rrd->header.drr_type) {
2484         case DRR_OBJECT:
2485         {
2486                 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2487                 uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2488                 void *buf = kmem_zalloc(size, KM_SLEEP);
2489                 dmu_object_info_t doi;
2490                 err = receive_read_payload_and_next_header(ra, size, buf);
2491                 if (err != 0) {
2492                         kmem_free(buf, size);
2493                         return (err);
2494                 }
2495                 err = dmu_object_info(ra->os, drro->drr_object, &doi);
2496                 /*
2497                  * See receive_read_prefetch for an explanation why we're
2498                  * storing this object in the ignore_objlist.
2499                  */
2500                 if (err == ENOENT ||
2501                     (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2502                         objlist_insert(&ra->ignore_objlist, drro->drr_object);
2503                         err = 0;
2504                 }
2505                 return (err);
2506         }
2507         case DRR_FREEOBJECTS:
2508         {
2509                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2510                 return (err);
2511         }
2512         case DRR_WRITE:
2513         {
2514                 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
2515                 arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2516                     drrw->drr_length);
2517 
2518                 err = receive_read_payload_and_next_header(ra,
2519                     drrw->drr_length, abuf->b_data);
2520                 if (err != 0) {
2521                         dmu_return_arcbuf(abuf);
2522                         return (err);
2523                 }
2524                 ra->rrd->write_buf = abuf;
2525                 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2526                     drrw->drr_length);
2527                 return (err);
2528         }
2529         case DRR_WRITE_BYREF:
2530         {
2531                 struct drr_write_byref *drrwb =
2532                     &ra->rrd->header.drr_u.drr_write_byref;
2533                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2534                 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2535                     drrwb->drr_length);
2536                 return (err);
2537         }
2538         case DRR_WRITE_EMBEDDED:
2539         {
2540                 struct drr_write_embedded *drrwe =
2541                     &ra->rrd->header.drr_u.drr_write_embedded;
2542                 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2543                 void *buf = kmem_zalloc(size, KM_SLEEP);
2544 
2545                 err = receive_read_payload_and_next_header(ra, size, buf);
2546                 if (err != 0) {
2547                         kmem_free(buf, size);
2548                         return (err);
2549                 }
2550 
2551                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2552                     drrwe->drr_length);
2553                 return (err);
2554         }
2555         case DRR_FREE:
2556         {
2557                 /*
2558                  * It might be beneficial to prefetch indirect blocks here, but
2559                  * we don't really have the data to decide for sure.
2560                  */
2561                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2562                 return (err);
2563         }
2564         case DRR_END:
2565         {
2566                 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2567                 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2568                         return (SET_ERROR(ECKSUM));
2569                 return (0);
2570         }
2571         case DRR_SPILL:
2572         {
2573                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2574                 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2575                 err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2576                     buf);
2577                 if (err != 0)
2578                         kmem_free(buf, drrs->drr_length);
2579                 return (err);
2580         }
2581         default:
2582                 return (SET_ERROR(EINVAL));
2583         }
2584 }
2585 
2586 /*
2587  * Commit the records to the pool.
2588  */
2589 static int
2590 receive_process_record(struct receive_writer_arg *rwa,
2591     struct receive_record_arg *rrd)
2592 {
2593         int err;
2594 
2595         /* Processing in order, therefore bytes_read should be increasing. */
2596         ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2597         rwa->bytes_read = rrd->bytes_read;
2598 
2599         switch (rrd->header.drr_type) {
2600         case DRR_OBJECT:
2601         {
2602                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2603                 err = receive_object(rwa, drro, rrd->payload);
2604                 kmem_free(rrd->payload, rrd->payload_size);
2605                 rrd->payload = NULL;
2606                 return (err);
2607         }
2608         case DRR_FREEOBJECTS:
2609         {
2610                 struct drr_freeobjects *drrfo =
2611                     &rrd->header.drr_u.drr_freeobjects;
2612                 return (receive_freeobjects(rwa, drrfo));
2613         }
2614         case DRR_WRITE:
2615         {
2616                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2617                 err = receive_write(rwa, drrw, rrd->write_buf);
2618                 /* if receive_write() is successful, it consumes the arc_buf */
2619                 if (err != 0)
2620                         dmu_return_arcbuf(rrd->write_buf);
2621                 rrd->write_buf = NULL;
2622                 rrd->payload = NULL;
2623                 return (err);
2624         }
2625         case DRR_WRITE_BYREF:
2626         {
2627                 struct drr_write_byref *drrwbr =
2628                     &rrd->header.drr_u.drr_write_byref;
2629                 return (receive_write_byref(rwa, drrwbr));
2630         }
2631         case DRR_WRITE_EMBEDDED:
2632         {
2633                 struct drr_write_embedded *drrwe =
2634                     &rrd->header.drr_u.drr_write_embedded;
2635                 err = receive_write_embedded(rwa, drrwe, rrd->payload);
2636                 kmem_free(rrd->payload, rrd->payload_size);
2637                 rrd->payload = NULL;
2638                 return (err);
2639         }
2640         case DRR_FREE:
2641         {
2642                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2643                 return (receive_free(rwa, drrf));
2644         }
2645         case DRR_SPILL:
2646         {
2647                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2648                 err = receive_spill(rwa, drrs, rrd->payload);
2649                 kmem_free(rrd->payload, rrd->payload_size);
2650                 rrd->payload = NULL;
2651                 return (err);
2652         }
2653         default:
2654                 return (SET_ERROR(EINVAL));
2655         }
2656 }
2657 
2658 /*
2659  * dmu_recv_stream's worker thread; pull records off the queue, and then call
2660  * receive_process_record.  When we're done, signal the main thread and exit.
2661  */
2662 static void
2663 receive_writer_thread(void *arg)
2664 {
2665         struct receive_writer_arg *rwa = arg;
2666         struct receive_record_arg *rrd;
2667         for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2668             rrd = bqueue_dequeue(&rwa->q)) {
2669                 /*
2670                  * If there's an error, the main thread will stop putting things
2671                  * on the queue, but we need to clear everything in it before we
2672                  * can exit.
2673                  */
2674                 if (rwa->err == 0) {
2675                         rwa->err = receive_process_record(rwa, rrd);
2676                 } else if (rrd->write_buf != NULL) {
2677                         dmu_return_arcbuf(rrd->write_buf);
2678                         rrd->write_buf = NULL;
2679                         rrd->payload = NULL;
2680                 } else if (rrd->payload != NULL) {
2681                         kmem_free(rrd->payload, rrd->payload_size);
2682                         rrd->payload = NULL;
2683                 }
2684                 kmem_free(rrd, sizeof (*rrd));
2685         }
2686         kmem_free(rrd, sizeof (*rrd));
2687         mutex_enter(&rwa->mutex);
2688         rwa->done = B_TRUE;
2689         cv_signal(&rwa->cv);
2690         mutex_exit(&rwa->mutex);
2691 }
2692 
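     /*
      * For a resuming receive, verify that the resume_object and
      * resume_offset in the stream's BEGIN payload match the resume state
      * recorded in the dataset's ZAP.
      */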
2693 static int
2694 resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2695 {
2696         uint64_t val;
2697         objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2698         uint64_t dsobj = dmu_objset_id(ra->os);
2699         uint64_t resume_obj, resume_off;
2700 
2701         if (nvlist_lookup_uint64(begin_nvl,
2702             "resume_object", &resume_obj) != 0 ||
2703             nvlist_lookup_uint64(begin_nvl,
2704             "resume_offset", &resume_off) != 0) {
2705                 return (SET_ERROR(EINVAL));
2706         }
2707         VERIFY0(zap_lookup(mos, dsobj,
2708             DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2709         if (resume_obj != val)
2710                 return (SET_ERROR(EINVAL));
2711         VERIFY0(zap_lookup(mos, dsobj,
2712             DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2713         if (resume_off != val)
2714                 return (SET_ERROR(EINVAL));
2715 
2716         return (0);
2717 }
2718 
2719 /*
2720  * Read in the stream's records, one by one, and apply them to the pool.  There
2721  * are two threads involved; the thread that calls this function will spin up a
2722  * worker thread, read the records off the stream one by one, and issue
2723  * prefetches for any necessary indirect blocks.  It will then push the records
2724  * onto an internal blocking queue.  The worker thread will pull the records off
2725  * the queue, and actually write the data into the DMU.  This way, the worker
2726  * thread doesn't have to wait for reads to complete, since everything it needs
2727  * (the indirect blocks) will be prefetched.
2728  *
2729  * NB: callers *must* call dmu_recv_end() if this succeeds.
2730  */
2731 int
2732 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2733     int cleanup_fd, uint64_t *action_handlep)
2734 {
2735         int err = 0;
2736         struct receive_arg ra = { 0 };
2737         struct receive_writer_arg rwa = { 0 };
2738         int featureflags;
2739         nvlist_t *begin_nvl = NULL;
2740 
2741         ra.byteswap = drc->drc_byteswap;
2742         ra.cksum = drc->drc_cksum;
2743         ra.vp = vp;
2744         ra.voff = *voffp;
2745 
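             /*
              * For a resumable receive, restore the number of bytes already
              * received from the resume state saved on the dataset.
              */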
2746         if (dsl_dataset_is_zapified(drc->drc_ds)) {
2747                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2748                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2749                     sizeof (ra.bytes_read), 1, &ra.bytes_read);
2750         }
2751 
2752         objlist_create(&ra.ignore_objlist);
2753 
2754         /* these were verified in dmu_recv_begin */
2755         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2756             DMU_SUBSTREAM);
2757         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2758 
2759         /*
2760          * Open the objset we are modifying.
2761          */
2762         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2763 
2764         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2765 
2766         featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2767 
2768         /* if this stream is dedup'ed, set up the avl tree for guid mapping */
2769         if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2770                 minor_t minor;
2771 
2772                 if (cleanup_fd == -1) {
2773                         err = SET_ERROR(EBADF);
2774                         goto out;
2775                 }
2776                 err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2777                 if (err != 0) {
2778                         cleanup_fd = -1;
2779                         goto out;
2780                 }
2781 
2782                 if (*action_handlep == 0) {
2783                         rwa.guid_to_ds_map =
2784                             kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2785                         avl_create(rwa.guid_to_ds_map, guid_compare,
2786                             sizeof (guid_map_entry_t),
2787                             offsetof(guid_map_entry_t, avlnode));
2788                         err = zfs_onexit_add_cb(minor,
2789                             free_guid_map_onexit, rwa.guid_to_ds_map,
2790                             action_handlep);
2791                         if (err != 0)
2792                                 goto out;
2793                 } else {
2794                         err = zfs_onexit_cb_data(minor, *action_handlep,
2795                             (void **)&rwa.guid_to_ds_map);
2796                         if (err != 0)
2797                                 goto out;
2798                 }
2799 
2800                 drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
2801         }
2802 
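             /*
              * Read the BEGIN record's payload (if any).  It is a packed
              * nvlist; for resuming streams it carries the resume point that
              * is checked below.
              */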
2803         uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
2804         void *payload = NULL;
2805         if (payloadlen != 0)
2806                 payload = kmem_alloc(payloadlen, KM_SLEEP);
2807 
2808         err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
2809         if (err != 0) {
2810                 if (payloadlen != 0)
2811                         kmem_free(payload, payloadlen);
2812                 goto out;
2813         }
2814         if (payloadlen != 0) {
2815                 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2816                 kmem_free(payload, payloadlen);
2817                 if (err != 0)
2818                         goto out;
2819         }
2820 
2821         if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2822                 err = resume_check(&ra, begin_nvl);
2823                 if (err != 0)
2824                         goto out;
2825         }
2826 
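             /*
              * Set up the state shared with the writer thread and start it.
              * The writer applies each record to the DMU as we enqueue them
              * below.
              */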
2827         (void) bqueue_init(&rwa.q, zfs_recv_queue_length,
2828             offsetof(struct receive_record_arg, node));
2829         cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
2830         mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
2831         rwa.os = ra.os;
2832         rwa.byteswap = drc->drc_byteswap;
2833         rwa.resumable = drc->drc_resumable;
2834 
2835         (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, curproc,
2836             TS_RUN, minclsyspri);
2837         /*
2838          * We're reading rwa.err without locks, which is safe since we are the
2839          * only reader, and the worker thread is the only writer.  It's ok if we
2840          * miss a write for an iteration or two of the loop, since the writer
2841          * thread will keep freeing records we send it until we send it an eos
2842          * marker.
2843          *
2844          * We can leave this loop in 3 ways:  First, if rwa.err is
2845          * non-zero.  In that case, the writer thread will free the rrd we just
2846  * pushed.  Second, if we're interrupted; in that case, either it's the
2847          * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2848          * has been handed off to the writer thread who will free it.  Finally,
2849          * if receive_read_record fails or we're at the end of the stream, then
2850          * we free ra.rrd and exit.
2851          */
2852         while (rwa.err == 0) {
2853                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2854                         err = SET_ERROR(EINTR);
2855                         break;
2856                 }
2857 
2858                 ASSERT3P(ra.rrd, ==, NULL);
2859                 ra.rrd = ra.next_rrd;
2860                 ra.next_rrd = NULL;
2861                 /* Allocates and loads header into ra.next_rrd */
2862                 err = receive_read_record(&ra);
2863 
2864                 if (ra.rrd->header.drr_type == DRR_END || err != 0) {
2865                         kmem_free(ra.rrd, sizeof (*ra.rrd));
2866                         ra.rrd = NULL;
2867                         break;
2868                 }
2869 
2870                 bqueue_enqueue(&rwa.q, ra.rrd,
2871                     sizeof (struct receive_record_arg) + ra.rrd->payload_size);
2872                 ra.rrd = NULL;
2873         }
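             /*
              * Tell the writer thread we're done by enqueueing an
              * end-of-stream marker record (reusing next_rrd, or allocating
              * one if necessary).
              */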
2874         if (ra.next_rrd == NULL)
2875                 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
2876         ra.next_rrd->eos_marker = B_TRUE;
2877         bqueue_enqueue(&rwa.q, ra.next_rrd, 1);
2878 
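             /* Wait for the writer thread to drain the queue and finish. */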
2879         mutex_enter(&rwa.mutex);
2880         while (!rwa.done) {
2881                 cv_wait(&rwa.cv, &rwa.mutex);
2882         }
2883         mutex_exit(&rwa.mutex);
2884 
2885         cv_destroy(&rwa.cv);
2886         mutex_destroy(&rwa.mutex);
2887         bqueue_destroy(&rwa.q);
2888         if (err == 0)
2889                 err = rwa.err;
2890 
2891 out:
2892         nvlist_free(begin_nvl);
2893         if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2894                 zfs_onexit_fd_rele(cleanup_fd);
2895 
2896         if (err != 0) {
2897                 /*
2898                  * Clean up references.  If the receive is not resumable,
2899                  * destroy what we created, so we don't leave it in
2900                  * an inconsistent state.
2901                  */
2902                 dmu_recv_cleanup_ds(drc);
2903         }
2904 
2905         *voffp = ra.voff;
2906         objlist_destroy(&ra.ignore_objlist);
2907         return (err);
2908 }
2909 
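     /*
      * Check phase of the sync task that completes a receive.  For a receive
      * into an existing filesystem, verify that we can (if forced) destroy
      * the snapshots that follow the origin, swap the received clone with the
      * existing head, snapshot the result, and destroy the temporary clone.
      * For a receive into a new filesystem, only the snapshot needs to be
      * checked.
      */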
2910 static int
2911 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2912 {
2913         dmu_recv_cookie_t *drc = arg;
2914         dsl_pool_t *dp = dmu_tx_pool(tx);
2915         int error;
2916 
2917         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2918 
2919         if (!drc->drc_newfs) {
2920                 dsl_dataset_t *origin_head;
2921 
2922                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2923                 if (error != 0)
2924                         return (error);
2925                 if (drc->drc_force) {
2926                         /*
2927                          * We will destroy any snapshots in tofs (i.e. before
2928                          * origin_head) that are after the origin (which is
2929                          * the snap before drc_ds, because drc_ds cannot
2930                          * have any snaps of its own).
2931                          */
2932                         uint64_t obj;
2933 
2934                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2935                         while (obj !=
2936                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2937                                 dsl_dataset_t *snap;
2938                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,
2939                                     &snap);
2940                                 if (error != 0)
2941                                         break;
2942                                 if (snap->ds_dir != origin_head->ds_dir)
2943                                         error = SET_ERROR(EINVAL);
2944                                 if (error == 0)  {
2945                                         error = dsl_destroy_snapshot_check_impl(
2946                                             snap, B_FALSE);
2947                                 }
2948                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2949                                 dsl_dataset_rele(snap, FTAG);
2950                                 if (error != 0)
2951                                         break;
2952                         }
2953                         if (error != 0) {
2954                                 dsl_dataset_rele(origin_head, FTAG);
2955                                 return (error);
2956                         }
2957                 }
2958                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
2959                     origin_head, drc->drc_force, drc->drc_owner, tx);
2960                 if (error != 0) {
2961                         dsl_dataset_rele(origin_head, FTAG);
2962                         return (error);
2963                 }
2964                 error = dsl_dataset_snapshot_check_impl(origin_head,
2965                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2966                 dsl_dataset_rele(origin_head, FTAG);
2967                 if (error != 0)
2968                         return (error);
2969 
2970                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
2971         } else {
2972                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
2973                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2974         }
2975         return (error);
2976 }
2977 
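     /*
      * Sync phase of the receive-completion task: perform the operations
      * validated by dmu_recv_end_check, mark the result consistent, clear any
      * saved resume state, and release the hold taken in dmu_recv_begin.
      */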
2978 static void
2979 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
2980 {
2981         dmu_recv_cookie_t *drc = arg;
2982         dsl_pool_t *dp = dmu_tx_pool(tx);
2983 
2984         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
2985             tx, "snap=%s", drc->drc_tosnap);
2986 
2987         if (!drc->drc_newfs) {
2988                 dsl_dataset_t *origin_head;
2989 
2990                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
2991                     &origin_head));
2992 
2993                 if (drc->drc_force) {
2994                         /*
2995                          * Destroy any snapshots of drc_tofs (origin_head)
2996                          * after the origin (the snap before drc_ds).
2997                          */
2998                         uint64_t obj;
2999 
3000                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3001                         while (obj !=
3002                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3003                                 dsl_dataset_t *snap;
3004                                 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
3005                                     &snap));
3006                                 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
3007                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3008                                 dsl_destroy_snapshot_sync_impl(snap,
3009                                     B_FALSE, tx);
3010                                 dsl_dataset_rele(snap, FTAG);
3011                         }
3012                 }
3013                 VERIFY3P(drc->drc_ds->ds_prev, ==,
3014                     origin_head->ds_prev);
3015 
3016                 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
3017                     origin_head, tx);
3018                 dsl_dataset_snapshot_sync_impl(origin_head,
3019                     drc->drc_tosnap, tx);
3020 
3021                 /* set snapshot's creation time and guid */
3022                 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
3023                 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
3024                     drc->drc_drrb->drr_creation_time;
3025                 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
3026                     drc->drc_drrb->drr_toguid;
3027                 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
3028                     ~DS_FLAG_INCONSISTENT;
3029 
3030                 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
3031                 dsl_dataset_phys(origin_head)->ds_flags &=
3032                     ~DS_FLAG_INCONSISTENT;
3033 
3034                 dsl_dataset_rele(origin_head, FTAG);
3035                 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
3036 
3037                 if (drc->drc_owner != NULL)
3038                         VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
3039         } else {
3040                 dsl_dataset_t *ds = drc->drc_ds;
3041 
3042                 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
3043 
3044                 /* set snapshot's creation time and guid */
3045                 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
3046                 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
3047                     drc->drc_drrb->drr_creation_time;
3048                 dsl_dataset_phys(ds->ds_prev)->ds_guid =
3049                     drc->drc_drrb->drr_toguid;
3050                 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
3051                     ~DS_FLAG_INCONSISTENT;
3052 
3053                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3054                 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
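                     /*
                      * The receive is complete, so remove any partial-receive
                      * resume state left on the dataset.
                      */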
3055                 if (dsl_dataset_has_resume_receive_state(ds)) {
3056                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3057                             DS_FIELD_RESUME_FROMGUID, tx);
3058                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3059                             DS_FIELD_RESUME_OBJECT, tx);
3060                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3061                             DS_FIELD_RESUME_OFFSET, tx);
3062                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3063                             DS_FIELD_RESUME_BYTES, tx);
3064                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3065                             DS_FIELD_RESUME_TOGUID, tx);
3066                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3067                             DS_FIELD_RESUME_TONAME, tx);
3068                 }
3069         }
3070         drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
3071         /*
3072          * Release the hold from dmu_recv_begin.  This must be done before
3073          * we return to open context, so that when we free the dataset's dnode,
3074          * we can evict its bonus buffer.
3075          */
3076         dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
3077         drc->drc_ds = NULL;
3078 }
3079 
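     /*
      * For dedup'ed streams: add the newly received snapshot to the guid map,
      * so that WRITE_BYREF records in subsequent streams of the same send
      * package can resolve references to its blocks.
      */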
3080 static int
3081 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
3082 {
3083         dsl_pool_t *dp;
3084         dsl_dataset_t *snapds;
3085         guid_map_entry_t *gmep;
3086         int err;
3087 
3088         ASSERT(guid_map != NULL);
3089 
3090         err = dsl_pool_hold(name, FTAG, &dp);
3091         if (err != 0)
3092                 return (err);
3093         gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
3094         err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
3095         if (err == 0) {
3096                 gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
3097                 gmep->gme_ds = snapds;
3098                 avl_add(guid_map, gmep);
3099                 dsl_dataset_long_hold(snapds, gmep);
3100         } else {
3101                 kmem_free(gmep, sizeof (*gmep));
3102         }
3103 
3104         dsl_pool_rele(dp, FTAG);
3105         return (err);
3106 }
3107 
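     /*
      * Rough count of on-disk blocks dirtied by dmu_recv_end_sync, used for
      * the sync task's space check.
      */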
3108 static int dmu_recv_end_modified_blocks = 3;
3109 
3110 static int
3111 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
3112 {
3113         int error;
3114         char name[MAXNAMELEN];
3115 
3116 #ifdef _KERNEL
3117         /*
3118          * We will be destroying the ds; make sure its origin is unmounted if
3119          * necessary.
3120          */
3121         dsl_dataset_name(drc->drc_ds, name);
3122         zfs_destroy_unmount_origin(name);
3123 #endif
3124 
3125         error = dsl_sync_task(drc->drc_tofs,
3126             dmu_recv_end_check, dmu_recv_end_sync, drc,
3127             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3128 
3129         if (error != 0)
3130                 dmu_recv_cleanup_ds(drc);
3131         return (error);
3132 }
3133 
3134 static int
3135 dmu_recv_new_end(dmu_recv_cookie_t *drc)
3136 {
3137         int error;
3138 
3139         error = dsl_sync_task(drc->drc_tofs,
3140             dmu_recv_end_check, dmu_recv_end_sync, drc,
3141             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3142 
3143         if (error != 0) {
3144                 dmu_recv_cleanup_ds(drc);
3145         } else if (drc->drc_guid_to_ds_map != NULL) {
3146                 (void) add_ds_to_guidmap(drc->drc_tofs,
3147                     drc->drc_guid_to_ds_map,
3148                     drc->drc_newsnapobj);
3149         }
3150         return (error);
3151 }
3152 
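     /*
      * Complete the receive by dispatching the appropriate sync task,
      * depending on whether we received into a brand-new filesystem or an
      * existing one.
      */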
3153 int
3154 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
3155 {
3156         drc->drc_owner = owner;
3157 
3158         if (drc->drc_newfs)
3159                 return (dmu_recv_new_end(drc));
3160         else
3161                 return (dmu_recv_existing_end(drc));
3162 }
3163 
3164 /*
3165  * Return TRUE if this objset is currently being received into.
3166  */
3167 boolean_t
3168 dmu_objset_is_receiving(objset_t *os)
3169 {
3170         return (os->os_dsl_dataset != NULL &&
3171             os->os_dsl_dataset->ds_owner == dmu_recv_tag);
3172 }