1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  25  */
  26 
  27 #include <sys/dmu_objset.h>
  28 #include <sys/dsl_dataset.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_prop.h>
  31 #include <sys/dsl_synctask.h>
  32 #include <sys/dmu_traverse.h>
  33 #include <sys/dmu_impl.h>
  34 #include <sys/dmu_tx.h>
  35 #include <sys/arc.h>
  36 #include <sys/zio.h>
  37 #include <sys/zap.h>
  38 #include <sys/zfeature.h>
  39 #include <sys/unique.h>
  40 #include <sys/zfs_context.h>
  41 #include <sys/zfs_ioctl.h>
  42 #include <sys/spa.h>
  43 #include <sys/zfs_znode.h>
  44 #include <sys/zfs_onexit.h>
  45 #include <sys/zvol.h>
  46 #include <sys/dsl_scan.h>
  47 #include <sys/dsl_deadlist.h>
  48 
     /*
      * Sentinel owner tag.  DSL_DATASET_IS_DESTROYED() below reports a dataset
      * as destroyed when its ds_owner field points at this string.
      */
  49 static char *dsl_reaper = "the grim reaper";
  50 
     /*
      * Forward declarations of check/sync callbacks; their definitions are not
      * in the part of the file reviewed here.
      */
  51 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
  52 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
  53 static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
  54 
     /*
      * Swap the values of two lvalues through a uint64_t temporary.  Each
      * argument is evaluated twice, so arguments must be free of side effects.
      * The expansion is a bare brace block rather than do { } while (0), so
      * "if (c) SWITCH64(a, b); else ..." will not compile.
      */
  55 #define SWITCH64(x, y) \
  56         { \
  57                 uint64_t __tmp = (x); \
  58                 (x) = (y); \
  59                 (y) = __tmp; \
  60         }
  61 
     /* NOTE(review): no use of DS_REF_MAX is visible in the reviewed lines. */
  62 #define DS_REF_MAX      (1ULL << 62)
  63 
     /* Dead-list block size is defined as SPA_MAXBLOCKSIZE. */
  64 #define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE
  65 
     /* True when the dataset's owner field is the dsl_reaper sentinel. */
  66 #define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
  67 
  68 
  69 /*
  70  * Figure out how much of this delta should be propagated to the dsl_dir
  71  * layer.  If there's a refreservation, that space has already been
  72  * partially accounted for in our ancestors.
      *
      * With no reservation (ds_reserved == 0) the whole delta is returned.
      * Otherwise the result is the change in MAX(ds_unique_bytes, ds_reserved)
      * that adding delta to ds_unique_bytes would cause.  Nothing is modified
      * here; the caller applies delta to ds_unique_bytes itself, so this must
      * be called before that update.
  73  */
  74 static int64_t
  75 parent_delta(dsl_dataset_t *ds, int64_t delta)
  76 {
  77         uint64_t old_bytes, new_bytes;
  78 
             /* No refreservation: every byte of delta is passed through. */
  79         if (ds->ds_reserved == 0)
  80                 return (delta);
  81 
             /*
              * Compare the larger of (unique bytes, reservation) before and
              * after delta is applied.
              */
  82         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
  83         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
  84 
             /* The result may be smaller in magnitude than delta, never larger. */
  85         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
  86         return (new_bytes - old_bytes);
  87 }
  88 
     /*
      * Charge a newly written block to its dataset.
      *
      * Must be called with a syncing tx.  Holes are ignored.  When ds is NULL
      * the sizes are passed to dsl_pool_mos_diduse_space() instead of being
      * charged to a dataset.  Otherwise the dataset's referenced, compressed,
      * uncompressed and unique byte counts grow by the block's sizes, and the
      * owning dsl_dir is charged parent_delta() of the allocated size under
      * DD_USED_HEAD.  The part that was not propagated (used - delta) is handed
      * to dsl_dir_transfer_space() with DD_USED_REFRSRV and DD_USED_HEAD.
      */
  89 void
  90 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
  91 {
  92         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
  93         int compressed = BP_GET_PSIZE(bp);
  94         int uncompressed = BP_GET_UCSIZE(bp);
  95         int64_t delta;
  96 
  97         dprintf_bp(bp, "ds=%p", ds);
  98 
  99         ASSERT(dmu_tx_is_syncing(tx));
 100         /* It could have been compressed away to nothing */
 101         if (BP_IS_HOLE(bp))
 102                 return;
 103         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
 104         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
             /* A NULL ds means the block is accounted to the MOS. */
 105         if (ds == NULL) {
 106                 dsl_pool_mos_diduse_space(tx->tx_pool,
 107                     used, compressed, uncompressed);
 108                 return;
 109         }
 110         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 111 
             /* Lock order: the dsl_dir's dd_lock is taken before ds_lock. */
 112         mutex_enter(&ds->ds_dir->dd_lock);
 113         mutex_enter(&ds->ds_lock);
             /* parent_delta() reads ds_unique_bytes, so call it before the bump. */
 114         delta = parent_delta(ds, used);
 115         ds->ds_phys->ds_referenced_bytes += used;
 116         ds->ds_phys->ds_compressed_bytes += compressed;
 117         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
 118         ds->ds_phys->ds_unique_bytes += used;
 119         mutex_exit(&ds->ds_lock);
             /* dd_lock stays held across both dsl_dir updates. */
 120         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
 121             compressed, uncompressed, tx);
 122         dsl_dir_transfer_space(ds->ds_dir, used - delta,
 123             DD_USED_REFRSRV, DD_USED_HEAD, tx);
 124         mutex_exit(&ds->ds_dir->dd_lock);
 125 }
 126 
     /*
      * Remove a block from the head dataset's accounting, in syncing context.
      *
      * Returns 0 for a hole; otherwise returns the block's allocated size as
      * reported by bp_get_dsize_sync().
      *
      *  - ds == NULL: the block is freed immediately and the MOS accounting is
      *    debited.
      *  - bp born after ds_prev_snap_txg: the block is freed immediately, the
      *    dataset's unique bytes shrink, and the dsl_dir is adjusted through
      *    parent_delta().
      *  - otherwise: the block is recorded on the dataset's dead list (through
      *    ds_pending_deadlist when 'async' is set), the previous snapshot's
      *    unique bytes may grow, and the space may move from DD_USED_HEAD to
      *    DD_USED_SNAP.
      *
      * In both dataset cases the referenced, compressed and uncompressed byte
      * counts of ds are reduced.  ds must not be a snapshot.
      */
 127 int
 128 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
 129     boolean_t async)
 130 {
 131         if (BP_IS_HOLE(bp))
 132                 return (0);
 133 
 134         ASSERT(dmu_tx_is_syncing(tx));
 135         ASSERT(bp->blk_birth <= tx->tx_txg);
 136 
 137         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
 138         int compressed = BP_GET_PSIZE(bp);
 139         int uncompressed = BP_GET_UCSIZE(bp);
 140 
 141         ASSERT(used > 0);
             /* No dataset: free right away and debit the MOS accounting. */
 142         if (ds == NULL) {
 143                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 144                 dsl_pool_mos_diduse_space(tx->tx_pool,
 145                     -used, -compressed, -uncompressed);
 146                 return (used);
 147         }
 148         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
 149 
 150         ASSERT(!dsl_dataset_is_snapshot(ds));
 151         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 152 
             /* Born after the previous snapshot's txg: free the block now. */
 153         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
 154                 int64_t delta;
 155 
 156                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
 157                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 158 
                     /* Same lock order as block_born: dd_lock, then ds_lock. */
 159                 mutex_enter(&ds->ds_dir->dd_lock);
 160                 mutex_enter(&ds->ds_lock);
 161                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
 162                     !DS_UNIQUE_IS_ACCURATE(ds));
                     /* parent_delta() must see ds_unique_bytes before the change. */
 163                 delta = parent_delta(ds, -used);
 164                 ds->ds_phys->ds_unique_bytes -= used;
 165                 mutex_exit(&ds->ds_lock);
 166                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
 167                     delta, -compressed, -uncompressed, tx);
 168                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
 169                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
 170                 mutex_exit(&ds->ds_dir->dd_lock);
 171         } else {
                     /*
                      * Born at or before the previous snapshot's txg: record
                      * the block on the dead list instead of freeing it.
                      */
 172                 dprintf_bp(bp, "putting on dead list: %s", "");
 173                 if (async) {
 174                         /*
 175                          * We are here as part of zio's write done callback,
 176                          * which means we're a zio interrupt thread.  We can't
 177                          * call dsl_deadlist_insert() now because it may block
 178                          * waiting for I/O.  Instead, put bp on the deferred
 179                          * queue and let dsl_pool_sync() finish the job.
 180                          */
 181                         bplist_append(&ds->ds_pending_deadlist, bp);
 182                 } else {
 183                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
 184                 }
 185                 ASSERT3U(ds->ds_prev->ds_object, ==,
 186                     ds->ds_phys->ds_prev_snap_obj);
 187                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
 188                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
                     /*
                      * The credit is only given when ds is the previous
                      * snapshot's ds_next_snap_obj.
                      */
 189                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
 190                     ds->ds_object && bp->blk_birth >
 191                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
 192                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 193                         mutex_enter(&ds->ds_prev->ds_lock);
 194                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
 195                         mutex_exit(&ds->ds_prev->ds_lock);
 196                 }
                     /* Blocks born after dd_origin_txg move from HEAD to SNAP. */
 197                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
 198                         dsl_dir_transfer_space(ds->ds_dir, used,
 199                             DD_USED_HEAD, DD_USED_SNAP, tx);
 200                 }
 201         }
             /* In both branches ds itself stops referencing the block. */
 202         mutex_enter(&ds->ds_lock);
 203         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
 204         ds->ds_phys->ds_referenced_bytes -= used;
 205         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
 206         ds->ds_phys->ds_compressed_bytes -= compressed;
 207         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
 208         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
 209         mutex_exit(&ds->ds_lock);
 210 
 211         return (used);
 212 }
 213 
     /*
      * Return a best-guess txg of the most recent snapshot of ds.
      *
      * Returns 0 when ds is NULL.  Otherwise returns the larger of
      * ds_prev_snap_txg and ds_trysnap_txg, where ds_trysnap_txg is only
      * considered if it is newer than the pool's last synced txg.
      */
 214 uint64_t
 215 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
 216 {
 217         uint64_t trysnap = 0;
 218 
 219         if (ds == NULL)
 220                 return (0);
 221         /*
 222          * The snapshot creation could fail, but that would cause an
 223          * incorrect FALSE return, which would only result in an
 224          * overestimation of the amount of space that an operation would
 225          * consume, which is OK.
 226          *
 227          * There's also a small window where we could miss a pending
 228          * snapshot, because we could set the sync task in the quiescing
 229          * phase.  So this should only be used as a guess.
 230          */
             /*
              * The "FALSE return" above refers to callers that compare a
              * block's birth txg against this value, such as
              * dsl_dataset_block_freeable().
              */
 231         if (ds->ds_trysnap_txg >
 232             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
 233                 trysnap = ds->ds_trysnap_txg;
 234         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 235 }
 236 
     /*
      * Estimate whether freeing a block born in txg blk_birth would release
      * its space.
      *
      * Returns B_FALSE when blk_birth is not newer than
      * dsl_dataset_prev_snap_txg(ds).  Otherwise issues ddt_prefetch() for bp
      * and returns B_TRUE.
      *
      * NOTE(review): dsl_dataset_prev_snap_txg() tolerates ds == NULL, but
      * dsl_dataset_get_spa(ds) is applied to ds unconditionally on the B_TRUE
      * path - confirm callers never pass a NULL ds with a nonzero blk_birth.
      */
 237 boolean_t
 238 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
 239     uint64_t blk_birth)
 240 {
 241         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
 242                 return (B_FALSE);
 243 
 244         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
 245 
 246         return (B_TRUE);
 247 }
 248 
     /*
      * Tear down and free an in-core dsl_dataset_t.
      *
      * Registered as the dbuf user-eviction callback by dsl_dataset_get_ref(),
      * and also called directly by dsl_dataset_disown() with db == NULL for a
      * dataset that no longer has a dbuf.  The dataset must have no owner, or
      * be marked destroyed.
      *
      * Releases everything dsl_dataset_get_ref() set up: the fsid guid
      * registration, the objset, the reference on ds_prev, the pending and
      * on-disk dead lists, the dsl_dir, the send-stream list, and all locks.
      * When db is NULL the dead list is expected to be closed already.
      */
 249 /* ARGSUSED */
 250 static void
 251 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
 252 {
 253         dsl_dataset_t *ds = dsv;
 254 
 255         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 256 
 257         unique_remove(ds->ds_fsid_guid);
 258 
 259         if (ds->ds_objset != NULL)
 260                 dmu_objset_evict(ds->ds_objset);
 261 
             /* The hold on ds_prev was taken with ds itself as the tag. */
 262         if (ds->ds_prev) {
 263                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 264                 ds->ds_prev = NULL;
 265         }
 266 
 267         bplist_destroy(&ds->ds_pending_deadlist);
 268         if (db != NULL) {
 269                 dsl_deadlist_close(&ds->ds_deadlist);
 270         } else {
 271                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
 272                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
 273         }
 274         if (ds->ds_dir)
 275                 dsl_dir_close(ds->ds_dir, ds);
 276 
 277         ASSERT(!list_link_active(&ds->ds_synced_link));
 278 
             /*
              * dsl_dataset_get_ref() creates ds_sendstreams and initialises
              * ds_sendstream_lock; destroy them along with the other locks.
              */
             list_destroy(&ds->ds_sendstreams);
 279         mutex_destroy(&ds->ds_lock);
 280         mutex_destroy(&ds->ds_recvlock);
 281         mutex_destroy(&ds->ds_opening_lock);
             mutex_destroy(&ds->ds_sendstream_lock);
 282         rw_destroy(&ds->ds_rwlock);
 283         cv_destroy(&ds->ds_exclusive_cv);
 284 
 285         kmem_free(ds, sizeof (dsl_dataset_t));
 286 }
 287 
     /*
      * Fill in ds->ds_snapname if it is still empty.
      *
      * Returns 0 without any lookup when the name is already cached or when
      * ds_next_snap_obj is 0 (presumably meaning ds is not a snapshot - TODO
      * confirm).  Otherwise holds the bonus buffer of the dsl_dir's head
      * dataset and searches that dataset's ds_snapnames_zapobj for the entry
      * whose value is ds->ds_object, storing the matching name in ds_snapname.
      *
      * Returns 0 on success or the error from dmu_bonus_hold() or
      * zap_value_search().
      */
 288 static int
 289 dsl_dataset_get_snapname(dsl_dataset_t *ds)
 290 {
 291         dsl_dataset_phys_t *headphys;
 292         int err;
 293         dmu_buf_t *headdbuf;
 294         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 295         objset_t *mos = dp->dp_meta_objset;
 296 
 297         if (ds->ds_snapname[0])
 298                 return (0);
 299         if (ds->ds_phys->ds_next_snap_obj == 0)
 300                 return (0);
 301 
 302         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
 303             FTAG, &headdbuf);
 304         if (err)
 305                 return (err);
 306         headphys = headdbuf->db_data;
             /* Reverse lookup: find the name whose value is this object. */
 307         err = zap_value_search(dp->dp_meta_objset,
 308             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
 309         dmu_buf_rele(headdbuf, FTAG);
 310         return (err);
 311 }
 312 
     /*
      * Look up snapshot 'name' in ds's snapshot-name ZAP and store the 8-byte
      * value found there in *value.
      *
      * The match type is MT_FIRST when DS_FLAG_CI_DATASET is set on the
      * dataset and MT_EXACT otherwise.  If the normalised MT_FIRST lookup
      * returns ENOTSUP, the lookup is retried with plain zap_lookup().
      *
      * Returns 0 on success or the error from the ZAP lookup.
      */
 313 static int
 314 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 315 {
 316         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 317         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 318         matchtype_t mt;
 319         int err;
 320 
 321         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 322                 mt = MT_FIRST;
 323         else
 324                 mt = MT_EXACT;
 325 
 326         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 327             value, mt, NULL, 0, NULL);
             /* Fall back to an un-normalised lookup when MT_FIRST is refused. */
 328         if (err == ENOTSUP && mt == MT_FIRST)
 329                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
 330         return (err);
 331 }
 332 
     /*
      * Remove snapshot 'name' from ds's snapshot-name ZAP.
      *
      * Calls dsl_dir_snap_cmtime_update() on the dsl_dir first, whether or not
      * the removal then succeeds.  Uses the same match-type selection and
      * ENOTSUP fallback as dsl_dataset_snap_lookup().  On success the
      * dsl_dir's snapshot count is adjusted by -1 via dsl_snapcount_adjust().
      *
      * Returns 0 on success or the error from the ZAP removal.
      */
 333 static int
 334 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 335 {
 336         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 337         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 338         matchtype_t mt;
 339         int err;
 340 
 341         dsl_dir_snap_cmtime_update(ds->ds_dir);
 342 
 343         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 344                 mt = MT_FIRST;
 345         else
 346                 mt = MT_EXACT;
 347 
 348         err = zap_remove_norm(mos, snapobj, name, mt, tx);
             /* Fall back to an un-normalised removal when MT_FIRST is refused. */
 349         if (err == ENOTSUP && mt == MT_FIRST)
 350                 err = zap_remove(mos, snapobj, name, tx);
 351 
 352         if (err == 0)
 353                 dsl_snapcount_adjust(ds->ds_dir, tx, -1, B_TRUE);
 354 
 355         return (err);
 356 }
 357 
     /*
      * Look up (or construct) the in-core dsl_dataset_t for object dsobj and
      * return it in *dsp, with a hold on its bonus dbuf taken under 'tag'.
      *
      * The caller must hold dp_config_rwlock or be in syncing context.  If no
      * dsl_dataset_t is attached to the dbuf yet, one is built and attached
      * with dmu_buf_set_user_ie(); if another thread attached one first, ours
      * is torn down and the winner is used instead.  This takes no ds_rwlock;
      * see dsl_dataset_hold_ref() for that step.
      *
      * Returns 0 on success; EINVAL if dsobj is not a DMU_OT_DSL_DATASET
      * object; ENOENT if called outside syncing context on a dataset that is
      * already marked destroyed; otherwise the error of the lookup that
      * failed.  On every failure the dbuf hold has been released and *dsp is
      * not written.
      */
 358 static int
 359 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 360     dsl_dataset_t **dsp)
 361 {
 362         objset_t *mos = dp->dp_meta_objset;
 363         dmu_buf_t *dbuf;
 364         dsl_dataset_t *ds;
 365         int err;
 366         dmu_object_info_t doi;
 367 
 368         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
 369             dsl_pool_sync_context(dp));
 370 
 371         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
 372         if (err)
 373                 return (err);
 374 
 375         /* Make sure dsobj has the correct object type. */
 376         dmu_object_info_from_db(dbuf, &doi);
 377         if (doi.doi_type != DMU_OT_DSL_DATASET) {
                     /* Release the bonus hold taken above before bailing out. */
                     dmu_buf_rele(dbuf, tag);
 378                 return (EINVAL);
             }
 379 
 380         ds = dmu_buf_get_user(dbuf);
 381         if (ds == NULL) {
                     /* NULL unless dmu_buf_set_user_ie() reports an earlier user. */
 382                 dsl_dataset_t *winner = NULL;
 383 
 384                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
 385                 ds->ds_dbuf = dbuf;
 386                 ds->ds_object = dsobj;
 387                 ds->ds_phys = dbuf->db_data;
 388 
 389                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
 390                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
 391                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 392                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
 393 
 394                 rw_init(&ds->ds_rwlock, 0, 0, 0);
 395                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 396 
 397                 bplist_create(&ds->ds_pending_deadlist);
 398                 dsl_deadlist_open(&ds->ds_deadlist,
 399                     mos, ds->ds_phys->ds_deadlist_obj);
 400 
 401                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
 402                     offsetof(dmu_sendarg_t, dsa_link));
 403 
 404                 if (err == 0) {
 405                         err = dsl_dir_open_obj(dp,
 406                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
 407                 }
 408                 if (err) {
                             /*
                              * No dsl_dir and no ds_prev yet: undo only what
                              * was set up above, including the send-stream
                              * list and its lock.
                              */
 409                         mutex_destroy(&ds->ds_lock);
 410                         mutex_destroy(&ds->ds_recvlock);
 411                         mutex_destroy(&ds->ds_opening_lock);
                             mutex_destroy(&ds->ds_sendstream_lock);
 412                         rw_destroy(&ds->ds_rwlock);
 413                         cv_destroy(&ds->ds_exclusive_cv);
 414                         bplist_destroy(&ds->ds_pending_deadlist);
 415                         dsl_deadlist_close(&ds->ds_deadlist);
                             list_destroy(&ds->ds_sendstreams);
 416                         kmem_free(ds, sizeof (dsl_dataset_t));
 417                         dmu_buf_rele(dbuf, tag);
 418                         return (err);
 419                 }
 420 
 421                 if (!dsl_dataset_is_snapshot(ds)) {
 422                         ds->ds_snapname[0] = '\0';
                             /* The hold on the previous snapshot is tagged with ds. */
 423                         if (ds->ds_phys->ds_prev_snap_obj) {
 424                                 err = dsl_dataset_get_ref(dp,
 425                                     ds->ds_phys->ds_prev_snap_obj,
 426                                     ds, &ds->ds_prev);
 427                         }
 428                 } else {
 429                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
 430                                 err = dsl_dataset_get_snapname(ds);
 431                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
 432                                 err = zap_count(
 433                                     ds->ds_dir->dd_pool->dp_meta_objset,
 434                                     ds->ds_phys->ds_userrefs_obj,
 435                                     &ds->ds_userrefs);
 436                         }
 437                 }
 438 
 439                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
 440                         /*
 441                          * In sync context, we're called with either no lock
 442                          * or with the write lock.  If we're not syncing,
 443                          * we're always called with the read lock held.
 444                          */
 445                         boolean_t need_lock =
 446                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
 447                             dsl_pool_sync_context(dp);
 448 
 449                         if (need_lock)
 450                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 451 
 452                         err = dsl_prop_get_ds(ds,
 453                             "refreservation", sizeof (uint64_t), 1,
 454                             &ds->ds_reserved, NULL);
 455                         if (err == 0) {
 456                                 err = dsl_prop_get_ds(ds,
 457                                     "refquota", sizeof (uint64_t), 1,
 458                                     &ds->ds_quota, NULL);
 459                         }
 460 
 461                         if (need_lock)
 462                                 rw_exit(&dp->dp_config_rwlock);
 463                 } else {
 464                         ds->ds_reserved = ds->ds_quota = 0;
 465                 }
 466 
                     /* Publish ds on the dbuf; a non-NULL winner means we lost. */
 467                 if (err == 0) {
 468                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
 469                             dsl_dataset_evict);
 470                 }
 471                 if (err || winner) {
                             /*
                              * Either setup failed or another thread attached
                              * its dsl_dataset_t first; discard ours in full.
                              */
 472                         bplist_destroy(&ds->ds_pending_deadlist);
 473                         dsl_deadlist_close(&ds->ds_deadlist);
                             list_destroy(&ds->ds_sendstreams);
 474                         if (ds->ds_prev)
 475                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 476                         dsl_dir_close(ds->ds_dir, ds);
 477                         mutex_destroy(&ds->ds_lock);
 478                         mutex_destroy(&ds->ds_recvlock);
 479                         mutex_destroy(&ds->ds_opening_lock);
                             mutex_destroy(&ds->ds_sendstream_lock);
 480                         rw_destroy(&ds->ds_rwlock);
 481                         cv_destroy(&ds->ds_exclusive_cv);
 482                         kmem_free(ds, sizeof (dsl_dataset_t));
 483                         if (err) {
 484                                 dmu_buf_rele(dbuf, tag);
 485                                 return (err);
 486                         }
 487                         ds = winner;
 488                 } else {
 489                         ds->ds_fsid_guid =
 490                             unique_insert(ds->ds_phys->ds_fsid_guid);
 491                 }
 492         }
 493         ASSERT3P(ds->ds_dbuf, ==, dbuf);
 494         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
 495         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
 496             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
 497             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
             /* Outside syncing context a destroyed dataset is reported absent. */
 498         mutex_enter(&ds->ds_lock);
 499         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 500                 mutex_exit(&ds->ds_lock);
 501                 dmu_buf_rele(ds->ds_dbuf, tag);
 502                 return (ENOENT);
 503         }
 504         mutex_exit(&ds->ds_lock);
 505         *dsp = ds;
 506         return (0);
 507 }
 508 
     /*
      * Second half of a hold: turn a reference obtained from
      * dsl_dataset_get_ref() into a hold by taking ds_rwlock as READER.
      *
      * In syncing context nothing is locked and 0 is returned.  Otherwise the
      * caller must hold dp_config_rwlock (not as writer); it is dropped while
      * waiting on ds_exclusive_cv and is held again on every return.
      *
      * Returns 0 with ds_rwlock held as READER, or ENOENT if the dataset was
      * found to be destroyed after a wait.  On ENOENT the reference held under
      * 'tag' has already been dropped, so the caller must not use ds again.
      */
 509 static int
 510 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
 511 {
 512         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 513 
 514         /*
 515          * In syncing context we don't want the rwlock lock: there
 516          * may be an existing writer waiting for sync phase to
 517          * finish.  We don't need to worry about such writers, since
 518          * sync phase is single-threaded, so the writer can't be
 519          * doing anything while we are active.
 520          */
 521         if (dsl_pool_sync_context(dp)) {
 522                 ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 523                 return (0);
 524         }
 525 
 526         /*
 527          * Normal users will hold the ds_rwlock as a READER until they
 528          * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
 529          * drop their READER lock after they set the ds_owner field.
 530          *
 531          * If the dataset is being destroyed, the destroy thread will
 532          * obtain a WRITER lock for exclusive access after it's done its
 533          * open-context work and then change the ds_owner to
 534          * dsl_reaper once destruction is assured.  So threads
 535          * may block here temporarily, until the "destructability" of
 536          * the dataset is determined.
 537          */
 538         ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
 539         mutex_enter(&ds->ds_lock);
             /* Retry until the reader lock is obtained or ds is destroyed. */
 540         while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
                     /* Drop the config lock before sleeping on the cv. */
 541                 rw_exit(&dp->dp_config_rwlock);
 542                 cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
 543                 if (DSL_DATASET_IS_DESTROYED(ds)) {
 544                         mutex_exit(&ds->ds_lock);
 545                         dsl_dataset_drop_ref(ds, tag);
 546                         rw_enter(&dp->dp_config_rwlock, RW_READER);
 547                         return (ENOENT);
 548                 }
 549                 /*
 550                  * The dp_config_rwlock lives above the ds_lock. And
 551                  * we need to check DSL_DATASET_IS_DESTROYED() while
 552                  * holding the ds_lock, so we have to drop and reacquire
 553                  * the ds_lock here.
 554                  */
 555                 mutex_exit(&ds->ds_lock);
 556                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 557                 mutex_enter(&ds->ds_lock);
 558         }
 559         mutex_exit(&ds->ds_lock);
 560         return (0);
 561 }
 562 
     /*
      * Hold the dataset with object number dsobj.
      *
      * Combines dsl_dataset_get_ref() and dsl_dataset_hold_ref().  On success
      * returns 0 with the dataset in *dsp; the hold is released with
      * dsl_dataset_rele() using the same tag.  On failure returns the error
      * and sets *dsp to NULL, because any reference taken along the way has
      * already been dropped.
      */
 563 int
 564 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 565     dsl_dataset_t **dsp)
 566 {
 567         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
 568 
             if (err == 0)
                     err = dsl_dataset_hold_ref(*dsp, tag);
             /* Never hand back a pointer whose reference is gone. */
 569         if (err)
                     *dsp = NULL;
 571         return (err);
 572 }
 573 
     /*
      * Hold the dataset with object number dsobj and become its owner.
      *
      * Returns the error from dsl_dataset_hold_obj() if the hold fails.  If
      * the hold succeeds but dsl_dataset_tryown() refuses ownership, the hold
      * is released, *dsp is set to NULL and EBUSY is returned.  Returns 0 with
      * the owned dataset in *dsp otherwise.
      */
 574 int
 575 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
 576     void *tag, dsl_dataset_t **dsp)
 577 {
 578         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
 579         if (err)
 580                 return (err);
 581         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 582                 dsl_dataset_rele(*dsp, tag);
 583                 *dsp = NULL;
 584                 return (EBUSY);
 585         }
 586         return (0);
 587 }
 588 
     /*
      * Hold a dataset by name; the name may carry an "@snapshot" suffix.
      *
      * Opens the dsl_dir for 'name', takes dp_config_rwlock as READER, and
      * holds the dsl_dir's head dataset.  When a snapshot component is
      * present, the snapshot is looked up in the head's snapshot-name ZAP,
      * the head is released, and the snapshot is held instead.
      *
      * Returns 0 with the held dataset in *dsp (release it with
      * dsl_dataset_rele() and the same tag).  Returns ENOENT when the dsl_dir
      * has no head dataset or the trailing component does not start with '@';
      * otherwise returns the error of the failing step.  On any failure *dsp
      * is set to NULL.
      */
 589 int
 590 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 591 {
 592         dsl_dir_t *dd;
 593         dsl_pool_t *dp;
 594         const char *snapname;
 595         uint64_t obj;
 596         int err = 0;
 597 
 598         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 599         if (err)
 600                 return (err);
 601 
 602         dp = dd->dd_pool;
 603         obj = dd->dd_phys->dd_head_dataset_obj;
 604         rw_enter(&dp->dp_config_rwlock, RW_READER);
 605         if (obj)
 606                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
 607         else
 608                 err = ENOENT;
 609         if (err)
 610                 goto out;
 611 
 612         err = dsl_dataset_hold_ref(*dsp, tag);
 613 
 614         /* we may be looking for a snapshot */
 615         if (err == 0 && snapname != NULL) {
 616                 dsl_dataset_t *ds = NULL;
 617 
 618                 if (*snapname++ != '@') {
 619                         dsl_dataset_rele(*dsp, tag);
 620                         err = ENOENT;
 621                         goto out;
 622                 }
 623 
 624                 dprintf("looking for snapshot '%s'\n", snapname);
 625                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
 626                 if (err == 0)
 627                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
                     /* The head was only needed for the lookup. */
 628                 dsl_dataset_rele(*dsp, tag);
 629 
 630                 ASSERT3U((err == 0), ==, (ds != NULL));
 631 
 632                 if (ds) {
                             /* Cache the name we just resolved, if not yet set. */
 633                         mutex_enter(&ds->ds_lock);
 634                         if (ds->ds_snapname[0] == 0)
 635                                 (void) strlcpy(ds->ds_snapname, snapname,
 636                                     sizeof (ds->ds_snapname));
 637                         mutex_exit(&ds->ds_lock);
 638                         err = dsl_dataset_hold_ref(ds, tag);
 639                         *dsp = err ? NULL : ds;
 640                 }
 641         }
 642 out:
             /*
              * Every failure path has already dropped whatever reference it
              * took, so make sure the caller is not left with a stale pointer.
              */
             if (err != 0)
                     *dsp = NULL;
 643         rw_exit(&dp->dp_config_rwlock);
 644         dsl_dir_close(dd, FTAG);
 645         return (err);
 646 }
 647 
     /*
      * Hold a dataset by name and become its owner.
      *
      * Returns the error from dsl_dataset_hold() if the hold fails.  If the
      * hold succeeds but dsl_dataset_tryown() refuses ownership, the hold is
      * released, *dsp is set to NULL (matching dsl_dataset_own_obj()) and
      * EBUSY is returned.  Returns 0 with the owned dataset in *dsp otherwise.
      */
 648 int
 649 dsl_dataset_own(const char *name, boolean_t inconsistentok,
 650     void *tag, dsl_dataset_t **dsp)
 651 {
 652         int err = dsl_dataset_hold(name, tag, dsp);
 653         if (err)
 654                 return (err);
 655         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 656                 dsl_dataset_rele(*dsp, tag);
                     /* The hold is gone; don't leave a stale pointer behind. */
                     *dsp = NULL;
 657                 return (EBUSY);
 658         }
 659         return (0);
 660 }
 661 
     /*
      * Write the full name of ds into 'name'.
      *
      * A NULL ds yields "mos".  Otherwise the dsl_dir's name is written and,
      * if the dataset has a snapshot name, "@" and that name are appended.
      * No buffer length is passed, so the caller must supply a buffer large
      * enough for the complete name.  A failure to resolve the snapshot name
      * trips the VERIFY.
      */
 662 void
 663 dsl_dataset_name(dsl_dataset_t *ds, char *name)
 664 {
 665         if (ds == NULL) {
 666                 (void) strcpy(name, "mos");
 667         } else {
 668                 dsl_dir_name(ds->ds_dir, name);
 669                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 670                 if (ds->ds_snapname[0]) {
 671                         (void) strcat(name, "@");
 672                         /*
 673                          * We use a "recursive" mutex so that we
 674                          * can call dprintf_ds() with ds_lock held.
 675                          */
                             /* i.e. ds_lock is only taken if not already held. */
 676                         if (!MUTEX_HELD(&ds->ds_lock)) {
 677                                 mutex_enter(&ds->ds_lock);
 678                                 (void) strcat(name, ds->ds_snapname);
 679                                 mutex_exit(&ds->ds_lock);
 680                         } else {
 681                                 (void) strcat(name, ds->ds_snapname);
 682                         }
 683                 }
 684         }
 685 }
 686 
     /*
      * Return the length of the name dsl_dataset_name() would produce for ds.
      *
      * A NULL ds gives 3 (for "mos").  Otherwise the result is
      * dsl_dir_namelen() of the dsl_dir plus, for a dataset with a snapshot
      * name, one for the '@' and strlen() of the snapshot name.  No
      * terminating NUL is added here (assumes dsl_dir_namelen() does not
      * count one either - TODO confirm).
      */
 687 static int
 688 dsl_dataset_namelen(dsl_dataset_t *ds)
 689 {
 690         int result;
 691 
 692         if (ds == NULL) {
 693                 result = 3;     /* "mos" */
 694         } else {
 695                 result = dsl_dir_namelen(ds->ds_dir);
 696                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 697                 if (ds->ds_snapname[0]) {
 698                         ++result;       /* adding one for the @-sign */
                             /* Take ds_lock only if the caller doesn't hold it. */
 699                         if (!MUTEX_HELD(&ds->ds_lock)) {
 700                                 mutex_enter(&ds->ds_lock);
 701                                 result += strlen(ds->ds_snapname);
 702                                 mutex_exit(&ds->ds_lock);
 703                         } else {
 704                                 result += strlen(ds->ds_snapname);
 705                         }
 706                 }
 707         }
 708 
 709         return (result);
 710 }
 711 
     /*
      * Release the hold on the dataset's dbuf that was taken under 'tag'.
      * ds_rwlock is not touched; see dsl_dataset_rele() for that.
      */
 712 void
 713 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 714 {
 715         dmu_buf_rele(ds->ds_dbuf, tag);
 716 }
 717 
     /*
      * Release a hold: outside syncing context drop ds_rwlock, then drop the
      * dbuf reference taken under 'tag'.  In syncing context no rwlock was
      * taken by dsl_dataset_hold_ref(), so only the reference is dropped.
      */
 718 void
 719 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 720 {
 721         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
 722                 rw_exit(&ds->ds_rwlock);
 723         }
 724         dsl_dataset_drop_ref(ds, tag);
 725 }
 726 
     /*
      * Give up ownership of ds.
      *
      * The caller must be the owner (ds_owner == tag) of a dataset that has a
      * dbuf, or ds must be marked destroyed and have no dbuf.  Clears
      * ds_owner; if ds_rwlock is held as writer it is released and
      * ds_exclusive_cv is broadcast, waking threads waiting in
      * dsl_dataset_hold_ref().  Finally the dbuf reference under 'tag' is
      * dropped, or, when there is no dbuf, the in-core dataset is freed
      * directly through dsl_dataset_evict().
      */
 727 void
 728 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
 729 {
 730         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
 731             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
 732 
 733         mutex_enter(&ds->ds_lock);
 734         ds->ds_owner = NULL;
 735         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
 736                 rw_exit(&ds->ds_rwlock);
 737                 cv_broadcast(&ds->ds_exclusive_cv);
 738         }
 739         mutex_exit(&ds->ds_lock);
             /* Without a dbuf no eviction callback will run; free ds here. */
 740         if (ds->ds_dbuf)
 741                 dsl_dataset_drop_ref(ds, tag);
 742         else
 743                 dsl_dataset_evict(NULL, ds);
 744 }
 745 
     /*
      * Try to make 'tag' the owner of an already-held dataset.
      *
      * Succeeds only when the dataset has no owner and is either consistent
      * or inconsistentok is set.  On success ds_owner is set to tag and,
      * outside syncing context, ds_rwlock is released (owners do not keep the
      * READER lock taken by the hold).  Returns TRUE on success and FALSE
      * otherwise; on FALSE the caller's hold is left untouched.
      */
 746 boolean_t
 747 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
 748 {
 749         boolean_t gotit = FALSE;
 750 
 751         mutex_enter(&ds->ds_lock);
 752         if (ds->ds_owner == NULL &&
 753             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
 754                 ds->ds_owner = tag;
 755                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
 756                         rw_exit(&ds->ds_rwlock);
 757                 gotit = TRUE;
 758         }
 759         mutex_exit(&ds->ds_lock);
 760         return (gotit);
 761 }
 762 
     /*
      * Take ds_rwlock as WRITER on behalf of the dataset's owner, unless the
      * calling thread already holds it as writer.  'owner' must be the current
      * ds_owner.  The lock is released again by dsl_dataset_disown().
      */
 763 void
 764 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
 765 {
 766         ASSERT3P(owner, ==, ds->ds_owner);
 767         if (!RW_WRITE_HELD(&ds->ds_rwlock))
 768                 rw_enter(&ds->ds_rwlock, RW_WRITER);
 769 }
 770 
/*
 * Allocate and initialize a new DSL dataset object in the MOS and install
 * it as the head dataset of dd.  Returns the new dataset's object number.
 *
 * If origin is NULL, the pool's dp_origin_snap (which may itself be NULL)
 * is used instead.  When the resulting origin is non-NULL the new dataset
 * is set up as a clone of it; otherwise it just gets a fresh deadlist.
 *
 * Must be called in syncing context, and dd must not already have a head
 * dataset (both are asserted).
 */
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
        dsl_pool_t *dp = dd->dd_pool;
        dmu_buf_t *dbuf;
        dsl_dataset_phys_t *dsphys;
        uint64_t dsobj;
        objset_t *mos = dp->dp_meta_objset;

        if (origin == NULL)
                origin = dp->dp_origin_snap;

        ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
        ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
        ASSERT(dmu_tx_is_syncing(tx));
        ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

        /*
         * Allocate the dataset object; its bonus buffer holds the
         * dsl_dataset_phys_t, which is zeroed and then filled in below.
         */
        dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
            DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
        VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
        dmu_buf_will_dirty(dbuf, tx);
        dsphys = dbuf->db_data;
        bzero(dsphys, sizeof (dsl_dataset_phys_t));
        dsphys->ds_dir_obj = dd->dd_object;
        dsphys->ds_flags = flags;
        dsphys->ds_fsid_guid = unique_create();
        (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
            sizeof (dsphys->ds_guid));
        dsphys->ds_snapnames_zapobj =
            zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
            DMU_OT_NONE, 0, tx);
        dsphys->ds_creation_time = gethrestime_sec();
        /* A dataset created in the initial txg is recorded as txg 1. */
        dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;

        if (origin == NULL) {
                dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
        } else {
                dsl_dataset_t *ohds;

                /*
                 * Clone: start from the origin's block pointer and space
                 * accounting, and OR in the origin's flags.
                 */
                dsphys->ds_prev_snap_obj = origin->ds_object;
                dsphys->ds_prev_snap_txg =
                    origin->ds_phys->ds_creation_txg;
                dsphys->ds_referenced_bytes =
                    origin->ds_phys->ds_referenced_bytes;
                dsphys->ds_compressed_bytes =
                    origin->ds_phys->ds_compressed_bytes;
                dsphys->ds_uncompressed_bytes =
                    origin->ds_phys->ds_uncompressed_bytes;
                dsphys->ds_bp = origin->ds_phys->ds_bp;
                dsphys->ds_flags |= origin->ds_phys->ds_flags;

                /* The origin gains a child. */
                dmu_buf_will_dirty(origin->ds_dbuf, tx);
                origin->ds_phys->ds_num_children++;

                /*
                 * The new deadlist is cloned from the deadlist of the
                 * head dataset of the origin's dsl_dir.
                 */
                VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
                    origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
                dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
                    dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
                dsl_dataset_rele(ohds, FTAG);

                /*
                 * On pools new enough, record the clone in the origin's
                 * next_clones ZAP, creating that ZAP on first use.
                 */
                if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
                        if (origin->ds_phys->ds_next_clones_obj == 0) {
                                origin->ds_phys->ds_next_clones_obj =
                                    zap_create(mos,
                                    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
                        }
                        VERIFY(0 == zap_add_int(mos,
                            origin->ds_phys->ds_next_clones_obj,
                            dsobj, tx));
                }

                /*
                 * Point dd at its origin and, on pools new enough, add the
                 * clone to the dd_clones ZAP of the origin's dsl_dir
                 * (created on first use).
                 */
                dmu_buf_will_dirty(dd->dd_dbuf, tx);
                dd->dd_phys->dd_origin_obj = origin->ds_object;
                if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                        if (origin->ds_dir->dd_phys->dd_clones == 0) {
                                dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
                                origin->ds_dir->dd_phys->dd_clones =
                                    zap_create(mos,
                                    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
                        }
                        VERIFY3U(0, ==, zap_add_int(mos,
                            origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
                }
        }

        if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
                dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

        dmu_buf_rele(dbuf, FTAG);

        /* Finally, make the new dataset the head of dd. */
        dmu_buf_will_dirty(dd->dd_dbuf, tx);
        dd->dd_phys->dd_head_dataset_obj = dsobj;

        return (dsobj);
}
 867 
 868 uint64_t
 869 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
 870     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
 871 {
 872         dsl_pool_t *dp = pdd->dd_pool;
 873         uint64_t dsobj, ddobj;
 874         dsl_dir_t *dd;
 875 
 876         ASSERT(lastname[0] != '@');
 877 
 878         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
 879         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
 880 
 881         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
 882 
 883         dsl_deleg_set_create_perms(dd, tx, cr);
 884 
 885         dsl_dir_close(dd, FTAG);
 886 
 887         /*
 888          * If we are creating a clone, make sure we zero out any stale
 889          * data from the origin snapshots zil header.
 890          */
 891         if (origin != NULL) {
 892                 dsl_dataset_t *ds;
 893                 objset_t *os;
 894 
 895                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 896                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
 897                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
 898                 dsl_dataset_dirty(ds, tx);
 899                 dsl_dataset_rele(ds, FTAG);
 900         }
 901 
 902         return (dsobj);
 903 }
 904 
 905 /*
 906  * The snapshots must all be in the same pool.
 907  */
 908 int
 909 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer,
 910     nvlist_t *errlist)
 911 {
 912         int err;
 913         dsl_sync_task_t *dst;
 914         spa_t *spa;
 915         nvpair_t *pair;
 916         dsl_sync_task_group_t *dstg;
 917 
 918         pair = nvlist_next_nvpair(snaps, NULL);
 919         if (pair == NULL)
 920                 return (0);
 921 
 922         err = spa_open(nvpair_name(pair), &spa, FTAG);
 923         if (err)
 924                 return (err);
 925         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 926 
 927         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 928             pair = nvlist_next_nvpair(snaps, pair)) {
 929                 dsl_dataset_t *ds;
 930 
 931                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
 932                 if (err == 0) {
 933                         struct dsl_ds_destroyarg *dsda;
 934 
 935                         dsl_dataset_make_exclusive(ds, dstg);
 936                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
 937                             KM_SLEEP);
 938                         dsda->ds = ds;
 939                         dsda->defer = defer;
 940                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 941                             dsl_dataset_destroy_sync, dsda, dstg, 0);
 942                 } else if (err == ENOENT) {
 943                         err = 0;
 944                 } else {
 945                         fnvlist_add_int32(errlist, nvpair_name(pair), err);
 946                         break;
 947                 }
 948         }
 949 
 950         if (err == 0)
 951                 err = dsl_sync_task_group_wait(dstg);
 952 
 953         for (dst = list_head(&dstg->dstg_tasks); dst;
 954             dst = list_next(&dstg->dstg_tasks, dst)) {
 955                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
 956                 dsl_dataset_t *ds = dsda->ds;
 957 
 958                 /*
 959                  * Return the snapshots that triggered the error.
 960                  */
 961                 if (dst->dst_err != 0) {
 962                         char name[ZFS_MAXNAMELEN];
 963                         dsl_dataset_name(ds, name);
 964                         fnvlist_add_int32(errlist, name, dst->dst_err);
 965                 }
 966                 ASSERT3P(dsda->rm_origin, ==, NULL);
 967                 dsl_dataset_disown(ds, dstg);
 968                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 969         }
 970 
 971         dsl_sync_task_group_destroy(dstg);
 972         spa_close(spa, FTAG);
 973         return (err);
 974 
 975 }
 976 
 977 static boolean_t
 978 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
 979 {
 980         boolean_t might_destroy = B_FALSE;
 981 
 982         mutex_enter(&ds->ds_lock);
 983         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
 984             DS_IS_DEFER_DESTROY(ds))
 985                 might_destroy = B_TRUE;
 986         mutex_exit(&ds->ds_lock);
 987 
 988         return (might_destroy);
 989 }
 990 
 991 /*
 992  * If we're removing a clone, and these three conditions are true:
 993  *      1) the clone's origin has no other children
 994  *      2) the clone's origin has no user references
 995  *      3) the clone's origin has been marked for deferred destruction
 996  * Then, prepare to remove the origin as part of this sync task group.
 997  */
 998 static int
 999 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1000 {
1001         dsl_dataset_t *ds = dsda->ds;
1002         dsl_dataset_t *origin = ds->ds_prev;
1003 
1004         if (dsl_dataset_might_destroy_origin(origin)) {
1005                 char *name;
1006                 int namelen;
1007                 int error;
1008 
1009                 namelen = dsl_dataset_namelen(origin) + 1;
1010                 name = kmem_alloc(namelen, KM_SLEEP);
1011                 dsl_dataset_name(origin, name);
1012 #ifdef _KERNEL
1013                 error = zfs_unmount_snap(name, NULL);
1014                 if (error) {
1015                         kmem_free(name, namelen);
1016                         return (error);
1017                 }
1018 #endif
1019                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1020                 kmem_free(name, namelen);
1021                 if (error)
1022                         return (error);
1023                 dsda->rm_origin = origin;
1024                 dsl_dataset_make_exclusive(origin, tag);
1025         }
1026 
1027         return (0);
1028 }
1029 
/*
 * Destroy the dataset ds.
 *
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 *
 * Snapshots are destroyed with a single sync task; defer is only honoured
 * for snapshots and yields EINVAL for any other dataset.  For a head
 * dataset the dsl_dir is destroyed along with it, and if the dataset is a
 * clone its origin may be destroyed in the same sync task group.
 *
 * Returns 0 on success, else an errno.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
        int err;
        dsl_sync_task_group_t *dstg;
        objset_t *os;
        dsl_dir_t *dd;
        uint64_t obj;
        struct dsl_ds_destroyarg dsda = { 0 };

        dsda.ds = ds;

        if (dsl_dataset_is_snapshot(ds)) {
                /* Destroying a snapshot is simpler */
                dsl_dataset_make_exclusive(ds, tag);

                dsda.defer = defer;
                err = dsl_sync_task_do(ds->ds_dir->dd_pool,
                    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
                    &dsda, tag, 0);
                ASSERT3P(dsda.rm_origin, ==, NULL);
                goto out;
        } else if (defer) {
                /* Deferred destroy is only supported for snapshots. */
                err = EINVAL;
                goto out;
        }

        dd = ds->ds_dir;

        /*
         * Without the async_destroy feature, free as much as possible in
         * open context before the sync task runs.
         */
        if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
            &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
                /*
                 * Check for errors and mark this ds as inconsistent, in
                 * case we crash while freeing the objects.
                 */
                err = dsl_sync_task_do(dd->dd_pool,
                    dsl_dataset_destroy_begin_check,
                    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
                if (err)
                        goto out;

                err = dmu_objset_from_ds(ds, &os);
                if (err)
                        goto out;

                /*
                 * Remove all objects while in the open context so that
                 * there is less work to do in the syncing context.
                 */
                for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
                    ds->ds_phys->ds_prev_snap_txg)) {
                        /*
                         * Ignore errors, if there is not enough disk space
                         * we will deal with it in dsl_dataset_destroy_sync().
                         */
                        (void) dmu_free_object(os, obj);
                }
                /* ESRCH is the expected way for the walk above to end. */
                if (err != ESRCH)
                        goto out;

                /*
                 * Sync out all in-flight IO.
                 */
                txg_wait_synced(dd->dd_pool, 0);

                /*
                 * If we managed to free all the objects in open
                 * context, the user space accounting should be zero.
                 */
                if (ds->ds_phys->ds_bp.blk_fill == 0 &&
                    dmu_objset_userused_enabled(os)) {
                        uint64_t count;

                        ASSERT(zap_count(os, DMU_USERUSED_OBJECT,
                            &count) != 0 || count == 0);
                        ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT,
                            &count) != 0 || count == 0);
                }
        }

        /*
         * Take our own hold (tagged FTAG) on the dsl_dir; dd is replaced
         * by the pointer that dsl_dir_open_obj() hands back.
         * NOTE(review): the rw_exit() below dereferences dd after the call
         * has written to it; presumably dd is still valid when the open
         * fails -- confirm against dsl_dir_open_obj().
         */
        rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
        err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
        rw_exit(&dd->dd_pool->dp_config_rwlock);

        if (err)
                goto out;

        /*
         * Blow away the dsl_dir + head dataset.
         */
        dsl_dataset_make_exclusive(ds, tag);
        /*
         * If we're removing a clone, we might also need to remove its
         * origin.
         */
        do {
                /*
                 * need_prep is set by the check function when it finds
                 * that the origin must be removed but rm_origin has not
                 * been set up; in that case we go around again.
                 */
                dsda.need_prep = B_FALSE;
                if (dsl_dir_is_clone(dd)) {
                        err = dsl_dataset_origin_rm_prep(&dsda, tag);
                        if (err) {
                                dsl_dir_close(dd, FTAG);
                                goto out;
                        }
                }

                /* Destroy the dataset and its dsl_dir in one task group. */
                dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
                dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
                    dsl_dataset_destroy_sync, &dsda, tag, 0);
                dsl_sync_task_create(dstg, dsl_dir_destroy_check,
                    dsl_dir_destroy_sync, dd, tag, 0);
                err = dsl_sync_task_group_wait(dstg);
                dsl_sync_task_group_destroy(dstg);

                /*
                 * We could be racing against 'zfs release' or 'zfs destroy -d'
                 * on the origin snap, in which case we can get EBUSY if we
                 * needed to destroy the origin snap but were not ready to
                 * do so.
                 */
                if (dsda.need_prep) {
                        ASSERT(err == EBUSY);
                        ASSERT(dsl_dir_is_clone(dd));
                        ASSERT(dsda.rm_origin == NULL);
                }
        } while (dsda.need_prep);

        /* Drop the ownership of the origin taken during preparation. */
        if (dsda.rm_origin != NULL)
                dsl_dataset_disown(dsda.rm_origin, tag);

        /* if it is successful, dsl_dir_destroy_sync will close the dd */
        if (err)
                dsl_dir_close(dd, FTAG);
out:
        /* Every path, successful or not, gives up ownership of ds. */
        dsl_dataset_disown(ds, tag);
        return (err);
}
1170 
1171 blkptr_t *
1172 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1173 {
1174         return (&ds->ds_phys->ds_bp);
1175 }
1176 
1177 void
1178 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1179 {
1180         ASSERT(dmu_tx_is_syncing(tx));
1181         /* If it's the meta-objset, set dp_meta_rootbp */
1182         if (ds == NULL) {
1183                 tx->tx_pool->dp_meta_rootbp = *bp;
1184         } else {
1185                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1186                 ds->ds_phys->ds_bp = *bp;
1187         }
1188 }
1189 
1190 spa_t *
1191 dsl_dataset_get_spa(dsl_dataset_t *ds)
1192 {
1193         return (ds->ds_dir->dd_pool->dp_spa);
1194 }
1195 
1196 void
1197 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1198 {
1199         dsl_pool_t *dp;
1200 
1201         if (ds == NULL) /* this is the meta-objset */
1202                 return;
1203 
1204         ASSERT(ds->ds_objset != NULL);
1205 
1206         if (ds->ds_phys->ds_next_snap_obj != 0)
1207                 panic("dirtying snapshot!");
1208 
1209         dp = ds->ds_dir->dd_pool;
1210 
1211         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1212                 /* up the hold count until we can be written out */
1213                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1214         }
1215 }
1216 
1217 boolean_t
1218 dsl_dataset_is_dirty(dsl_dataset_t *ds)
1219 {
1220         for (int t = 0; t < TXG_SIZE; t++) {
1221                 if (txg_list_member(&ds->ds_dir->dd_pool->dp_dirty_datasets,
1222                     ds, t))
1223                         return (B_TRUE);
1224         }
1225         return (B_FALSE);
1226 }
1227 
1228 /*
1229  * The unique space in the head dataset can be calculated by subtracting
1230  * the space used in the most recent snapshot, that is still being used
1231  * in this file system, from the space currently in use.  To figure out
1232  * the space in the most recent snapshot still in use, we need to take
1233  * the total space used in the snapshot and subtract out the space that
1234  * has been freed up since the snapshot was taken.
1235  */
1236 static void
1237 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1238 {
1239         uint64_t mrs_used;
1240         uint64_t dlused, dlcomp, dluncomp;
1241 
1242         ASSERT(!dsl_dataset_is_snapshot(ds));
1243 
1244         if (ds->ds_phys->ds_prev_snap_obj != 0)
1245                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1246         else
1247                 mrs_used = 0;
1248 
1249         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1250 
1251         ASSERT3U(dlused, <=, mrs_used);
1252         ds->ds_phys->ds_unique_bytes =
1253             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1254 
1255         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1256             SPA_VERSION_UNIQUE_ACCURATE)
1257                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1258 }
1259 
/*
 * Context handed to kill_blkptr() through the opaque argument of
 * traverse_dataset().
 */
struct killarg {
        dsl_dataset_t *ds;      /* dataset whose blocks are being killed */
        dmu_tx_t *tx;           /* transaction passed to the free routines */
};
1264 
1265 /* ARGSUSED */
1266 static int
1267 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1268     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1269 {
1270         struct killarg *ka = arg;
1271         dmu_tx_t *tx = ka->tx;
1272 
1273         if (bp == NULL)
1274                 return (0);
1275 
1276         if (zb->zb_level == ZB_ZIL_LEVEL) {
1277                 ASSERT(zilog != NULL);
1278                 /*
1279                  * It's a block in the intent log.  It has no
1280                  * accounting, so just free it.
1281                  */
1282                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1283         } else {
1284                 ASSERT(zilog == NULL);
1285                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1286                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1287         }
1288 
1289         return (0);
1290 }
1291 
1292 /* ARGSUSED */
1293 static int
1294 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1295 {
1296         dsl_dataset_t *ds = arg1;
1297         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1298         uint64_t count;
1299         int err;
1300 
1301         /*
1302          * Can't delete a head dataset if there are snapshots of it.
1303          * (Except if the only snapshots are from the branch we cloned
1304          * from.)
1305          */
1306         if (ds->ds_prev != NULL &&
1307             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1308                 return (EBUSY);
1309 
1310         /*
1311          * This is really a dsl_dir thing, but check it here so that
1312          * we'll be less likely to leave this dataset inconsistent &
1313          * nearly destroyed.
1314          */
1315         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1316         if (err)
1317                 return (err);
1318         if (count != 0)
1319                 return (EEXIST);
1320 
1321         return (0);
1322 }
1323 
1324 /* ARGSUSED */
1325 static void
1326 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1327 {
1328         dsl_dataset_t *ds = arg1;
1329 
1330         /* Mark it as inconsistent on-disk, in case we crash */
1331         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1332         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1333 
1334         spa_history_log_internal_ds(ds, "destroy begin", tx, "");
1335 }
1336 
1337 static int
1338 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1339     dmu_tx_t *tx)
1340 {
1341         dsl_dataset_t *ds = dsda->ds;
1342         dsl_dataset_t *ds_prev = ds->ds_prev;
1343 
1344         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1345                 struct dsl_ds_destroyarg ndsda = {0};
1346 
1347                 /*
1348                  * If we're not prepared to remove the origin, don't remove
1349                  * the clone either.
1350                  */
1351                 if (dsda->rm_origin == NULL) {
1352                         dsda->need_prep = B_TRUE;
1353                         return (EBUSY);
1354                 }
1355 
1356                 ndsda.ds = ds_prev;
1357                 ndsda.is_origin_rm = B_TRUE;
1358                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1359         }
1360 
1361         /*
1362          * If we're not going to remove the origin after all,
1363          * undo the open context setup.
1364          */
1365         if (dsda->rm_origin != NULL) {
1366                 dsl_dataset_disown(dsda->rm_origin, tag);
1367                 dsda->rm_origin = NULL;
1368         }
1369 
1370         return (0);
1371 }
1372 
/*
 * Check function for destroying a dataset.  arg1 is the
 * struct dsl_ds_destroyarg describing the request; arg2 is the tag, which
 * is only used when the origin of a clone has to be checked as well.
 *
 * Returns 0 if the destroy may proceed, else an errno: ENOTSUP, EBUSY,
 * EAGAIN or EEXIST from the checks below, or whatever
 * dsl_dataset_origin_check() returns for a clone.
 *
 * If you add new checks here, you may need to add
 * additional checks to the "temporary" case in
 * snapshot_check() in dmu_objset.c.
 */
/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
        struct dsl_ds_destroyarg *dsda = arg1;
        dsl_dataset_t *ds = dsda->ds;

        /* we have an owner hold, so no one else can destroy us */
        ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

        /*
         * Only allow deferred destroy on pools that support it.
         * NOTE: deferred destroy is only supported on snapshots.
         * A deferred request skips all of the remaining checks.
         */
        if (dsda->defer) {
                if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
                    SPA_VERSION_USERREFS)
                        return (ENOTSUP);
                ASSERT(dsl_dataset_is_snapshot(ds));
                return (0);
        }

        /*
         * Can't delete a head dataset if there are snapshots of it.
         * (Except if the only snapshots are from the branch we cloned
         * from.)
         */
        if (ds->ds_prev != NULL &&
            ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
                return (EBUSY);

        /*
         * If we made changes this txg, traverse_dsl_dataset won't find
         * them.  Try again.
         */
        if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
                return (EAGAIN);

        if (dsl_dataset_is_snapshot(ds)) {
                /*
                 * If this snapshot has an elevated user reference count,
                 * we can't destroy it yet.
                 */
                if (ds->ds_userrefs > 0 && !dsda->releasing)
                        return (EBUSY);

                mutex_enter(&ds->ds_lock);
                /*
                 * Can't delete a branch point. However, if we're destroying
                 * a clone and removing its origin due to it having a user
                 * hold count of 0 and having been marked for deferred destroy,
                 * it's OK for the origin to have a single clone.
                 */
                if (ds->ds_phys->ds_num_children >
                    (dsda->is_origin_rm ? 2 : 1)) {
                        mutex_exit(&ds->ds_lock);
                        return (EEXIST);
                }
                mutex_exit(&ds->ds_lock);
        } else if (dsl_dir_is_clone(ds->ds_dir)) {
                /* A clone: its origin may have to be destroyed as well. */
                return (dsl_dataset_origin_check(dsda, arg2, tx));
        }

        /* XXX we should do some i/o error checking... */
        return (0);
}
1444 
/*
 * Rendezvous state shared between dsl_dataset_drain_refs(), which waits,
 * and dsl_dataset_refs_gone(), which signals.
 */
struct refsarg {
        kmutex_t lock;          /* protects gone; used with cv */
        boolean_t gone;         /* set to TRUE by dsl_dataset_refs_gone() */
        kcondvar_t cv;          /* signalled once gone has been set */
};
1450 
1451 /* ARGSUSED */
1452 static void
1453 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1454 {
1455         struct refsarg *arg = argv;
1456 
1457         mutex_enter(&arg->lock);
1458         arg->gone = TRUE;
1459         cv_signal(&arg->cv);
1460         mutex_exit(&arg->lock);
1461 }
1462 
1463 static void
1464 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1465 {
1466         struct refsarg arg;
1467 
1468         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1469         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1470         arg.gone = FALSE;
1471         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1472             dsl_dataset_refs_gone);
1473         dmu_buf_rele(ds->ds_dbuf, tag);
1474         mutex_enter(&arg.lock);
1475         while (!arg.gone)
1476                 cv_wait(&arg.cv, &arg.lock);
1477         ASSERT(arg.gone);
1478         mutex_exit(&arg.lock);
1479         ds->ds_dbuf = NULL;
1480         ds->ds_phys = NULL;
1481         mutex_destroy(&arg.lock);
1482         cv_destroy(&arg.cv);
1483 }
1484 
1485 static void
1486 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1487 {
1488         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1489         uint64_t count;
1490         int err;
1491 
1492         ASSERT(ds->ds_phys->ds_num_children >= 2);
1493         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1494         /*
1495          * The err should not be ENOENT, but a bug in a previous version
1496          * of the code could cause upgrade_clones_cb() to not set
1497          * ds_next_snap_obj when it should, leading to a missing entry.
1498          * If we knew that the pool was created after
1499          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1500          * ENOENT.  However, at least we can check that we don't have
1501          * too many entries in the next_clones_obj even after failing to
1502          * remove this one.
1503          */
1504         if (err != ENOENT) {
1505                 VERIFY0(err);
1506         }
1507         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1508             &count));
1509         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1510 }
1511 
1512 static void
1513 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1514 {
1515         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1516         zap_cursor_t zc;
1517         zap_attribute_t za;
1518 
1519         /*
1520          * If it is the old version, dd_clones doesn't exist so we can't
1521          * find the clones, but deadlist_remove_key() is a no-op so it
1522          * doesn't matter.
1523          */
1524         if (ds->ds_dir->dd_phys->dd_clones == 0)
1525                 return;
1526 
1527         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1528             zap_cursor_retrieve(&zc, &za) == 0;
1529             zap_cursor_advance(&zc)) {
1530                 dsl_dataset_t *clone;
1531 
1532                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1533                     za.za_first_integer, FTAG, &clone));
1534                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1535                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1536                             mintxg, tx);
1537                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1538                 }
1539                 dsl_dataset_rele(clone, FTAG);
1540         }
1541         zap_cursor_fini(&zc);
1542 }
1543 
/*
 * State passed to process_old_cb() by process_old_deadlist().
 */
struct process_old_arg {
        /* dataset whose deadlist receives the blocks that are kept */
        dsl_dataset_t *ds;
        /* may be NULL; its ds_unique_bytes may be increased */
        dsl_dataset_t *ds_prev;
        /* when set, ds_prev's ds_unique_bytes is left alone */
        boolean_t after_branch_point;
        /* parent zio handed to dsl_free_sync() for the blocks freed */
        zio_t *pio;
        /* running totals for the blocks that were freed */
        uint64_t used, comp, uncomp;
};
1551 
1552 static int
1553 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1554 {
1555         struct process_old_arg *poa = arg;
1556         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1557 
1558         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1559                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1560                 if (poa->ds_prev && !poa->after_branch_point &&
1561                     bp->blk_birth >
1562                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1563                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1564                             bp_get_dsize_sync(dp->dp_spa, bp);
1565                 }
1566         } else {
1567                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1568                 poa->comp += BP_GET_PSIZE(bp);
1569                 poa->uncomp += BP_GET_UCSIZE(bp);
1570                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1571         }
1572         return (0);
1573 }
1574 
1575 static void
1576 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1577     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1578 {
1579         struct process_old_arg poa = { 0 };
1580         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1581         objset_t *mos = dp->dp_meta_objset;
1582 
1583         ASSERT(ds->ds_deadlist.dl_oldfmt);
1584         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1585 
1586         poa.ds = ds;
1587         poa.ds_prev = ds_prev;
1588         poa.after_branch_point = after_branch_point;
1589         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1590         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1591             process_old_cb, &poa, tx));
1592         VERIFY0(zio_wait(poa.pio));
1593         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1594 
1595         /* change snapused */
1596         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1597             -poa.used, -poa.comp, -poa.uncomp, tx);
1598 
1599         /* swap next's deadlist to our deadlist */
1600         dsl_deadlist_close(&ds->ds_deadlist);
1601         dsl_deadlist_close(&ds_next->ds_deadlist);
1602         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1603             ds->ds_phys->ds_deadlist_obj);
1604         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1605         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1606             ds_next->ds_phys->ds_deadlist_obj);
1607 }
1608 
1609 static int
1610 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1611 {
1612         int err;
1613         struct killarg ka;
1614 
1615         /*
1616          * Free everything that we point to (that's born after
1617          * the previous snapshot, if we are a clone)
1618          *
1619          * NB: this should be very quick, because we already
1620          * freed all the objects in open context.
1621          */
1622         ka.ds = ds;
1623         ka.tx = tx;
1624         err = traverse_dataset(ds,
1625             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1626             kill_blkptr, &ka);
1627         ASSERT0(err);
1628         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1629 
1630         return (err);
1631 }
1632 
/*
 * Syncing-context half of dataset destruction.
 *
 * arg1 is a struct dsl_ds_destroyarg describing the dataset (dsda->ds),
 * whether this is a deferred destroy (dsda->defer) and, optionally, an
 * origin snapshot to destroy afterwards (dsda->rm_origin).  tag is handed
 * to dsl_dataset_drain_refs() and to the recursive call for rm_origin.
 *
 * For a deferred destroy of a dataset that still has user references or
 * more than one child, only DS_FLAG_DEFER_DESTROY is set and we return.
 * Otherwise the dataset is marked as owned by dsl_reaper, its
 * refreservation is dropped, it is unlinked from the previous/next
 * snapshots, its deadlist and space accounting are handed off, its name
 * (or the dir's head link) is removed, and its MOS objects are freed.
 */
1633 void
1634 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1635 {
1636         struct dsl_ds_destroyarg *dsda = arg1;
1637         dsl_dataset_t *ds = dsda->ds;
1638         int err;
1639         int after_branch_point = FALSE;
1640         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1641         objset_t *mos = dp->dp_meta_objset;
1642         dsl_dataset_t *ds_prev = NULL;
1643         boolean_t wont_destroy;
1644         uint64_t obj;
1645 
             /*
              * A deferred destroy only marks the dataset while it still has
              * user references or more than one child.
              */
1646         wont_destroy = (dsda->defer &&
1647             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1648 
1649         ASSERT(ds->ds_owner || wont_destroy);
1650         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1651         ASSERT(ds->ds_prev == NULL ||
1652             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1653         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1654 
1655         if (wont_destroy) {
1656                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1657                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1658                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1659                 spa_history_log_internal_ds(ds, "defer_destroy", tx, "");
1660                 return;
1661         }
1662 
1663         /* We need to log before removing it from the namespace. */
1664         spa_history_log_internal_ds(ds, "destroy", tx, "");
1665 
1666         /* signal any waiters that this dataset is going away */
1667         mutex_enter(&ds->ds_lock);
1668         ds->ds_owner = dsl_reaper;
1669         cv_broadcast(&ds->ds_exclusive_cv);
1670         mutex_exit(&ds->ds_lock);
1671 
1672         /* Remove our reservation */
1673         if (ds->ds_reserved != 0) {
1674                 dsl_prop_setarg_t psa;
1675                 uint64_t value = 0;
1676 
1677                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1678                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1679                     &value);
1680                 psa.psa_effective_value = 0;    /* predict default value */
1681 
1682                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1683                 ASSERT0(ds->ds_reserved);
1684         }
1685 
1686         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1687 
1688         dsl_scan_ds_destroyed(ds, tx);
1689 
             /* Remember our object number; the object is freed at the end. */
1690         obj = ds->ds_object;
1691 
             /*
              * Unlink ourselves from the previous snapshot.  Use the cached
              * ds_prev when there is one; otherwise take a temporary hold,
              * which is released further down.
              */
1692         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1693                 if (ds->ds_prev) {
1694                         ds_prev = ds->ds_prev;
1695                 } else {
1696                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1697                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1698                 }
                     /* prev's next snapshot is not us, so we branch off prev. */
1699                 after_branch_point =
1700                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1701 
1702                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1703                 if (after_branch_point &&
1704                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
1705                         remove_from_next_clones(ds_prev, obj, tx);
1706                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1707                                 VERIFY(0 == zap_add_int(mos,
1708                                     ds_prev->ds_phys->ds_next_clones_obj,
1709                                     ds->ds_phys->ds_next_snap_obj, tx));
1710                         }
1711                 }
1712                 if (after_branch_point &&
1713                     ds->ds_phys->ds_next_snap_obj == 0) {
1714                         /* This clone is toast. */
1715                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1716                         ds_prev->ds_phys->ds_num_children--;
1717 
1718                         /*
1719                          * If the clone's origin has no other clones, no
1720                          * user holds, and has been marked for deferred
1721                          * deletion, then we should have done the necessary
1722                          * destroy setup for it.
1723                          */
1724                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1725                             ds_prev->ds_userrefs == 0 &&
1726                             DS_IS_DEFER_DESTROY(ds_prev)) {
1727                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1728                         } else {
1729                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1730                         }
1731                 } else if (!after_branch_point) {
1732                         ds_prev->ds_phys->ds_next_snap_obj =
1733                             ds->ds_phys->ds_next_snap_obj;
1734                 }
1735         }
1736 
             /*
              * Snapshot case: splice ourselves out of the snapshot chain and
              * pass our deadlist and space accounting on to the neighbours.
              */
1737         if (dsl_dataset_is_snapshot(ds)) {
1738                 dsl_dataset_t *ds_next;
1739                 uint64_t old_unique;
1740                 uint64_t used = 0, comp = 0, uncomp = 0;
1741 
1742                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1743                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1744                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1745 
1746                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1747 
1748                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1749                 ds_next->ds_phys->ds_prev_snap_obj =
1750                     ds->ds_phys->ds_prev_snap_obj;
1751                 ds_next->ds_phys->ds_prev_snap_txg =
1752                     ds->ds_phys->ds_prev_snap_txg;
1753                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1754                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1755 
1756 
1757                 if (ds_next->ds_deadlist.dl_oldfmt) {
1758                         process_old_deadlist(ds, ds_prev, ds_next,
1759                             after_branch_point, tx);
1760                 } else {
1761                         /* Adjust prev's unique space. */
1762                         if (ds_prev && !after_branch_point) {
1763                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1764                                     ds_prev->ds_phys->ds_prev_snap_txg,
1765                                     ds->ds_phys->ds_prev_snap_txg,
1766                                     &used, &comp, &uncomp);
1767                                 ds_prev->ds_phys->ds_unique_bytes += used;
1768                         }
1769 
1770                         /* Adjust snapused. */
1771                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1772                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1773                             &used, &comp, &uncomp);
1774                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1775                             -used, -comp, -uncomp, tx);
1776 
1777                         /* Move blocks to be freed to pool's free list. */
1778                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1779                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1780                             tx);
1781                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1782                             DD_USED_HEAD, used, comp, uncomp, tx);
1783 
1784                         /* Merge our deadlist into next's and free it. */
1785                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1786                             ds->ds_phys->ds_deadlist_obj, tx);
1787                 }
1788                 dsl_deadlist_close(&ds->ds_deadlist);
1789                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1790 
1791                 /* Collapse range in clone heads */
1792                 dsl_dataset_remove_clones_key(ds,
1793                     ds->ds_phys->ds_creation_txg, tx);
1794 
1795                 if (dsl_dataset_is_snapshot(ds_next)) {
1796                         dsl_dataset_t *ds_nextnext;
1797 
1798                         /*
1799                          * Update next's unique to include blocks which
1800                          * were previously shared by only this snapshot
1801                          * and it.  Those blocks will be born after the
1802                          * prev snap and before this snap, and will have
1803                          * died after the next snap and before the one
1804                          * after that (ie. be on the snap after next's
1805                          * deadlist).
1806                          */
1807                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1808                             ds_next->ds_phys->ds_next_snap_obj,
1809                             FTAG, &ds_nextnext));
1810                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1811                             ds->ds_phys->ds_prev_snap_txg,
1812                             ds->ds_phys->ds_creation_txg,
1813                             &used, &comp, &uncomp);
1814                         ds_next->ds_phys->ds_unique_bytes += used;
1815                         dsl_dataset_rele(ds_nextnext, FTAG);
1816                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1817 
1818                         /* Collapse range in this head. */
1819                         dsl_dataset_t *hds;
1820                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1821                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1822                             FTAG, &hds));
1823                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1824                             ds->ds_phys->ds_creation_txg, tx);
1825                         dsl_dataset_rele(hds, FTAG);
1826 
1827                 } else {
                             /*
                              * next is not a snapshot: re-point its cached
                              * ds_prev from us to our previous snapshot, if any.
                              */
1828                         ASSERT3P(ds_next->ds_prev, ==, ds);
1829                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1830                         ds_next->ds_prev = NULL;
1831                         if (ds_prev) {
1832                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1833                                     ds->ds_phys->ds_prev_snap_obj,
1834                                     ds_next, &ds_next->ds_prev));
1835                         }
1836 
1837                         dsl_dataset_recalc_head_uniq(ds_next);
1838 
1839                         /*
1840                          * Reduce the amount of our unconsumed refreservation
1841                          * being charged to our parent by the amount of
1842                          * new unique data we have gained.
1843                          */
1844                         if (old_unique < ds_next->ds_reserved) {
1845                                 int64_t mrsdelta;
1846                                 uint64_t new_unique =
1847                                     ds_next->ds_phys->ds_unique_bytes;
1848 
1849                                 ASSERT(old_unique <= new_unique);
1850                                 mrsdelta = MIN(new_unique - old_unique,
1851                                     ds_next->ds_reserved - old_unique);
1852                                 dsl_dir_diduse_space(ds->ds_dir,
1853                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1854                         }
1855                 }
1856                 dsl_dataset_rele(ds_next, FTAG);
1857         } else {
1858                 zfeature_info_t *async_destroy =
1859                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1860                 objset_t *os;
1861 
1862                 /*
1863                  * There's no next snapshot, so this is a head dataset.
1864                  * Destroy the deadlist.  Unless it's a clone, the
1865                  * deadlist should be empty.  (If it's a clone, it's
1866                  * safe to ignore the deadlist contents.)
1867                  */
1868                 dsl_deadlist_close(&ds->ds_deadlist);
1869                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1870                 ds->ds_phys->ds_deadlist_obj = 0;
1871 
1872                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
1873 
                     /*
                      * Without the async_destroy feature the blocks are freed
                      * right here by traversing the dataset.
                      */
1874                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1875                         err = old_synchronous_dataset_destroy(ds, tx);
1876                 } else {
1877                         /*
1878                          * Move the bptree into the pool's list of trees to
1879                          * clean up and update space accounting information.
1880                          */
1881                         uint64_t used, comp, uncomp;
1882 
1883                         zil_destroy_sync(dmu_objset_zil(os), tx);
1884 
                             /*
                              * First use of the feature: activate it and create
                              * the pool-wide bptree object.
                              */
1885                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1886                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1887                                 dp->dp_bptree_obj = bptree_alloc(mos, tx);
1888                                 VERIFY(zap_add(mos,
1889                                     DMU_POOL_DIRECTORY_OBJECT,
1890                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1891                                     &dp->dp_bptree_obj, tx) == 0);
1892                         }
1893 
1894                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1895                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1896                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1897 
1898                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1899                             ds->ds_phys->ds_unique_bytes == used);
1900 
1901                         bptree_add(mos, dp->dp_bptree_obj,
1902                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1903                             used, comp, uncomp, tx);
1904                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1905                             -used, -comp, -uncomp, tx);
1906                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1907                             used, comp, uncomp, tx);
1908                 }
1909 
                     /*
                      * Drop the cached ds_prev; ds_prev is cleared too so the
                      * release of a temporary hold below is skipped.
                      */
1910                 if (ds->ds_prev != NULL) {
1911                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1912                                 VERIFY3U(0, ==, zap_remove_int(mos,
1913                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1914                                     ds->ds_object, tx));
1915                         }
1916                         dsl_dataset_rele(ds->ds_prev, ds);
1917                         ds->ds_prev = ds_prev = NULL;
1918                 }
1919         }
1920 
1921         /*
1922          * This must be done after the dsl_traverse(), because it will
1923          * re-open the objset.
1924          */
1925         if (ds->ds_objset) {
1926                 dmu_objset_evict(ds->ds_objset);
1927                 ds->ds_objset = NULL;
1928         }
1929 
1930         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1931                 /* Erase the link in the dir */
1932                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1933                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1934                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1935                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1936                 ASSERT(err == 0);
1937         } else {
1938                 /* remove from snapshot namespace */
1939                 dsl_dataset_t *ds_head;
1940                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1941                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1942                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1943                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1944 #ifdef ZFS_DEBUG
1945                 {
1946                         uint64_t val;
1947 
1948                         err = dsl_dataset_snap_lookup(ds_head,
1949                             ds->ds_snapname, &val);
1950                         ASSERT0(err);
1951                         ASSERT3U(val, ==, obj);
1952                 }
1953 #endif
1954                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1955                 ASSERT(err == 0);
1956                 dsl_dataset_rele(ds_head, FTAG);
1957         }
1958 
             /*
              * Release the temporary hold on the previous snapshot, if one
              * was taken above (i.e. it is not the cached ds->ds_prev).
              */
1959         if (ds_prev && ds->ds_prev != ds_prev)
1960                 dsl_dataset_rele(ds_prev, FTAG);
1961 
1962         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1963 
             /* Free the remaining MOS objects that hang off this dataset. */
1964         if (ds->ds_phys->ds_next_clones_obj != 0) {
1965                 uint64_t count;
1966                 ASSERT(0 == zap_count(mos,
1967                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1968                 VERIFY(0 == dmu_object_free(mos,
1969                     ds->ds_phys->ds_next_clones_obj, tx));
1970         }
1971         if (ds->ds_phys->ds_props_obj != 0)
1972                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1973         if (ds->ds_phys->ds_userrefs_obj != 0)
1974                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1975         dsl_dir_close(ds->ds_dir, ds);
1976         ds->ds_dir = NULL;
1977         dsl_dataset_drain_refs(ds, tag);
1978         VERIFY(0 == dmu_object_free(mos, obj, tx));
1979 
1980         if (dsda->rm_origin) {
1981                 /*
1982                  * Remove the origin of the clone we just destroyed.
1983                  */
1984                 struct dsl_ds_destroyarg ndsda = {0};
1985 
1986                 ndsda.ds = dsda->rm_origin;
1987                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
1988         }
1989 }
1990 
1991 static int
1992 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1993 {
1994         uint64_t asize;
1995 
1996         if (!dmu_tx_is_syncing(tx))
1997                 return (0);
1998 
1999         /*
2000          * If there's an fs-only reservation, any blocks that might become
2001          * owned by the snapshot dataset must be accommodated by space
2002          * outside of the reservation.
2003          */
2004         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2005         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2006         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2007                 return (ENOSPC);
2008 
2009         /*
2010          * Propagate any reserved space for this snapshot to other
2011          * snapshot checks in this sync group.
2012          */
2013         if (asize > 0)
2014                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2015 
2016         return (0);
2017 }
2018 
2019 /*
2020  * Check if adding additional snapshot(s) would exceed any snapshot quotas.
2021  * Note that all snapshot quotas up to the root dataset (i.e. the pool itself)
2022  * or the given ancestor must be satisfied. Note that it is valid for the
2023  * count to exceed the quota. This can happen if a recursive snapshot is taken
2024  * from a dataset above this one.
2025  */
2026 int
2027 dsl_snapcount_check(dsl_dir_t *dd, dmu_tx_t *tx, uint64_t cnt,
2028     dsl_dir_t *ancestor)
2029 {
2030         uint64_t quota;
2031         int err = 0;
2032 
2033         /*
2034          * As with dsl_dataset_set_reservation_check(), don't run this check in
2035          * open context.
2036          */
2037         if (!dmu_tx_is_syncing(tx))
2038                 return (0);
2039 
2040         /*
2041          * If renaming a dataset with no snapshots, count adjustment is 0.
2042          * Likewise when taking a recursive snapshot below the top-level (see
2043          * the comment in snapshot_check() for more details).
2044          */
2045         if (cnt == 0)
2046                 return (0);
2047 
2048         /*
2049          * If an ancestor has been provided, stop checking the quota once we
2050          * hit that dir. We need this during rename so that we don't overcount
2051          * the check once we recurse up to the common ancestor.
2052          */
2053         if (ancestor == dd)
2054                 return (0);
2055 
2056         /*
2057          * If there's no value for this property, there's no need to enforce a
2058          * snapshot quota.
2059          */
2060         err = dsl_prop_get_dd(dd, zfs_prop_to_name(ZFS_PROP_SNAPSHOT_QUOTA),
2061             8, 1, &quota, NULL, B_FALSE);
2062         if (err == ENOENT)
2063                 return (0);
2064         else if (err != 0)
2065                 return (err);
2066 
2067 #ifdef _KERNEL
2068         extern void __dtrace_probe_zfs__ss__quota(uint64_t, uint64_t, char *);
2069         __dtrace_probe_zfs__ss__quota(
2070             (uint64_t)dd->dd_phys->dd_snapshot_count, (uint64_t)quota,
2071             dd->dd_myname);
2072 #endif
2073 
2074         if (quota > 0 && (dd->dd_phys->dd_snapshot_count + cnt) > quota)
2075                 return (EDQUOT);
2076 
2077         if (dd->dd_parent != NULL)
2078                 err = dsl_snapcount_check(dd->dd_parent, tx, cnt, ancestor);
2079 
2080         return (err);
2081 }
2082 
2083 /*
2084  * Adjust the snapshot count for the specified dsl_dir_t and all parents.
2085  * When a new snapshot is created, increment the count on all parents, and when
2086  * a snapshot is destroyed, decrement the count.
2087  */
2088 void
2089 dsl_snapcount_adjust(dsl_dir_t *dd, dmu_tx_t *tx, int64_t delta,
2090     boolean_t first)
2091 {
2092         /*
2093          * On initial entry we need to check if this feature is active, but
2094          * we don't want to re-check this on each recursive call. Note: the
2095          * feature cannot be active if its not enabled. If the feature is not
2096          * active, don't touch the on-disk count fields.
2097          */
2098         if (first) {
2099                 dsl_dataset_t *ds = NULL;
2100                 spa_t *spa;
2101                 zfeature_info_t *quota_feat =
2102                     &spa_feature_table[SPA_FEATURE_DS_SS_QUOTA];
2103 
2104                 VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2105                     dd->dd_phys->dd_head_dataset_obj, FTAG, &ds));
2106                 spa = dsl_dataset_get_spa(ds);
2107                 dsl_dataset_rele(ds, FTAG);
2108                 if (!spa_feature_is_active(spa, quota_feat))
2109                         return;
2110         }
2111 
2112         /*
2113          * As with dsl_dataset_set_reservation_check(), wdon't want to run
2114          * this check in open context.
2115          */
2116         if (!dmu_tx_is_syncing(tx))
2117                 return;
2118 
2119         /* if renaming a dataset with no snapshots, count adjustment is 0 */
2120         if (delta == 0)
2121                 return;
2122 
2123         /* Increment count for parent */
2124         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2125 
2126         mutex_enter(&dd->dd_lock);
2127 
2128         /*
2129          * Counts may be incorrect if dealing with an existing pool and
2130          * there has never been a quota set in the dataset hierarchy.
2131          * This is not an error.
2132          */
2133         if (delta < 0 && dd->dd_phys->dd_snapshot_count < (delta * -1)) {
2134 #ifdef _KERNEL
2135                 extern void __dtrace_probe_zfs__sscnt__adj__neg(char *);
2136                 __dtrace_probe_zfs__sscnt__adj__neg(dd->dd_myname);
2137 #endif
2138                 mutex_exit(&dd->dd_lock);
2139                 return;
2140         }
2141 
2142         dd->dd_phys->dd_snapshot_count += delta;
2143 
2144         /* Roll up this additional count into our ancestors */
2145 
2146         if (dd->dd_parent != NULL)
2147                 dsl_snapcount_adjust(dd->dd_parent, tx, delta, B_FALSE);
2148 
2149         mutex_exit(&dd->dd_lock);
2150 }
2151 
2152 int
2153 dsl_dataset_snapshot_check(dsl_dataset_t *ds, const char *snapname,
2154     uint64_t cnt, dmu_tx_t *tx)
2155 {
2156         int err;
2157         uint64_t value;
2158 
2159         /*
2160          * We don't allow multiple snapshots of the same txg.  If there
2161          * is already one, try again.
2162          */
2163         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2164                 return (EAGAIN);
2165 
2166         /*
2167          * Check for conflicting snapshot name.
2168          */
2169         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2170         if (err == 0)
2171                 return (EEXIST);
2172         if (err != ENOENT)
2173                 return (err);
2174 
2175         /*
2176          * Check that the dataset's name is not too long.  Name consists
2177          * of the dataset's length + 1 for the @-sign + snapshot name's length
2178          */
2179         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2180                 return (ENAMETOOLONG);
2181 
2182         err = dsl_snapcount_check(ds->ds_dir, tx, cnt, NULL);
2183         if (err)
2184                 return (err);
2185 
2186         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2187         if (err)
2188                 return (err);
2189 
2190         ds->ds_trysnap_txg = tx->tx_txg;
2191         return (0);
2192 }
2193 
2194 void
2195 dsl_dataset_snapshot_sync(dsl_dataset_t *ds, const char *snapname,
2196     dmu_tx_t *tx)
2197 {
2198         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2199         dmu_buf_t *dbuf;
2200         dsl_dataset_phys_t *dsphys;
2201         uint64_t dsobj, crtxg;
2202         objset_t *mos = dp->dp_meta_objset;
2203         int err;
2204 
2205         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2206 
2207         dsl_snapcount_adjust(ds->ds_dir, tx, 1, B_TRUE);
2208 
2209         /*
2210          * The origin's ds_creation_txg has to be < TXG_INITIAL
2211          */
2212         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2213                 crtxg = 1;
2214         else
2215                 crtxg = tx->tx_txg;
2216 
2217         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2218             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2219         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2220         dmu_buf_will_dirty(dbuf, tx);
2221         dsphys = dbuf->db_data;
2222         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2223         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2224         dsphys->ds_fsid_guid = unique_create();
2225         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2226             sizeof (dsphys->ds_guid));
2227         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2228         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2229         dsphys->ds_next_snap_obj = ds->ds_object;
2230         dsphys->ds_num_children = 1;
2231         dsphys->ds_creation_time = gethrestime_sec();
2232         dsphys->ds_creation_txg = crtxg;
2233         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2234         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2235         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2236         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2237         dsphys->ds_flags = ds->ds_phys->ds_flags;
2238         dsphys->ds_bp = ds->ds_phys->ds_bp;
2239         dmu_buf_rele(dbuf, FTAG);
2240 
2241         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2242         if (ds->ds_prev) {
2243                 uint64_t next_clones_obj =
2244                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2245                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2246                     ds->ds_object ||
2247                     ds->ds_prev->ds_phys->ds_num_children > 1);
2248                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2249                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2250                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2251                             ds->ds_prev->ds_phys->ds_creation_txg);
2252                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2253                 } else if (next_clones_obj != 0) {
2254                         remove_from_next_clones(ds->ds_prev,
2255                             dsphys->ds_next_snap_obj, tx);
2256                         VERIFY3U(0, ==, zap_add_int(mos,
2257                             next_clones_obj, dsobj, tx));
2258                 }
2259         }
2260 
2261         /*
2262          * If we have a reference-reservation on this dataset, we will
2263          * need to increase the amount of refreservation being charged
2264          * since our unique space is going to zero.
2265          */
2266         if (ds->ds_reserved) {
2267                 int64_t delta;
2268                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2269                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2270                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2271                     delta, 0, 0, tx);
2272         }
2273 
2274         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2275         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2276             ds->ds_dir->dd_myname, snapname, dsobj,
2277             ds->ds_phys->ds_prev_snap_txg);
2278         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2279             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2280         dsl_deadlist_close(&ds->ds_deadlist);
2281         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2282         dsl_deadlist_add_key(&ds->ds_deadlist,
2283             ds->ds_phys->ds_prev_snap_txg, tx);
2284 
2285         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2286         ds->ds_phys->ds_prev_snap_obj = dsobj;
2287         ds->ds_phys->ds_prev_snap_txg = crtxg;
2288         ds->ds_phys->ds_unique_bytes = 0;
2289         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2290                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2291 
2292         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2293             snapname, 8, 1, &dsobj, tx);
2294         ASSERT(err == 0);
2295 
2296         if (ds->ds_prev)
2297                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2298         VERIFY(0 == dsl_dataset_get_ref(dp,
2299             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2300 
2301         dsl_scan_ds_snapshotted(ds, tx);
2302 
2303         dsl_dir_snap_cmtime_update(ds->ds_dir);
2304 
2305         spa_history_log_internal_ds(ds->ds_prev, "snapshot", tx, "");
2306 }
2307 
2308 void
2309 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2310 {
2311         ASSERT(dmu_tx_is_syncing(tx));
2312         ASSERT(ds->ds_objset != NULL);
2313         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2314 
2315         /*
2316          * in case we had to change ds_fsid_guid when we opened it,
2317          * sync it out now.
2318          */
2319         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2320         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2321 
2322         dmu_objset_sync(ds->ds_objset, zio, tx);
2323 }
2324 
2325 static void
2326 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2327 {
2328         uint64_t count = 0;
2329         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2330         zap_cursor_t zc;
2331         zap_attribute_t za;
2332         nvlist_t *propval;
2333         nvlist_t *val;
2334 
2335         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2336         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2337         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2338 
2339         /*
2340          * There may me missing entries in ds_next_clones_obj
2341          * due to a bug in a previous version of the code.
2342          * Only trust it if it has the right number of entries.
2343          */
2344         if (ds->ds_phys->ds_next_clones_obj != 0) {
2345                 ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2346                     &count));
2347         }
2348         if (count != ds->ds_phys->ds_num_children - 1) {
2349                 goto fail;
2350         }
2351         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2352             zap_cursor_retrieve(&zc, &za) == 0;
2353             zap_cursor_advance(&zc)) {
2354                 dsl_dataset_t *clone;
2355                 char buf[ZFS_MAXNAMELEN];
2356                 /*
2357                  * Even though we hold the dp_config_rwlock, the dataset
2358                  * may fail to open, returning ENOENT.  If there is a
2359                  * thread concurrently attempting to destroy this
2360                  * dataset, it will have the ds_rwlock held for
2361                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2362                  * dsl_dataset_hold_ref() will fail its
2363                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2364                  * dp_config_rwlock, and wait for the destroy progress
2365                  * and signal ds_exclusive_cv.  If the destroy was
2366                  * successful, we will see that
2367                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2368                  */
2369                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2370                     za.za_first_integer, FTAG, &clone) != 0)
2371                         continue;
2372                 dsl_dir_name(clone->ds_dir, buf);
2373                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2374                 dsl_dataset_rele(clone, FTAG);
2375         }
2376         zap_cursor_fini(&zc);
2377         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2378         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2379             propval) == 0);
2380 fail:
2381         nvlist_free(val);
2382         nvlist_free(propval);
2383         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2384 }
2385 
2386 void
2387 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2388 {
2389         uint64_t refd, avail, uobjs, aobjs, ratio;
2390 
2391         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2392             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2393             ds->ds_phys->ds_compressed_bytes);
2394 
2395         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2396 
2397         if (dsl_dataset_is_snapshot(ds)) {
2398                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2399                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2400                     ds->ds_phys->ds_unique_bytes);
2401                 get_clones_stat(ds, nv);
2402         } else {
2403                 dsl_dir_stats(ds->ds_dir, nv);
2404         }
2405 
2406         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2407         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2408         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2409 
2410         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2411             ds->ds_phys->ds_creation_time);
2412         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2413             ds->ds_phys->ds_creation_txg);
2414         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2415             ds->ds_quota);
2416         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2417             ds->ds_reserved);
2418         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2419             ds->ds_phys->ds_guid);
2420         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2421             ds->ds_phys->ds_unique_bytes);
2422         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2423             ds->ds_object);
2424         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2425             ds->ds_userrefs);
2426         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2427             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2428 
2429         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2430                 uint64_t written, comp, uncomp;
2431                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2432                 dsl_dataset_t *prev;
2433 
2434                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2435                 int err = dsl_dataset_hold_obj(dp,
2436                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2437                 rw_exit(&dp->dp_config_rwlock);
2438                 if (err == 0) {
2439                         err = dsl_dataset_space_written(prev, ds, &written,
2440                             &comp, &uncomp);
2441                         dsl_dataset_rele(prev, FTAG);
2442                         if (err == 0) {
2443                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2444                                     written);
2445                         }
2446                 }
2447         }
2448 }
2449 
2450 void
2451 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2452 {
2453         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2454         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2455         stat->dds_guid = ds->ds_phys->ds_guid;
2456         stat->dds_origin[0] = '\0';
2457         if (dsl_dataset_is_snapshot(ds)) {
2458                 stat->dds_is_snapshot = B_TRUE;
2459                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2460         } else {
2461                 stat->dds_is_snapshot = B_FALSE;
2462                 stat->dds_num_clones = 0;
2463 
2464                 rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2465                 if (dsl_dir_is_clone(ds->ds_dir)) {
2466                         dsl_dataset_t *ods;
2467 
2468                         VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2469                             ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2470                         dsl_dataset_name(ods, stat->dds_origin);
2471                         dsl_dataset_drop_ref(ods, FTAG);
2472                 }
2473                 rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2474         }
2475 }
2476 
2477 uint64_t
2478 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2479 {
2480         return (ds->ds_fsid_guid);
2481 }
2482 
2483 void
2484 dsl_dataset_space(dsl_dataset_t *ds,
2485     uint64_t *refdbytesp, uint64_t *availbytesp,
2486     uint64_t *usedobjsp, uint64_t *availobjsp)
2487 {
2488         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2489         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2490         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2491                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2492         if (ds->ds_quota != 0) {
2493                 /*
2494                  * Adjust available bytes according to refquota
2495                  */
2496                 if (*refdbytesp < ds->ds_quota)
2497                         *availbytesp = MIN(*availbytesp,
2498                             ds->ds_quota - *refdbytesp);
2499                 else
2500                         *availbytesp = 0;
2501         }
2502         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2503         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2504 }
2505 
2506 boolean_t
2507 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2508 {
2509         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2510 
2511         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2512             dsl_pool_sync_context(dp));
2513         if (ds->ds_prev == NULL)
2514                 return (B_FALSE);
2515         if (ds->ds_phys->ds_bp.blk_birth >
2516             ds->ds_prev->ds_phys->ds_creation_txg) {
2517                 objset_t *os, *os_prev;
2518                 /*
2519                  * It may be that only the ZIL differs, because it was
2520                  * reset in the head.  Don't count that as being
2521                  * modified.
2522                  */
2523                 if (dmu_objset_from_ds(ds, &os) != 0)
2524                         return (B_TRUE);
2525                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2526                         return (B_TRUE);
2527                 return (bcmp(&os->os_phys->os_meta_dnode,
2528                     &os_prev->os_phys->os_meta_dnode,
2529                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2530         }
2531         return (B_FALSE);
2532 }
2533 
2534 /* ARGSUSED */
2535 static int
2536 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2537 {
2538         dsl_dataset_t *ds = arg1;
2539         char *newsnapname = arg2;
2540         dsl_dir_t *dd = ds->ds_dir;
2541         dsl_dataset_t *hds;
2542         uint64_t val;
2543         int err;
2544 
2545         err = dsl_dataset_hold_obj(dd->dd_pool,
2546             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2547         if (err)
2548                 return (err);
2549 
2550         /* new name better not be in use */
2551         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2552         dsl_dataset_rele(hds, FTAG);
2553 
2554         if (err == 0)
2555                 err = EEXIST;
2556         else if (err == ENOENT)
2557                 err = 0;
2558 
2559         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2560         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2561                 err = ENAMETOOLONG;
2562 
2563         return (err);
2564 }
2565 
2566 static void
2567 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2568 {
2569         dsl_dataset_t *ds = arg1;
2570         const char *newsnapname = arg2;
2571         dsl_dir_t *dd = ds->ds_dir;
2572         objset_t *mos = dd->dd_pool->dp_meta_objset;
2573         dsl_dataset_t *hds;
2574         int err;
2575 
2576         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2577 
2578         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2579             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2580 
2581         VERIFY(0 == dsl_dataset_get_snapname(ds));
2582         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2583         ASSERT0(err);
2584         mutex_enter(&ds->ds_lock);
2585         (void) strcpy(ds->ds_snapname, newsnapname);
2586         mutex_exit(&ds->ds_lock);
2587         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2588             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2589         ASSERT0(err);
2590 
2591         spa_history_log_internal_ds(ds, "rename", tx,
2592             "-> @%s", newsnapname);
2593         dsl_dataset_rele(hds, FTAG);
2594 }
2595 
/*
 * State shared between dsl_recursive_rename() and its per-dataset
 * callback dsl_snapshot_rename_one().
 */
struct renamesnaparg {
        dsl_sync_task_group_t *dstg;    /* group collecting one task per snap */
        char failed[MAXPATHLEN];        /* name of the snapshot last tried */
        char *oldsnap;                  /* old snapshot name (after the '@') */
        char *newsnap;                  /* new snapshot name (after the '@') */
};
2602 
2603 static int
2604 dsl_snapshot_rename_one(const char *name, void *arg)
2605 {
2606         struct renamesnaparg *ra = arg;
2607         dsl_dataset_t *ds = NULL;
2608         char *snapname;
2609         int err;
2610 
2611         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2612         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2613 
2614         /*
2615          * For recursive snapshot renames the parent won't be changing
2616          * so we just pass name for both the to/from argument.
2617          */
2618         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2619         if (err != 0) {
2620                 strfree(snapname);
2621                 return (err == ENOENT ? 0 : err);
2622         }
2623 
2624 #ifdef _KERNEL
2625         /*
2626          * For all filesystems undergoing rename, we'll need to unmount it.
2627          */
2628         (void) zfs_unmount_snap(snapname, NULL);
2629 #endif
2630         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2631         strfree(snapname);
2632         if (err != 0)
2633                 return (err == ENOENT ? 0 : err);
2634 
2635         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2636             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2637 
2638         return (0);
2639 }
2640 
2641 static int
2642 dsl_recursive_rename(char *oldname, const char *newname)
2643 {
2644         int err;
2645         struct renamesnaparg *ra;
2646         dsl_sync_task_t *dst;
2647         spa_t *spa;
2648         char *cp, *fsname = spa_strdup(oldname);
2649         int len = strlen(oldname) + 1;
2650 
2651         /* truncate the snapshot name to get the fsname */
2652         cp = strchr(fsname, '@');
2653         *cp = '\0';
2654 
2655         err = spa_open(fsname, &spa, FTAG);
2656         if (err) {
2657                 kmem_free(fsname, len);
2658                 return (err);
2659         }
2660         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2661         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2662 
2663         ra->oldsnap = strchr(oldname, '@') + 1;
2664         ra->newsnap = strchr(newname, '@') + 1;
2665         *ra->failed = '\0';
2666 
2667         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2668             DS_FIND_CHILDREN);
2669         kmem_free(fsname, len);
2670 
2671         if (err == 0) {
2672                 err = dsl_sync_task_group_wait(ra->dstg);
2673         }
2674 
2675         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2676             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2677                 dsl_dataset_t *ds = dst->dst_arg1;
2678                 if (dst->dst_err) {
2679                         dsl_dir_name(ds->ds_dir, ra->failed);
2680                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2681                         (void) strlcat(ra->failed, ra->newsnap,
2682                             sizeof (ra->failed));
2683                 }
2684                 dsl_dataset_rele(ds, ra->dstg);
2685         }
2686 
2687         if (err)
2688                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2689 
2690         dsl_sync_task_group_destroy(ra->dstg);
2691         kmem_free(ra, sizeof (struct renamesnaparg));
2692         spa_close(spa, FTAG);
2693         return (err);
2694 }
2695 
2696 static int
2697 dsl_valid_rename(const char *oldname, void *arg)
2698 {
2699         int delta = *(int *)arg;
2700 
2701         if (strlen(oldname) + delta >= MAXNAMELEN)
2702                 return (ENAMETOOLONG);
2703 
2704         return (0);
2705 }
2706 
2707 #pragma weak dmu_objset_rename = dsl_dataset_rename
2708 int
2709 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2710 {
2711         dsl_dir_t *dd;
2712         dsl_dataset_t *ds;
2713         const char *tail;
2714         int err;
2715 
2716         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2717         if (err)
2718                 return (err);
2719 
2720         if (tail == NULL) {
2721                 int delta = strlen(newname) - strlen(oldname);
2722 
2723                 /* if we're growing, validate child name lengths */
2724                 if (delta > 0)
2725                         err = dmu_objset_find(oldname, dsl_valid_rename,
2726                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2727 
2728                 if (err == 0)
2729                         err = dsl_dir_rename(dd, newname);
2730                 dsl_dir_close(dd, FTAG);
2731                 return (err);
2732         }
2733 
2734         if (tail[0] != '@') {
2735                 /* the name ended in a nonexistent component */
2736                 dsl_dir_close(dd, FTAG);
2737                 return (ENOENT);
2738         }
2739 
2740         dsl_dir_close(dd, FTAG);
2741 
2742         /* new name must be snapshot in same filesystem */
2743         tail = strchr(newname, '@');
2744         if (tail == NULL)
2745                 return (EINVAL);
2746         tail++;
2747         if (strncmp(oldname, newname, tail - newname) != 0)
2748                 return (EXDEV);
2749 
2750         if (recursive) {
2751                 err = dsl_recursive_rename(oldname, newname);
2752         } else {
2753                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2754                 if (err)
2755                         return (err);
2756 
2757                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2758                     dsl_dataset_snapshot_rename_check,
2759                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2760 
2761                 dsl_dataset_rele(ds, FTAG);
2762         }
2763 
2764         return (err);
2765 }
2766 
/*
 * One element of a snapshot list built by snaplist_make().
 */
struct promotenode {
        list_node_t link;       /* linkage on the containing list_t */
        dsl_dataset_t *ds;      /* the dataset this entry refers to */
};
2771 
/*
 * Argument passed to dsl_dataset_promote_check() and
 * dsl_dataset_promote_sync().
 *
 * shared_snaps:  the snapshots that are moved to the clone's dir; its
 *                head is the origin snapshot.
 * origin_snaps:  list whose head is the origin's head dataset.
 * clone_snaps:   list whose tail is the dataset directly following the
 *                origin snapshot on the clone's side.
 * origin_origin: set when promoting a clone of a clone; its space is
 *                subtracted from the amount transferred.
 * used, comp, uncomp: space transferred from the origin's dir to the
 *                clone's dir (computed by the check function).
 * unique:        the origin snapshot's new unique bytes.
 * cloneusedsnap, originusedsnap: snapshot-used space of each dir after
 *                the promotion.
 * err_ds:        name of the snapshot that made the check fail, if any.
 */
struct promotearg {
        list_t shared_snaps, origin_snaps, clone_snaps;
        dsl_dataset_t *origin_origin;
        uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
        char *err_ds;
};
2778 
/*
 * Forward declarations; snaplist_space() is called by
 * dsl_dataset_promote_check() ahead of its definition.
 */
static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
static boolean_t snaplist_unstable(list_t *l);
2781 
/*
 * Sync-task check function for promoting a clone.
 *
 * arg1 is the clone's head dataset and arg2 a struct promotearg whose
 * snapshot lists have already been built.  In open context only the
 * "is it a clone" test is made; the full check runs in syncing context
 * and fills in pa->unique, pa->used, pa->comp, pa->uncomp,
 * pa->cloneusedsnap and pa->originusedsnap for the sync function.
 *
 * Returns EINVAL if the dataset is not a clone, EXDEV if it is flagged
 * DS_FLAG_NOPROMOTE, EEXIST (or a lookup error) if a snapshot being
 * moved conflicts with a snapshot name of the clone -- in which case
 * pa->err_ds is set to the offending snapshot name -- or whatever
 * dsl_dir_transfer_possible() / snaplist_space() return.
 */
static int
dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
        dsl_dataset_t *hds = arg1;
        struct promotearg *pa = arg2;
        struct promotenode *snap = list_head(&pa->shared_snaps);
        dsl_dataset_t *origin_ds = snap->ds;
        int err;
        uint64_t unused;

        /* Check that it is a real clone */
        if (!dsl_dir_is_clone(hds->ds_dir))
                return (EINVAL);

        /* Since this is so expensive, don't do the preliminary check */
        if (!dmu_tx_is_syncing(tx))
                return (0);

        if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
                return (EXDEV);

        /* compute origin's new unique space */
        snap = list_tail(&pa->clone_snaps);
        ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
        /* only the "used" result is kept; comp/uncomp are discarded */
        dsl_deadlist_space_range(&snap->ds->ds_deadlist,
            origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
            &pa->unique, &unused, &unused);

        /*
         * Walk the snapshots that we are moving
         *
         * Compute space to transfer.  Consider the incremental changes
         * to used for each snapshot:
         * (my used) = (prev's used) + (blocks born) - (blocks killed)
         * So each snapshot gave birth to:
         * (blocks born) = (my used) - (prev's used) + (blocks killed)
         * So a sequence would look like:
         * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
         * Which simplifies to:
         * uN + kN + kN-1 + ... + k1 + k0
         * Note however, if we stop before we reach the ORIGIN we get:
         * uN + kN + kN-1 + ... + kM - uM-1
         */
        pa->used = origin_ds->ds_phys->ds_referenced_bytes;
        pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
        pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
        for (snap = list_head(&pa->shared_snaps); snap;
            snap = list_next(&pa->shared_snaps, snap)) {
                uint64_t val, dlused, dlcomp, dluncomp;
                dsl_dataset_t *ds = snap->ds;

                /* Check that the snapshot name does not conflict */
                VERIFY(0 == dsl_dataset_get_snapname(ds));
                err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
                if (err == 0) {
                        err = EEXIST;
                        goto out;
                }
                if (err != ENOENT)
                        goto out;

                /* The very first snapshot does not have a deadlist */
                if (ds->ds_phys->ds_prev_snap_obj == 0)
                        continue;

                dsl_deadlist_space(&ds->ds_deadlist,
                    &dlused, &dlcomp, &dluncomp);
                pa->used += dlused;
                pa->comp += dlcomp;
                pa->uncomp += dluncomp;
        }

        /*
         * If we are a clone of a clone then we never reached ORIGIN,
         * so we need to subtract out the clone origin's used space.
         */
        if (pa->origin_origin) {
                pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
                pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
                pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
        }

        /* Check that there is enough space and quota headroom here */
        err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
            origin_ds->ds_dir, pa->used, tx);
        if (err)
                return (err);

        /*
         * Compute the amounts of space that will be used by snapshots
         * after the promotion (for both origin and clone).  For each,
         * it is the amount of space that will be on all of their
         * deadlists (that was not born before their new origin).
         */
        if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
                uint64_t space;

                /*
                 * Note, typically this will not be a clone of a clone,
                 * so dd_origin_txg will be < TXG_INITIAL, so
                 * these snaplist_space() -> dsl_deadlist_space_range()
                 * calls will be fast because they do not have to
                 * iterate over all bps.
                 */
                snap = list_head(&pa->origin_snaps);
                err = snaplist_space(&pa->shared_snaps,
                    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
                if (err)
                        return (err);

                err = snaplist_space(&pa->clone_snaps,
                    snap->ds->ds_dir->dd_origin_txg, &space);
                if (err)
                        return (err);
                pa->cloneusedsnap += space;
        }
        if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
                err = snaplist_space(&pa->origin_snaps,
                    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
                if (err)
                        return (err);
        }

        return (0);
out:
        /* reached only from the name-conflict loop; snap is the culprit */
        pa->err_ds =  snap->ds->ds_snapname;
        return (err);
}
2910 
/*
 * Sync-task sync function for promoting a clone.
 *
 * arg1 is the clone's head dataset and arg2 the struct promotearg filled
 * in by dsl_dataset_promote_check().  In order, this:
 *   - points the origin snapshot's next-snap link at the clone side and
 *     fixes up its next-clones ZAP;
 *   - swaps the origin relationship between the clone's dir and the
 *     origin's dir, including the dd_clones entries;
 *   - moves every shared snapshot (name entry, containing dir and clone
 *     references) from the origin's dir to the clone's dir;
 *   - transfers the space accounting computed by the check function;
 *   - records the origin snapshot's new unique bytes and logs a
 *     "promote" history record.
 */
static void
dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
        dsl_dataset_t *hds = arg1;
        struct promotearg *pa = arg2;
        struct promotenode *snap = list_head(&pa->shared_snaps);
        dsl_dataset_t *origin_ds = snap->ds;
        dsl_dataset_t *origin_head;
        dsl_dir_t *dd = hds->ds_dir;
        dsl_pool_t *dp = hds->ds_dir->dd_pool;
        dsl_dir_t *odd = NULL;
        uint64_t oldnext_obj;
        int64_t delta;

        ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));

        snap = list_head(&pa->origin_snaps);
        origin_head = snap->ds;

        /*
         * We need to explicitly open odd, since origin_ds's dd will be
         * changing.
         */
        VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
            NULL, FTAG, &odd));

        /* change origin's next snap */
        dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
        oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
        snap = list_tail(&pa->clone_snaps);
        ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
        origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;

        /* change the origin's next clone */
        if (origin_ds->ds_phys->ds_next_clones_obj) {
                remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    origin_ds->ds_phys->ds_next_clones_obj,
                    oldnext_obj, tx));
        }

        /* change origin */
        dmu_buf_will_dirty(dd->dd_dbuf, tx);
        ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
        dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
        dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
        dmu_buf_will_dirty(odd->dd_dbuf, tx);
        odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
        origin_head->ds_dir->dd_origin_txg =
            origin_ds->ds_phys->ds_creation_txg;

        /* change dd_clone entries */
        if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                /* hds moves from odd's clone list to origin_origin's */
                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    odd->dd_phys->dd_clones, hds->ds_object, tx));
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    hds->ds_object, tx));

                /* origin_head moves from origin_origin's list to dd's */
                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    origin_head->ds_object, tx));
                if (dd->dd_phys->dd_clones == 0) {
                        dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
                            DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
                }
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    dd->dd_phys->dd_clones, origin_head->ds_object, tx));

        }

        /* move snapshots to this dir */
        for (snap = list_head(&pa->shared_snaps); snap;
            snap = list_next(&pa->shared_snaps, snap)) {
                dsl_dataset_t *ds = snap->ds;

                /* unregister props as dsl_dir is changing */
                if (ds->ds_objset) {
                        dmu_objset_evict(ds->ds_objset);
                        ds->ds_objset = NULL;
                }
                /* move snap name entry */
                VERIFY(0 == dsl_dataset_get_snapname(ds));
                VERIFY(0 == dsl_dataset_snap_remove(origin_head,
                    ds->ds_snapname, tx));
                VERIFY(0 == zap_add(dp->dp_meta_objset,
                    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
                    8, 1, &ds->ds_object, tx));
                dsl_snapcount_adjust(hds->ds_dir, tx, 1, B_TRUE);

                /* change containing dsl_dir */
                dmu_buf_will_dirty(ds->ds_dbuf, tx);
                ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
                ds->ds_phys->ds_dir_obj = dd->dd_object;
                ASSERT3P(ds->ds_dir, ==, odd);
                dsl_dir_close(ds->ds_dir, ds);
                VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
                    NULL, ds, &ds->ds_dir));

                /* move any clone references */
                if (ds->ds_phys->ds_next_clones_obj &&
                    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                        zap_cursor_t zc;
                        zap_attribute_t za;

                        for (zap_cursor_init(&zc, dp->dp_meta_objset,
                            ds->ds_phys->ds_next_clones_obj);
                            zap_cursor_retrieve(&zc, &za) == 0;
                            zap_cursor_advance(&zc)) {
                                dsl_dataset_t *cnds;
                                uint64_t o;

                                if (za.za_first_integer == oldnext_obj) {
                                        /*
                                         * We've already moved the
                                         * origin's reference.
                                         */
                                        continue;
                                }

                                VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
                                    za.za_first_integer, FTAG, &cnds));
                                o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;

                                VERIFY3U(zap_remove_int(dp->dp_meta_objset,
                                    odd->dd_phys->dd_clones, o, tx), ==, 0);
                                VERIFY3U(zap_add_int(dp->dp_meta_objset,
                                    dd->dd_phys->dd_clones, o, tx), ==, 0);
                                dsl_dataset_rele(cnds, FTAG);
                        }
                        zap_cursor_fini(&zc);
                }

                ASSERT0(dsl_prop_numcb(ds));
        }

        /*
         * Change space accounting.
         * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
         * both be valid, or both be 0 (resulting in delta == 0).  This
         * is true for each of {clone,origin} independently.
         */

        /* the clone's dir gains snapshot space and the transferred space */
        delta = pa->cloneusedsnap -
            dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, >=, 0);
        ASSERT3U(pa->used, >=, delta);
        dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(dd, DD_USED_HEAD,
            pa->used - delta, pa->comp, pa->uncomp, tx);

        /* the origin's dir loses the corresponding amounts */
        delta = pa->originusedsnap -
            odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, <=, 0);
        ASSERT3U(pa->used, >=, -delta);
        dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(odd, DD_USED_HEAD,
            -pa->used - delta, -pa->comp, -pa->uncomp, tx);

        origin_ds->ds_phys->ds_unique_bytes = pa->unique;

        /* log history record */
        spa_history_log_internal_ds(hds, "promote", tx, "");

        dsl_dir_close(odd, FTAG);
}
3077 
3078 static char *snaplist_tag = "snaplist";
3079 /*
3080  * Make a list of dsl_dataset_t's for the snapshots between first_obj
3081  * (exclusive) and last_obj (inclusive).  The list will be in reverse
3082  * order (last_obj will be the list_head()).  If first_obj == 0, do all
3083  * snapshots back to this dataset's origin.
3084  */
3085 static int
3086 snaplist_make(dsl_pool_t *dp, boolean_t own,
3087     uint64_t first_obj, uint64_t last_obj, list_t *l)
3088 {
3089         uint64_t obj = last_obj;
3090 
3091         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
3092 
3093         list_create(l, sizeof (struct promotenode),
3094             offsetof(struct promotenode, link));
3095 
3096         while (obj != first_obj) {
3097                 dsl_dataset_t *ds;
3098                 struct promotenode *snap;
3099                 int err;
3100 
3101                 if (own) {
3102                         err = dsl_dataset_own_obj(dp, obj,
3103                             0, snaplist_tag, &ds);
3104                         if (err == 0)
3105                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
3106                 } else {
3107                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
3108                 }
3109                 if (err == ENOENT) {
3110                         /* lost race with snapshot destroy */
3111                         struct promotenode *last = list_tail(l);
3112                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3113                         obj = last->ds->ds_phys->ds_prev_snap_obj;
3114                         continue;
3115                 } else if (err) {
3116                         return (err);
3117                 }
3118 
3119                 if (first_obj == 0)
3120                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3121 
3122                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3123                 snap->ds = ds;
3124                 list_insert_tail(l, snap);
3125                 obj = ds->ds_phys->ds_prev_snap_obj;
3126         }
3127 
3128         return (0);
3129 }
3130 
3131 static int
3132 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3133 {
3134         struct promotenode *snap;
3135 
3136         *spacep = 0;
3137         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3138                 uint64_t used, comp, uncomp;
3139                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3140                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3141                 *spacep += used;
3142         }
3143         return (0);
3144 }
3145 
3146 static void
3147 snaplist_destroy(list_t *l, boolean_t own)
3148 {
3149         struct promotenode *snap;
3150 
3151         if (!l || !list_link_active(&l->list_head))
3152                 return;
3153 
3154         while ((snap = list_tail(l)) != NULL) {
3155                 list_remove(l, snap);
3156                 if (own)
3157                         dsl_dataset_disown(snap->ds, snaplist_tag);
3158                 else
3159                         dsl_dataset_rele(snap->ds, snaplist_tag);
3160                 kmem_free(snap, sizeof (struct promotenode));
3161         }
3162         list_destroy(l);
3163 }
3164 
3165 /*
3166  * Promote a clone.  Nomenclature note:
3167  * "clone" or "cds": the original clone which is being promoted
3168  * "origin" or "ods": the snapshot which is originally clone's origin
3169  * "origin head" or "ohds": the dataset which is the head
3170  * (filesystem/volume) for the origin
3171  * "origin origin": the origin of the origin's filesystem (typically
3172  * NULL, indicating that the clone is not a clone of a clone).
3173  */
3174 int
3175 dsl_dataset_promote(const char *name, char *conflsnap)
3176 {
3177         dsl_dataset_t *ds;
3178         dsl_dir_t *dd;
3179         dsl_pool_t *dp;
3180         dmu_object_info_t doi;
3181         struct promotearg pa = { 0 };
3182         struct promotenode *snap;
3183         int err;
3184 
3185         err = dsl_dataset_hold(name, FTAG, &ds);
3186         if (err)
3187                 return (err);
3188         dd = ds->ds_dir;
3189         dp = dd->dd_pool;
3190 
3191         err = dmu_object_info(dp->dp_meta_objset,
3192             ds->ds_phys->ds_snapnames_zapobj, &doi);
3193         if (err) {
3194                 dsl_dataset_rele(ds, FTAG);
3195                 return (err);
3196         }
3197 
3198         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3199                 dsl_dataset_rele(ds, FTAG);
3200                 return (EINVAL);
3201         }
3202 
3203         /*
3204          * We are going to inherit all the snapshots taken before our
3205          * origin (i.e., our new origin will be our parent's origin).
3206          * Take ownership of them so that we can rename them into our
3207          * namespace.
3208          */
3209         rw_enter(&dp->dp_config_rwlock, RW_READER);
3210 
3211         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3212             &pa.shared_snaps);
3213         if (err != 0)
3214                 goto out;
3215 
3216         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3217         if (err != 0)
3218                 goto out;
3219 
3220         snap = list_head(&pa.shared_snaps);
3221         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3222         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3223             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3224         if (err != 0)
3225                 goto out;
3226 
3227         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3228                 err = dsl_dataset_hold_obj(dp,
3229                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3230                     FTAG, &pa.origin_origin);
3231                 if (err != 0)
3232                         goto out;
3233         }
3234 
3235 out:
3236         rw_exit(&dp->dp_config_rwlock);
3237 
3238         /*
3239          * Add in 128x the snapnames zapobj size, since we will be moving
3240          * a bunch of snapnames to the promoted ds, and dirtying their
3241          * bonus buffers.
3242          */
3243         if (err == 0) {
3244                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3245                     dsl_dataset_promote_sync, ds, &pa,
3246                     2 + 2 * doi.doi_physical_blocks_512);
3247                 if (err && pa.err_ds && conflsnap)
3248                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3249         }
3250 
3251         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3252         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3253         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3254         if (pa.origin_origin)
3255                 dsl_dataset_rele(pa.origin_origin, FTAG);
3256         dsl_dataset_rele(ds, FTAG);
3257         return (err);
3258 }
3259 
/*
 * Argument bundle handed (as arg1) to dsl_dataset_clone_swap_check() and
 * dsl_dataset_clone_swap_sync().  unused_refres_delta is filled in by the
 * check function and consumed by the sync function.
 */
struct cloneswaparg {
        dsl_dataset_t *cds; /* clone dataset */
        dsl_dataset_t *ohds; /* origin's head dataset */
        /* if set, skip the "ohds modified since last snapshot" check */
        boolean_t force;
        int64_t unused_refres_delta; /* change in unconsumed refreservation */
};
3266 
3267 /* ARGSUSED */
3268 static int
3269 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3270 {
3271         struct cloneswaparg *csa = arg1;
3272 
3273         /* they should both be heads */
3274         if (dsl_dataset_is_snapshot(csa->cds) ||
3275             dsl_dataset_is_snapshot(csa->ohds))
3276                 return (EINVAL);
3277 
3278         /* the branch point should be just before them */
3279         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3280                 return (EINVAL);
3281 
3282         /* cds should be the clone (unless they are unrelated) */
3283         if (csa->cds->ds_prev != NULL &&
3284             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3285             csa->ohds->ds_object !=
3286             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3287                 return (EINVAL);
3288 
3289         /* the clone should be a child of the origin */
3290         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3291                 return (EINVAL);
3292 
3293         /* ohds shouldn't be modified unless 'force' */
3294         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3295                 return (ETXTBSY);
3296 
3297         /* adjust amount of any unconsumed refreservation */
3298         csa->unused_refres_delta =
3299             (int64_t)MIN(csa->ohds->ds_reserved,
3300             csa->ohds->ds_phys->ds_unique_bytes) -
3301             (int64_t)MIN(csa->ohds->ds_reserved,
3302             csa->cds->ds_phys->ds_unique_bytes);
3303 
3304         if (csa->unused_refres_delta > 0 &&
3305             csa->unused_refres_delta >
3306             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3307                 return (ENOSPC);
3308 
3309         if (csa->ohds->ds_quota != 0 &&
3310             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3311                 return (EDQUOT);
3312 
3313         return (0);
3314 }
3315 
/*
 * Sync function for the clone swap sync task.  Exchanges the contents of
 * csa->cds and csa->ohds: their block pointers, their ds_*_bytes
 * accounting fields and their deadlists.  Along the way it evicts any
 * cached objsets, recomputes the origin snapshot's unique bytes, adjusts
 * the space charged to both dsl_dirs, applies the refreservation delta
 * computed by dsl_dataset_clone_swap_check(), notifies the scan code and
 * logs a history record.
 */
/* ARGSUSED */
static void
dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
        struct cloneswaparg *csa = arg1;
        dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;

        ASSERT(csa->cds->ds_reserved == 0);
        ASSERT(csa->ohds->ds_quota == 0 ||
            csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);

        dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
        dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);

        /* Drop any cached objset on either side before swapping. */
        if (csa->cds->ds_objset != NULL) {
                dmu_objset_evict(csa->cds->ds_objset);
                csa->cds->ds_objset = NULL;
        }

        if (csa->ohds->ds_objset != NULL) {
                dmu_objset_evict(csa->ohds->ds_objset);
                csa->ohds->ds_objset = NULL;
        }

        /*
         * Reset origin's unique bytes, if it exists.
         */
        if (csa->cds->ds_prev) {
                dsl_dataset_t *origin = csa->cds->ds_prev;
                uint64_t comp, uncomp;

                dmu_buf_will_dirty(origin->ds_dbuf, tx);
                /* comp/uncomp are required outputs but are not used */
                dsl_deadlist_space_range(&csa->cds->ds_deadlist,
                    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
                    &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
        }

        /* swap blkptrs */
        {
                blkptr_t tmp;
                tmp = csa->ohds->ds_phys->ds_bp;
                csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
                csa->cds->ds_phys->ds_bp = tmp;
        }

        /* set dd_*_bytes */
        {
                int64_t dused, dcomp, duncomp;
                uint64_t cdl_used, cdl_comp, cdl_uncomp;
                uint64_t odl_used, odl_comp, odl_uncomp;

                ASSERT3U(csa->cds->ds_dir->dd_phys->
                    dd_used_breakdown[DD_USED_SNAP], ==, 0);

                dsl_deadlist_space(&csa->cds->ds_deadlist,
                    &cdl_used, &cdl_comp, &cdl_uncomp);
                dsl_deadlist_space(&csa->ohds->ds_deadlist,
                    &odl_used, &odl_comp, &odl_uncomp);

                /*
                 * Deltas are (clone's referenced + deadlist space) minus
                 * (origin head's referenced + deadlist space).
                 */
                dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
                    (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
                dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
                    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
                duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
                    cdl_uncomp -
                    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);

                /* Charge the delta to ohds's dir, credit it to cds's dir. */
                dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
                    dused, dcomp, duncomp, tx);
                dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
                    -dused, -dcomp, -duncomp, tx);

                /*
                 * The difference in the space used by snapshots is the
                 * difference in snapshot space due to the head's
                 * deadlist (since that's the only thing that's
                 * changing that affects the snapused).
                 */
                dsl_deadlist_space_range(&csa->cds->ds_deadlist,
                    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
                    &cdl_used, &cdl_comp, &cdl_uncomp);
                dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
                    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
                    &odl_used, &odl_comp, &odl_uncomp);
                dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
                    DD_USED_HEAD, DD_USED_SNAP, tx);
        }

        /* swap ds_*_bytes */
        SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
            csa->cds->ds_phys->ds_referenced_bytes);
        SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
            csa->cds->ds_phys->ds_compressed_bytes);
        SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
            csa->cds->ds_phys->ds_uncompressed_bytes);
        SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
            csa->cds->ds_phys->ds_unique_bytes);

        /* apply any parent delta for change in unconsumed refreservation */
        dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
            csa->unused_refres_delta, 0, 0, tx);

        /*
         * Swap deadlists.  Both in-core deadlists are closed first, the
         * on-disk object numbers are exchanged, and then each in-core
         * deadlist is reopened on its new object.
         */
        dsl_deadlist_close(&csa->cds->ds_deadlist);
        dsl_deadlist_close(&csa->ohds->ds_deadlist);
        SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
            csa->cds->ds_phys->ds_deadlist_obj);
        dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
            csa->cds->ds_phys->ds_deadlist_obj);
        dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
            csa->ohds->ds_phys->ds_deadlist_obj);

        dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);

        spa_history_log_internal_ds(csa->cds, "clone swap", tx,
            "parent=%s", csa->ohds->ds_dir->dd_myname);
}
3435 
/*
 * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
 * recv" into an existing fs to swizzle the file system to the new
 * version, and by "zfs rollback".  Can also be used to swap two
 * independent head datasets if neither has any snapshots.
 *
 * Both datasets must already be owned (ds_owner set).  'force' is passed
 * through to the check function, where it suppresses the "origin head
 * modified since last snapshot" test.
 *
 * Returns the result of the clone swap sync task.
 *
 * Note that this function takes both ds_rwlocks as writer (if not
 * already write-held) and does not drop them before returning.
 * NOTE(review): presumably the caller's disown path releases them --
 * confirm against dsl_dataset_disown().
 */
int
dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
    boolean_t force)
{
        struct cloneswaparg csa;
        int error;

        ASSERT(clone->ds_owner);
        ASSERT(origin_head->ds_owner);
retry:
        /*
         * Need exclusive access for the swap. If we're swapping these
         * datasets back after an error, we already hold the locks.
         */
        if (!RW_WRITE_HELD(&clone->ds_rwlock))
                rw_enter(&clone->ds_rwlock, RW_WRITER);
        /*
         * Try for the second lock without blocking.  If that fails, back
         * off: drop the first lock, block on the second, then try the
         * first again without blocking; if that also fails, drop
         * everything and start over.
         */
        if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
            !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
                rw_exit(&clone->ds_rwlock);
                rw_enter(&origin_head->ds_rwlock, RW_WRITER);
                if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
                        rw_exit(&origin_head->ds_rwlock);
                        goto retry;
                }
        }
        /* unused_refres_delta is filled in by the check function */
        csa.cds = clone;
        csa.ohds = origin_head;
        csa.force = force;
        error = dsl_sync_task_do(clone->ds_dir->dd_pool,
            dsl_dataset_clone_swap_check,
            dsl_dataset_clone_swap_sync, &csa, NULL, 9);
        return (error);
}
3475 
3476 /*
3477  * Given a pool name and a dataset object number in that pool,
3478  * return the name of that dataset.
3479  */
3480 int
3481 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3482 {
3483         spa_t *spa;
3484         dsl_pool_t *dp;
3485         dsl_dataset_t *ds;
3486         int error;
3487 
3488         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3489                 return (error);
3490         dp = spa_get_dsl(spa);
3491         rw_enter(&dp->dp_config_rwlock, RW_READER);
3492         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3493                 dsl_dataset_name(ds, buf);
3494                 dsl_dataset_rele(ds, FTAG);
3495         }
3496         rw_exit(&dp->dp_config_rwlock);
3497         spa_close(spa, FTAG);
3498 
3499         return (error);
3500 }
3501 
3502 int
3503 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3504     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3505 {
3506         int error = 0;
3507 
3508         ASSERT3S(asize, >, 0);
3509 
3510         /*
3511          * *ref_rsrv is the portion of asize that will come from any
3512          * unconsumed refreservation space.
3513          */
3514         *ref_rsrv = 0;
3515 
3516         mutex_enter(&ds->ds_lock);
3517         /*
3518          * Make a space adjustment for reserved bytes.
3519          */
3520         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3521                 ASSERT3U(*used, >=,
3522                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3523                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3524                 *ref_rsrv =
3525                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3526         }
3527 
3528         if (!check_quota || ds->ds_quota == 0) {
3529                 mutex_exit(&ds->ds_lock);
3530                 return (0);
3531         }
3532         /*
3533          * If they are requesting more space, and our current estimate
3534          * is over quota, they get to try again unless the actual
3535          * on-disk is over quota and there are no pending changes (which
3536          * may free up space for us).
3537          */
3538         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3539                 if (inflight > 0 ||
3540                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3541                         error = ERESTART;
3542                 else
3543                         error = EDQUOT;
3544         }
3545         mutex_exit(&ds->ds_lock);
3546 
3547         return (error);
3548 }
3549 
3550 /* ARGSUSED */
3551 static int
3552 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3553 {
3554         dsl_dataset_t *ds = arg1;
3555         dsl_prop_setarg_t *psa = arg2;
3556         int err;
3557 
3558         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3559                 return (ENOTSUP);
3560 
3561         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3562                 return (err);
3563 
3564         if (psa->psa_effective_value == 0)
3565                 return (0);
3566 
3567         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3568             psa->psa_effective_value < ds->ds_reserved)
3569                 return (ENOSPC);
3570 
3571         return (0);
3572 }
3573 
3574 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3575 
3576 void
3577 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3578 {
3579         dsl_dataset_t *ds = arg1;
3580         dsl_prop_setarg_t *psa = arg2;
3581         uint64_t effective_value = psa->psa_effective_value;
3582 
3583         dsl_prop_set_sync(ds, psa, tx);
3584         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3585 
3586         if (ds->ds_quota != effective_value) {
3587                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3588                 ds->ds_quota = effective_value;
3589         }
3590 }
3591 
3592 int
3593 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3594 {
3595         dsl_dataset_t *ds;
3596         dsl_prop_setarg_t psa;
3597         int err;
3598 
3599         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3600 
3601         err = dsl_dataset_hold(dsname, FTAG, &ds);
3602         if (err)
3603                 return (err);
3604 
3605         /*
3606          * If someone removes a file, then tries to set the quota, we
3607          * want to make sure the file freeing takes effect.
3608          */
3609         txg_wait_open(ds->ds_dir->dd_pool, 0);
3610 
3611         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3612             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3613             ds, &psa, 0);
3614 
3615         dsl_dataset_rele(ds, FTAG);
3616         return (err);
3617 }
3618 
3619 static int
3620 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3621 {
3622         dsl_dataset_t *ds = arg1;
3623         dsl_prop_setarg_t *psa = arg2;
3624         uint64_t effective_value;
3625         uint64_t unique;
3626         int err;
3627 
3628         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3629             SPA_VERSION_REFRESERVATION)
3630                 return (ENOTSUP);
3631 
3632         if (dsl_dataset_is_snapshot(ds))
3633                 return (EINVAL);
3634 
3635         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3636                 return (err);
3637 
3638         effective_value = psa->psa_effective_value;
3639 
3640         /*
3641          * If we are doing the preliminary check in open context, the
3642          * space estimates may be inaccurate.
3643          */
3644         if (!dmu_tx_is_syncing(tx))
3645                 return (0);
3646 
3647         mutex_enter(&ds->ds_lock);
3648         if (!DS_UNIQUE_IS_ACCURATE(ds))
3649                 dsl_dataset_recalc_head_uniq(ds);
3650         unique = ds->ds_phys->ds_unique_bytes;
3651         mutex_exit(&ds->ds_lock);
3652 
3653         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3654                 uint64_t delta = MAX(unique, effective_value) -
3655                     MAX(unique, ds->ds_reserved);
3656 
3657                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3658                         return (ENOSPC);
3659                 if (ds->ds_quota > 0 &&
3660                     effective_value > ds->ds_quota)
3661                         return (ENOSPC);
3662         }
3663 
3664         return (0);
3665 }
3666 
3667 static void
3668 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3669 {
3670         dsl_dataset_t *ds = arg1;
3671         dsl_prop_setarg_t *psa = arg2;
3672         uint64_t effective_value = psa->psa_effective_value;
3673         uint64_t unique;
3674         int64_t delta;
3675 
3676         dsl_prop_set_sync(ds, psa, tx);
3677         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3678 
3679         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3680 
3681         mutex_enter(&ds->ds_dir->dd_lock);
3682         mutex_enter(&ds->ds_lock);
3683         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3684         unique = ds->ds_phys->ds_unique_bytes;
3685         delta = MAX(0, (int64_t)(effective_value - unique)) -
3686             MAX(0, (int64_t)(ds->ds_reserved - unique));
3687         ds->ds_reserved = effective_value;
3688         mutex_exit(&ds->ds_lock);
3689 
3690         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3691         mutex_exit(&ds->ds_dir->dd_lock);
3692 }
3693 
3694 int
3695 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3696     uint64_t reservation)
3697 {
3698         dsl_dataset_t *ds;
3699         dsl_prop_setarg_t psa;
3700         int err;
3701 
3702         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3703             &reservation);
3704 
3705         err = dsl_dataset_hold(dsname, FTAG, &ds);
3706         if (err)
3707                 return (err);
3708 
3709         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3710             dsl_dataset_set_reservation_check,
3711             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3712 
3713         dsl_dataset_rele(ds, FTAG);
3714         return (err);
3715 }
3716 
/*
 * Argument saved by dsl_register_onexit_hold_cleanup() and later handed
 * to dsl_dataset_user_release_onexit(), which uses it to release the
 * hold and then frees it.
 */
typedef struct zfs_hold_cleanup_arg {
        dsl_pool_t *dp;         /* pool containing the held dataset */
        uint64_t dsobj;         /* object number of the held dataset */
        char htag[MAXNAMELEN];  /* copy of the hold tag */
} zfs_hold_cleanup_arg_t;
3722 
3723 static void
3724 dsl_dataset_user_release_onexit(void *arg)
3725 {
3726         zfs_hold_cleanup_arg_t *ca = arg;
3727 
3728         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3729             B_TRUE);
3730         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3731 }
3732 
3733 void
3734 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3735     minor_t minor)
3736 {
3737         zfs_hold_cleanup_arg_t *ca;
3738 
3739         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3740         ca->dp = ds->ds_dir->dd_pool;
3741         ca->dsobj = ds->ds_object;
3742         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3743         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3744             dsl_dataset_user_release_onexit, ca, NULL));
3745 }
3746 
3747 /*
3748  * If you add new checks here, you may need to add
3749  * additional checks to the "temporary" case in
3750  * snapshot_check() in dmu_objset.c.
3751  */
3752 static int
3753 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3754 {
3755         dsl_dataset_t *ds = arg1;
3756         struct dsl_ds_holdarg *ha = arg2;
3757         const char *htag = ha->htag;
3758         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3759         int error = 0;
3760 
3761         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3762                 return (ENOTSUP);
3763 
3764         if (!dsl_dataset_is_snapshot(ds))
3765                 return (EINVAL);
3766 
3767         /* tags must be unique */
3768         mutex_enter(&ds->ds_lock);
3769         if (ds->ds_phys->ds_userrefs_obj) {
3770                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3771                     8, 1, tx);
3772                 if (error == 0)
3773                         error = EEXIST;
3774                 else if (error == ENOENT)
3775                         error = 0;
3776         }
3777         mutex_exit(&ds->ds_lock);
3778 
3779         if (error == 0 && ha->temphold &&
3780             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3781                 error = E2BIG;
3782 
3783         return (error);
3784 }
3785 
3786 void
3787 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3788 {
3789         dsl_dataset_t *ds = arg1;
3790         struct dsl_ds_holdarg *ha = arg2;
3791         const char *htag = ha->htag;
3792         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3793         objset_t *mos = dp->dp_meta_objset;
3794         uint64_t now = gethrestime_sec();
3795         uint64_t zapobj;
3796 
3797         mutex_enter(&ds->ds_lock);
3798         if (ds->ds_phys->ds_userrefs_obj == 0) {
3799                 /*
3800                  * This is the first user hold for this dataset.  Create
3801                  * the userrefs zap object.
3802                  */
3803                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3804                 zapobj = ds->ds_phys->ds_userrefs_obj =
3805                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3806         } else {
3807                 zapobj = ds->ds_phys->ds_userrefs_obj;
3808         }
3809         ds->ds_userrefs++;
3810         mutex_exit(&ds->ds_lock);
3811 
3812         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3813 
3814         if (ha->temphold) {
3815                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3816                     htag, &now, tx));
3817         }
3818 
3819         spa_history_log_internal_ds(ds, "hold", tx,
3820             "tag = %s temp = %d holds now = %llu",
3821             htag, (int)ha->temphold, ds->ds_userrefs);
3822 }
3823 
3824 static int
3825 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3826 {
3827         struct dsl_ds_holdarg *ha = arg;
3828         dsl_dataset_t *ds;
3829         int error;
3830         char *name;
3831 
3832         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3833         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3834         error = dsl_dataset_hold(name, ha->dstg, &ds);
3835         strfree(name);
3836         if (error == 0) {
3837                 ha->gotone = B_TRUE;
3838                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3839                     dsl_dataset_user_hold_sync, ds, ha, 0);
3840         } else if (error == ENOENT && ha->recursive) {
3841                 error = 0;
3842         } else {
3843                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3844         }
3845         return (error);
3846 }
3847 
3848 int
3849 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3850     boolean_t temphold)
3851 {
3852         struct dsl_ds_holdarg *ha;
3853         int error;
3854 
3855         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3856         ha->htag = htag;
3857         ha->temphold = temphold;
3858         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3859             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3860             ds, ha, 0);
3861         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3862 
3863         return (error);
3864 }
3865 
/*
 * Place the user hold 'htag' on dsname@snapname and, if 'recursive', on
 * the same-named snapshot of every descendant found by
 * dmu_objset_find().
 *
 * If cleanup_fd is not -1 the hold must be temporary ('temphold');
 * each successfully placed hold is then registered for release via
 * dsl_register_onexit_hold_cleanup().
 *
 * Returns 0 on success.  Returns EINVAL for a non-temporary hold with a
 * cleanup_fd, ENOENT for a recursive request that matched no snapshot,
 * or the error from the fd hold, pool open, dataset walk or sync task
 * group.  On error, 'dsname' is overwritten with the name recorded in
 * ha->failed.
 */
int
dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
    boolean_t recursive, boolean_t temphold, int cleanup_fd)
{
        struct dsl_ds_holdarg *ha;
        dsl_sync_task_t *dst;
        spa_t *spa;
        int error;
        minor_t minor = 0;

        if (cleanup_fd != -1) {
                /* Currently we only support cleanup-on-exit of tempholds. */
                if (!temphold)
                        return (EINVAL);
                error = zfs_onexit_fd_hold(cleanup_fd, &minor);
                if (error)
                        return (error);
        }

        ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

        /* Default the failure name to the dataset we were asked about. */
        (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

        error = spa_open(dsname, &spa, FTAG);
        if (error) {
                kmem_free(ha, sizeof (struct dsl_ds_holdarg));
                if (cleanup_fd != -1)
                        zfs_onexit_fd_rele(cleanup_fd);
                return (error);
        }

        ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
        ha->htag = htag;
        ha->snapname = snapname;
        ha->recursive = recursive;
        ha->temphold = temphold;

        /* Queue one sync task per snapshot to be held. */
        if (recursive) {
                error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
                    ha, DS_FIND_CHILDREN);
        } else {
                error = dsl_dataset_user_hold_one(dsname, ha);
        }
        if (error == 0)
                error = dsl_sync_task_group_wait(ha->dstg);

        /*
         * Walk every queued task, whether or not the group ran, so that
         * the dataset hold taken in dsl_dataset_user_hold_one() is
         * always dropped.
         */
        for (dst = list_head(&ha->dstg->dstg_tasks); dst;
            dst = list_next(&ha->dstg->dstg_tasks, dst)) {
                dsl_dataset_t *ds = dst->dst_arg1;

                if (dst->dst_err) {
                        /* report the failing dataset without "@snap" */
                        dsl_dataset_name(ds, ha->failed);
                        *strchr(ha->failed, '@') = '\0';
                } else if (error == 0 && minor != 0 && temphold) {
                        /*
                         * If this hold is to be released upon process exit,
                         * register that action now.
                         */
                        dsl_register_onexit_hold_cleanup(ds, htag, minor);
                }
                dsl_dataset_rele(ds, ha->dstg);
        }

        if (error == 0 && recursive && !ha->gotone)
                error = ENOENT;

        /*
         * NOTE(review): the copy is bounded by sizeof (ha->failed), not
         * by the size of the caller's dsname buffer -- this assumes
         * dsname is at least that large; confirm against callers.
         */
        if (error)
                (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

        dsl_sync_task_group_destroy(ha->dstg);

        kmem_free(ha, sizeof (struct dsl_ds_holdarg));
        spa_close(spa, FTAG);
        if (cleanup_fd != -1)
                zfs_onexit_fd_rele(cleanup_fd);
        return (error);
}
3943 
/*
 * Per-snapshot argument handed to the user-release check and sync
 * functions (dsl_dataset_user_release_check/_sync).
 */
struct dsl_ds_releasearg {
        dsl_dataset_t *ds;      /* snapshot the hold is released from */
        const char *htag;       /* tag of the user hold to release */
        boolean_t own;          /* do we own or just hold ds? */
};
3949 
3950 static int
3951 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3952     boolean_t *might_destroy)
3953 {
3954         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3955         uint64_t zapobj;
3956         uint64_t tmp;
3957         int error;
3958 
3959         *might_destroy = B_FALSE;
3960 
3961         mutex_enter(&ds->ds_lock);
3962         zapobj = ds->ds_phys->ds_userrefs_obj;
3963         if (zapobj == 0) {
3964                 /* The tag can't possibly exist */
3965                 mutex_exit(&ds->ds_lock);
3966                 return (ESRCH);
3967         }
3968 
3969         /* Make sure the tag exists */
3970         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3971         if (error) {
3972                 mutex_exit(&ds->ds_lock);
3973                 if (error == ENOENT)
3974                         error = ESRCH;
3975                 return (error);
3976         }
3977 
3978         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3979             DS_IS_DEFER_DESTROY(ds))
3980                 *might_destroy = B_TRUE;
3981 
3982         mutex_exit(&ds->ds_lock);
3983         return (0);
3984 }
3985 
3986 static int
3987 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3988 {
3989         struct dsl_ds_releasearg *ra = arg1;
3990         dsl_dataset_t *ds = ra->ds;
3991         boolean_t might_destroy;
3992         int error;
3993 
3994         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3995                 return (ENOTSUP);
3996 
3997         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3998         if (error)
3999                 return (error);
4000 
4001         if (might_destroy) {
4002                 struct dsl_ds_destroyarg dsda = {0};
4003 
4004                 if (dmu_tx_is_syncing(tx)) {
4005                         /*
4006                          * If we're not prepared to remove the snapshot,
4007                          * we can't allow the release to happen right now.
4008                          */
4009                         if (!ra->own)
4010                                 return (EBUSY);
4011                 }
4012                 dsda.ds = ds;
4013                 dsda.releasing = B_TRUE;
4014                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
4015         }
4016 
4017         return (0);
4018 }
4019 
4020 static void
4021 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
4022 {
4023         struct dsl_ds_releasearg *ra = arg1;
4024         dsl_dataset_t *ds = ra->ds;
4025         dsl_pool_t *dp = ds->ds_dir->dd_pool;
4026         objset_t *mos = dp->dp_meta_objset;
4027         uint64_t zapobj;
4028         uint64_t refs;
4029         int error;
4030 
4031         mutex_enter(&ds->ds_lock);
4032         ds->ds_userrefs--;
4033         refs = ds->ds_userrefs;
4034         mutex_exit(&ds->ds_lock);
4035         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
4036         VERIFY(error == 0 || error == ENOENT);
4037         zapobj = ds->ds_phys->ds_userrefs_obj;
4038         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
4039 
4040         spa_history_log_internal_ds(ds, "release", tx,
4041             "tag = %s refs now = %lld", ra->htag, (longlong_t)refs);
4042 
4043         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
4044             DS_IS_DEFER_DESTROY(ds)) {
4045                 struct dsl_ds_destroyarg dsda = {0};
4046 
4047                 ASSERT(ra->own);
4048                 dsda.ds = ds;
4049                 dsda.releasing = B_TRUE;
4050                 /* We already did the destroy_check */
4051                 dsl_dataset_destroy_sync(&dsda, tag, tx);
4052         }
4053 }
4054 
4055 static int
4056 dsl_dataset_user_release_one(const char *dsname, void *arg)
4057 {
4058         struct dsl_ds_holdarg *ha = arg;
4059         struct dsl_ds_releasearg *ra;
4060         dsl_dataset_t *ds;
4061         int error;
4062         void *dtag = ha->dstg;
4063         char *name;
4064         boolean_t own = B_FALSE;
4065         boolean_t might_destroy;
4066 
4067         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
4068         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4069         error = dsl_dataset_hold(name, dtag, &ds);
4070         strfree(name);
4071         if (error == ENOENT && ha->recursive)
4072                 return (0);
4073         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4074         if (error)
4075                 return (error);
4076 
4077         ha->gotone = B_TRUE;
4078 
4079         ASSERT(dsl_dataset_is_snapshot(ds));
4080 
4081         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
4082         if (error) {
4083                 dsl_dataset_rele(ds, dtag);
4084                 return (error);
4085         }
4086 
4087         if (might_destroy) {
4088 #ifdef _KERNEL
4089                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4090                 error = zfs_unmount_snap(name, NULL);
4091                 strfree(name);
4092                 if (error) {
4093                         dsl_dataset_rele(ds, dtag);
4094                         return (error);
4095                 }
4096 #endif
4097                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
4098                         dsl_dataset_rele(ds, dtag);
4099                         return (EBUSY);
4100                 } else {
4101                         own = B_TRUE;
4102                         dsl_dataset_make_exclusive(ds, dtag);
4103                 }
4104         }
4105 
4106         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
4107         ra->ds = ds;
4108         ra->htag = ha->htag;
4109         ra->own = own;
4110         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
4111             dsl_dataset_user_release_sync, ra, dtag, 0);
4112 
4113         return (0);
4114 }
4115 
4116 int
4117 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
4118     boolean_t recursive)
4119 {
4120         struct dsl_ds_holdarg *ha;
4121         dsl_sync_task_t *dst;
4122         spa_t *spa;
4123         int error;
4124 
4125 top:
4126         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4127 
4128         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4129 
4130         error = spa_open(dsname, &spa, FTAG);
4131         if (error) {
4132                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4133                 return (error);
4134         }
4135 
4136         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4137         ha->htag = htag;
4138         ha->snapname = snapname;
4139         ha->recursive = recursive;
4140         if (recursive) {
4141                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4142                     ha, DS_FIND_CHILDREN);
4143         } else {
4144                 error = dsl_dataset_user_release_one(dsname, ha);
4145         }
4146         if (error == 0)
4147                 error = dsl_sync_task_group_wait(ha->dstg);
4148 
4149         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4150             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4151                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4152                 dsl_dataset_t *ds = ra->ds;
4153 
4154                 if (dst->dst_err)
4155                         dsl_dataset_name(ds, ha->failed);
4156 
4157                 if (ra->own)
4158                         dsl_dataset_disown(ds, ha->dstg);
4159                 else
4160                         dsl_dataset_rele(ds, ha->dstg);
4161 
4162                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4163         }
4164 
4165         if (error == 0 && recursive && !ha->gotone)
4166                 error = ENOENT;
4167 
4168         if (error && error != EBUSY)
4169                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4170 
4171         dsl_sync_task_group_destroy(ha->dstg);
4172         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4173         spa_close(spa, FTAG);
4174 
4175         /*
4176          * We can get EBUSY if we were racing with deferred destroy and
4177          * dsl_dataset_user_release_check() hadn't done the necessary
4178          * open context setup.  We can also get EBUSY if we're racing
4179          * with destroy and that thread is the ds_owner.  Either way
4180          * the busy condition should be transient, and we should retry
4181          * the release operation.
4182          */
4183         if (error == EBUSY)
4184                 goto top;
4185 
4186         return (error);
4187 }
4188 
4189 /*
4190  * Called at spa_load time (with retry == B_FALSE) to release a stale
4191  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4192  */
4193 int
4194 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4195     boolean_t retry)
4196 {
4197         dsl_dataset_t *ds;
4198         char *snap;
4199         char *name;
4200         int namelen;
4201         int error;
4202 
4203         do {
4204                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4205                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4206                 rw_exit(&dp->dp_config_rwlock);
4207                 if (error)
4208                         return (error);
4209                 namelen = dsl_dataset_namelen(ds)+1;
4210                 name = kmem_alloc(namelen, KM_SLEEP);
4211                 dsl_dataset_name(ds, name);
4212                 dsl_dataset_rele(ds, FTAG);
4213 
4214                 snap = strchr(name, '@');
4215                 *snap = '\0';
4216                 ++snap;
4217                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4218                 kmem_free(name, namelen);
4219 
4220                 /*
4221                  * The object can't have been destroyed because we have a hold,
4222                  * but it might have been renamed, resulting in ENOENT.  Retry
4223                  * if we've been requested to do so.
4224                  *
4225                  * It would be nice if we could use the dsobj all the way
4226                  * through and avoid ENOENT entirely.  But we might need to
4227                  * unmount the snapshot, and there's currently no way to lookup
4228                  * a vfsp using a ZFS object id.
4229                  */
4230         } while ((error == ENOENT) && retry);
4231 
4232         return (error);
4233 }
4234 
4235 int
4236 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4237 {
4238         dsl_dataset_t *ds;
4239         int err;
4240 
4241         err = dsl_dataset_hold(dsname, FTAG, &ds);
4242         if (err)
4243                 return (err);
4244 
4245         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4246         if (ds->ds_phys->ds_userrefs_obj != 0) {
4247                 zap_attribute_t *za;
4248                 zap_cursor_t zc;
4249 
4250                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4251                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4252                     ds->ds_phys->ds_userrefs_obj);
4253                     zap_cursor_retrieve(&zc, za) == 0;
4254                     zap_cursor_advance(&zc)) {
4255                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4256                             za->za_first_integer));
4257                 }
4258                 zap_cursor_fini(&zc);
4259                 kmem_free(za, sizeof (zap_attribute_t));
4260         }
4261         dsl_dataset_rele(ds, FTAG);
4262         return (0);
4263 }
4264 
4265 /*
4266  * Note, this function is used as the callback for dmu_objset_find().  We
4267  * always return 0 so that we will continue to find and process
4268  * inconsistent datasets, even if we encounter an error trying to
4269  * process one of them.
4270  */
4271 /* ARGSUSED */
4272 int
4273 dsl_destroy_inconsistent(const char *dsname, void *arg)
4274 {
4275         dsl_dataset_t *ds;
4276 
4277         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4278                 if (DS_IS_INCONSISTENT(ds))
4279                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4280                 else
4281                         dsl_dataset_disown(ds, FTAG);
4282         }
4283         return (0);
4284 }
4285 
4286 /*
4287  * Return (in *usedp) the amount of space written in new that is not
4288  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4289  * a snapshot before new, in new's filesystem (or its origin).  If not then
4290  * fail and return EINVAL.
4291  *
4292  * The written space is calculated by considering two components:  First, we
4293  * ignore any freed space, and calculate the written as new's used space
4294  * minus old's used space.  Next, we add in the amount of space that was freed
4295  * between the two snapshots, thus reducing new's used space relative to old's.
4296  * Specifically, this is the space that was born before old->ds_creation_txg,
4297  * and freed before new (ie. on new's deadlist or a previous deadlist).
4298  *
4299  * space freed                         [---------------------]
4300  * snapshots                       ---O-------O--------O-------O------
4301  *                                         oldsnap            new
4302  */
4303 int
4304 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4305     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4306 {
4307         int err = 0;
4308         uint64_t snapobj;
4309         dsl_pool_t *dp = new->ds_dir->dd_pool;
4310 
4311         *usedp = 0;
4312         *usedp += new->ds_phys->ds_referenced_bytes;
4313         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4314 
4315         *compp = 0;
4316         *compp += new->ds_phys->ds_compressed_bytes;
4317         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4318 
4319         *uncompp = 0;
4320         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4321         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4322 
4323         rw_enter(&dp->dp_config_rwlock, RW_READER);
4324         snapobj = new->ds_object;
4325         while (snapobj != oldsnap->ds_object) {
4326                 dsl_dataset_t *snap;
4327                 uint64_t used, comp, uncomp;
4328 
4329                 if (snapobj == new->ds_object) {
4330                         snap = new;
4331                 } else {
4332                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4333                         if (err != 0)
4334                                 break;
4335                 }
4336 
4337                 if (snap->ds_phys->ds_prev_snap_txg ==
4338                     oldsnap->ds_phys->ds_creation_txg) {
4339                         /*
4340                          * The blocks in the deadlist can not be born after
4341                          * ds_prev_snap_txg, so get the whole deadlist space,
4342                          * which is more efficient (especially for old-format
4343                          * deadlists).  Unfortunately the deadlist code
4344                          * doesn't have enough information to make this
4345                          * optimization itself.
4346                          */
4347                         dsl_deadlist_space(&snap->ds_deadlist,
4348                             &used, &comp, &uncomp);
4349                 } else {
4350                         dsl_deadlist_space_range(&snap->ds_deadlist,
4351                             0, oldsnap->ds_phys->ds_creation_txg,
4352                             &used, &comp, &uncomp);
4353                 }
4354                 *usedp += used;
4355                 *compp += comp;
4356                 *uncompp += uncomp;
4357 
4358                 /*
4359                  * If we get to the beginning of the chain of snapshots
4360                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4361                  * was not a snapshot of/before new.
4362                  */
4363                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4364                 if (snap != new)
4365                         dsl_dataset_rele(snap, FTAG);
4366                 if (snapobj == 0) {
4367                         err = EINVAL;
4368                         break;
4369                 }
4370 
4371         }
4372         rw_exit(&dp->dp_config_rwlock);
4373         return (err);
4374 }
4375 
4376 /*
4377  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4378  * lastsnap, and all snapshots in between are deleted.
4379  *
4380  * blocks that would be freed            [---------------------------]
4381  * snapshots                       ---O-------O--------O-------O--------O
4382  *                                        firstsnap        lastsnap
4383  *
4384  * This is the set of blocks that were born after the snap before firstsnap,
4385  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4386  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4387  * We calculate this by iterating over the relevant deadlists (from the snap
4388  * after lastsnap, backward to the snap after firstsnap), summing up the
4389  * space on the deadlist that was born after the snap before firstsnap.
4390  */
4391 int
4392 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4393     dsl_dataset_t *lastsnap,
4394     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4395 {
4396         int err = 0;
4397         uint64_t snapobj;
4398         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4399 
4400         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4401         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4402 
4403         /*
4404          * Check that the snapshots are in the same dsl_dir, and firstsnap
4405          * is before lastsnap.
4406          */
4407         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4408             firstsnap->ds_phys->ds_creation_txg >
4409             lastsnap->ds_phys->ds_creation_txg)
4410                 return (EINVAL);
4411 
4412         *usedp = *compp = *uncompp = 0;
4413 
4414         rw_enter(&dp->dp_config_rwlock, RW_READER);
4415         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4416         while (snapobj != firstsnap->ds_object) {
4417                 dsl_dataset_t *ds;
4418                 uint64_t used, comp, uncomp;
4419 
4420                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4421                 if (err != 0)
4422                         break;
4423 
4424                 dsl_deadlist_space_range(&ds->ds_deadlist,
4425                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4426                     &used, &comp, &uncomp);
4427                 *usedp += used;
4428                 *compp += comp;
4429                 *uncompp += uncomp;
4430 
4431                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4432                 ASSERT3U(snapobj, !=, 0);
4433                 dsl_dataset_rele(ds, FTAG);
4434         }
4435         rw_exit(&dp->dp_config_rwlock);
4436         return (err);
4437 }