1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  25  */
  26 
  27 #include <sys/dmu_objset.h>
  28 #include <sys/dsl_dataset.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_prop.h>
  31 #include <sys/dsl_synctask.h>
  32 #include <sys/dmu_traverse.h>
  33 #include <sys/dmu_impl.h>
  34 #include <sys/dmu_tx.h>
  35 #include <sys/arc.h>
  36 #include <sys/zio.h>
  37 #include <sys/zap.h>
  38 #include <sys/zfeature.h>
  39 #include <sys/unique.h>
  40 #include <sys/zfs_context.h>
  41 #include <sys/zfs_ioctl.h>
  42 #include <sys/spa.h>
  43 #include <sys/zfs_znode.h>
  44 #include <sys/zfs_onexit.h>
  45 #include <sys/zvol.h>
  46 #include <sys/dsl_scan.h>
  47 #include <sys/dsl_deadlist.h>
  48 #include "zfs_prop.h"
  49 
  50 static char *dsl_reaper = "the grim reaper";
  51 
  52 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
  53 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
  54 static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
  55 
  56 #define SWITCH64(x, y) \
  57         { \
  58                 uint64_t __tmp = (x); \
  59                 (x) = (y); \
  60                 (y) = __tmp; \
  61         }
  62 
  63 #define DS_REF_MAX      (1ULL << 62)
  64 
  65 #define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE
  66 
  67 #define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
  68 
  69 
  70 /*
  71  * Figure out how much of this delta should be propogated to the dsl_dir
  72  * layer.  If there's a refreservation, that space has already been
  73  * partially accounted for in our ancestors.
  74  */
  75 static int64_t
  76 parent_delta(dsl_dataset_t *ds, int64_t delta)
  77 {
  78         uint64_t old_bytes, new_bytes;
  79 
  80         if (ds->ds_reserved == 0)
  81                 return (delta);
  82 
  83         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
  84         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
  85 
  86         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
  87         return (new_bytes - old_bytes);
  88 }
  89 
  90 void
  91 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
  92 {
  93         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
  94         int compressed = BP_GET_PSIZE(bp);
  95         int uncompressed = BP_GET_UCSIZE(bp);
  96         int64_t delta;
  97 
  98         dprintf_bp(bp, "ds=%p", ds);
  99 
 100         ASSERT(dmu_tx_is_syncing(tx));
 101         /* It could have been compressed away to nothing */
 102         if (BP_IS_HOLE(bp))
 103                 return;
 104         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
 105         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
 106         if (ds == NULL) {
 107                 dsl_pool_mos_diduse_space(tx->tx_pool,
 108                     used, compressed, uncompressed);
 109                 return;
 110         }
 111         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 112 
 113         mutex_enter(&ds->ds_dir->dd_lock);
 114         mutex_enter(&ds->ds_lock);
 115         delta = parent_delta(ds, used);
 116         ds->ds_phys->ds_referenced_bytes += used;
 117         ds->ds_phys->ds_compressed_bytes += compressed;
 118         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
 119         ds->ds_phys->ds_unique_bytes += used;
 120         mutex_exit(&ds->ds_lock);
 121         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
 122             compressed, uncompressed, tx);
 123         dsl_dir_transfer_space(ds->ds_dir, used - delta,
 124             DD_USED_REFRSRV, DD_USED_HEAD, tx);
 125         mutex_exit(&ds->ds_dir->dd_lock);
 126 }
 127 
 128 int
 129 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
 130     boolean_t async)
 131 {
 132         if (BP_IS_HOLE(bp))
 133                 return (0);
 134 
 135         ASSERT(dmu_tx_is_syncing(tx));
 136         ASSERT(bp->blk_birth <= tx->tx_txg);
 137 
 138         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
 139         int compressed = BP_GET_PSIZE(bp);
 140         int uncompressed = BP_GET_UCSIZE(bp);
 141 
 142         ASSERT(used > 0);
 143         if (ds == NULL) {
 144                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 145                 dsl_pool_mos_diduse_space(tx->tx_pool,
 146                     -used, -compressed, -uncompressed);
 147                 return (used);
 148         }
 149         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
 150 
 151         ASSERT(!dsl_dataset_is_snapshot(ds));
 152         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 153 
 154         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
 155                 int64_t delta;
 156 
 157                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
 158                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 159 
 160                 mutex_enter(&ds->ds_dir->dd_lock);
 161                 mutex_enter(&ds->ds_lock);
 162                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
 163                     !DS_UNIQUE_IS_ACCURATE(ds));
 164                 delta = parent_delta(ds, -used);
 165                 ds->ds_phys->ds_unique_bytes -= used;
 166                 mutex_exit(&ds->ds_lock);
 167                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
 168                     delta, -compressed, -uncompressed, tx);
 169                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
 170                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
 171                 mutex_exit(&ds->ds_dir->dd_lock);
 172         } else {
 173                 dprintf_bp(bp, "putting on dead list: %s", "");
 174                 if (async) {
 175                         /*
 176                          * We are here as part of zio's write done callback,
 177                          * which means we're a zio interrupt thread.  We can't
 178                          * call dsl_deadlist_insert() now because it may block
 179                          * waiting for I/O.  Instead, put bp on the deferred
 180                          * queue and let dsl_pool_sync() finish the job.
 181                          */
 182                         bplist_append(&ds->ds_pending_deadlist, bp);
 183                 } else {
 184                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
 185                 }
 186                 ASSERT3U(ds->ds_prev->ds_object, ==,
 187                     ds->ds_phys->ds_prev_snap_obj);
 188                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
 189                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
 190                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
 191                     ds->ds_object && bp->blk_birth >
 192                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
 193                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 194                         mutex_enter(&ds->ds_prev->ds_lock);
 195                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
 196                         mutex_exit(&ds->ds_prev->ds_lock);
 197                 }
 198                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
 199                         dsl_dir_transfer_space(ds->ds_dir, used,
 200                             DD_USED_HEAD, DD_USED_SNAP, tx);
 201                 }
 202         }
 203         mutex_enter(&ds->ds_lock);
 204         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
 205         ds->ds_phys->ds_referenced_bytes -= used;
 206         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
 207         ds->ds_phys->ds_compressed_bytes -= compressed;
 208         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
 209         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
 210         mutex_exit(&ds->ds_lock);
 211 
 212         return (used);
 213 }
 214 
 215 uint64_t
 216 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
 217 {
 218         uint64_t trysnap = 0;
 219 
 220         if (ds == NULL)
 221                 return (0);
 222         /*
 223          * The snapshot creation could fail, but that would cause an
 224          * incorrect FALSE return, which would only result in an
 225          * overestimation of the amount of space that an operation would
 226          * consume, which is OK.
 227          *
 228          * There's also a small window where we could miss a pending
 229          * snapshot, because we could set the sync task in the quiescing
 230          * phase.  So this should only be used as a guess.
 231          */
 232         if (ds->ds_trysnap_txg >
 233             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
 234                 trysnap = ds->ds_trysnap_txg;
 235         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 236 }
 237 
 238 boolean_t
 239 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
 240     uint64_t blk_birth)
 241 {
 242         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
 243                 return (B_FALSE);
 244 
 245         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
 246 
 247         return (B_TRUE);
 248 }
 249 
 250 /* ARGSUSED */
 251 static void
 252 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
 253 {
 254         dsl_dataset_t *ds = dsv;
 255 
 256         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 257 
 258         unique_remove(ds->ds_fsid_guid);
 259 
 260         if (ds->ds_objset != NULL)
 261                 dmu_objset_evict(ds->ds_objset);
 262 
 263         if (ds->ds_prev) {
 264                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 265                 ds->ds_prev = NULL;
 266         }
 267 
 268         bplist_destroy(&ds->ds_pending_deadlist);
 269         if (db != NULL) {
 270                 dsl_deadlist_close(&ds->ds_deadlist);
 271         } else {
 272                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
 273                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
 274         }
 275         if (ds->ds_dir)
 276                 dsl_dir_close(ds->ds_dir, ds);
 277 
 278         ASSERT(!list_link_active(&ds->ds_synced_link));
 279 
 280         mutex_destroy(&ds->ds_lock);
 281         mutex_destroy(&ds->ds_recvlock);
 282         mutex_destroy(&ds->ds_opening_lock);
 283         rw_destroy(&ds->ds_rwlock);
 284         cv_destroy(&ds->ds_exclusive_cv);
 285 
 286         kmem_free(ds, sizeof (dsl_dataset_t));
 287 }
 288 
 289 static int
 290 dsl_dataset_get_snapname(dsl_dataset_t *ds)
 291 {
 292         dsl_dataset_phys_t *headphys;
 293         int err;
 294         dmu_buf_t *headdbuf;
 295         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 296         objset_t *mos = dp->dp_meta_objset;
 297 
 298         if (ds->ds_snapname[0])
 299                 return (0);
 300         if (ds->ds_phys->ds_next_snap_obj == 0)
 301                 return (0);
 302 
 303         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
 304             FTAG, &headdbuf);
 305         if (err)
 306                 return (err);
 307         headphys = headdbuf->db_data;
 308         err = zap_value_search(dp->dp_meta_objset,
 309             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
 310         dmu_buf_rele(headdbuf, FTAG);
 311         return (err);
 312 }
 313 
 314 static int
 315 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 316 {
 317         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 318         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 319         matchtype_t mt;
 320         int err;
 321 
 322         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 323                 mt = MT_FIRST;
 324         else
 325                 mt = MT_EXACT;
 326 
 327         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 328             value, mt, NULL, 0, NULL);
 329         if (err == ENOTSUP && mt == MT_FIRST)
 330                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
 331         return (err);
 332 }
 333 
 334 static int
 335 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 336 {
 337         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 338         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 339         matchtype_t mt;
 340         int err;
 341 
 342         dsl_dir_snap_cmtime_update(ds->ds_dir);
 343 
 344         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 345                 mt = MT_FIRST;
 346         else
 347                 mt = MT_EXACT;
 348 
 349         err = zap_remove_norm(mos, snapobj, name, mt, tx);
 350         if (err == ENOTSUP && mt == MT_FIRST)
 351                 err = zap_remove(mos, snapobj, name, tx);
 352 
 353         if (err == 0)
 354                 dsl_snapcount_adjust(ds->ds_dir, tx, -1, B_TRUE);
 355 
 356         return (err);
 357 }
 358 
 359 static int
 360 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 361     dsl_dataset_t **dsp)
 362 {
 363         objset_t *mos = dp->dp_meta_objset;
 364         dmu_buf_t *dbuf;
 365         dsl_dataset_t *ds;
 366         int err;
 367         dmu_object_info_t doi;
 368 
 369         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
 370             dsl_pool_sync_context(dp));
 371 
 372         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
 373         if (err)
 374                 return (err);
 375 
 376         /* Make sure dsobj has the correct object type. */
 377         dmu_object_info_from_db(dbuf, &doi);
 378         if (doi.doi_type != DMU_OT_DSL_DATASET)
 379                 return (EINVAL);
 380 
 381         ds = dmu_buf_get_user(dbuf);
 382         if (ds == NULL) {
 383                 dsl_dataset_t *winner;
 384 
 385                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
 386                 ds->ds_dbuf = dbuf;
 387                 ds->ds_object = dsobj;
 388                 ds->ds_phys = dbuf->db_data;
 389 
 390                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
 391                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
 392                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 393                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
 394 
 395                 rw_init(&ds->ds_rwlock, 0, 0, 0);
 396                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 397 
 398                 bplist_create(&ds->ds_pending_deadlist);
 399                 dsl_deadlist_open(&ds->ds_deadlist,
 400                     mos, ds->ds_phys->ds_deadlist_obj);
 401 
 402                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
 403                     offsetof(dmu_sendarg_t, dsa_link));
 404 
 405                 if (err == 0) {
 406                         err = dsl_dir_open_obj(dp,
 407                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
 408                 }
 409                 if (err) {
 410                         mutex_destroy(&ds->ds_lock);
 411                         mutex_destroy(&ds->ds_recvlock);
 412                         mutex_destroy(&ds->ds_opening_lock);
 413                         rw_destroy(&ds->ds_rwlock);
 414                         cv_destroy(&ds->ds_exclusive_cv);
 415                         bplist_destroy(&ds->ds_pending_deadlist);
 416                         dsl_deadlist_close(&ds->ds_deadlist);
 417                         kmem_free(ds, sizeof (dsl_dataset_t));
 418                         dmu_buf_rele(dbuf, tag);
 419                         return (err);
 420                 }
 421 
 422                 if (!dsl_dataset_is_snapshot(ds)) {
 423                         ds->ds_snapname[0] = '\0';
 424                         if (ds->ds_phys->ds_prev_snap_obj) {
 425                                 err = dsl_dataset_get_ref(dp,
 426                                     ds->ds_phys->ds_prev_snap_obj,
 427                                     ds, &ds->ds_prev);
 428                         }
 429                 } else {
 430                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
 431                                 err = dsl_dataset_get_snapname(ds);
 432                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
 433                                 err = zap_count(
 434                                     ds->ds_dir->dd_pool->dp_meta_objset,
 435                                     ds->ds_phys->ds_userrefs_obj,
 436                                     &ds->ds_userrefs);
 437                         }
 438                 }
 439 
 440                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
 441                         /*
 442                          * In sync context, we're called with either no lock
 443                          * or with the write lock.  If we're not syncing,
 444                          * we're always called with the read lock held.
 445                          */
 446                         boolean_t need_lock =
 447                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
 448                             dsl_pool_sync_context(dp);
 449 
 450                         if (need_lock)
 451                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 452 
 453                         err = dsl_prop_get_ds(ds,
 454                             "refreservation", sizeof (uint64_t), 1,
 455                             &ds->ds_reserved, NULL);
 456                         if (err == 0) {
 457                                 err = dsl_prop_get_ds(ds,
 458                                     "refquota", sizeof (uint64_t), 1,
 459                                     &ds->ds_quota, NULL);
 460                         }
 461 
 462                         if (need_lock)
 463                                 rw_exit(&dp->dp_config_rwlock);
 464                 } else {
 465                         ds->ds_reserved = ds->ds_quota = 0;
 466                 }
 467 
 468                 if (err == 0) {
 469                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
 470                             dsl_dataset_evict);
 471                 }
 472                 if (err || winner) {
 473                         bplist_destroy(&ds->ds_pending_deadlist);
 474                         dsl_deadlist_close(&ds->ds_deadlist);
 475                         if (ds->ds_prev)
 476                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 477                         dsl_dir_close(ds->ds_dir, ds);
 478                         mutex_destroy(&ds->ds_lock);
 479                         mutex_destroy(&ds->ds_recvlock);
 480                         mutex_destroy(&ds->ds_opening_lock);
 481                         rw_destroy(&ds->ds_rwlock);
 482                         cv_destroy(&ds->ds_exclusive_cv);
 483                         kmem_free(ds, sizeof (dsl_dataset_t));
 484                         if (err) {
 485                                 dmu_buf_rele(dbuf, tag);
 486                                 return (err);
 487                         }
 488                         ds = winner;
 489                 } else {
 490                         ds->ds_fsid_guid =
 491                             unique_insert(ds->ds_phys->ds_fsid_guid);
 492                 }
 493         }
 494         ASSERT3P(ds->ds_dbuf, ==, dbuf);
 495         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
 496         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
 497             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
 498             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
 499         mutex_enter(&ds->ds_lock);
 500         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 501                 mutex_exit(&ds->ds_lock);
 502                 dmu_buf_rele(ds->ds_dbuf, tag);
 503                 return (ENOENT);
 504         }
 505         mutex_exit(&ds->ds_lock);
 506         *dsp = ds;
 507         return (0);
 508 }
 509 
 510 static int
 511 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
 512 {
 513         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 514 
 515         /*
 516          * In syncing context we don't want the rwlock lock: there
 517          * may be an existing writer waiting for sync phase to
 518          * finish.  We don't need to worry about such writers, since
 519          * sync phase is single-threaded, so the writer can't be
 520          * doing anything while we are active.
 521          */
 522         if (dsl_pool_sync_context(dp)) {
 523                 ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 524                 return (0);
 525         }
 526 
 527         /*
 528          * Normal users will hold the ds_rwlock as a READER until they
 529          * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
 530          * drop their READER lock after they set the ds_owner field.
 531          *
 532          * If the dataset is being destroyed, the destroy thread will
 533          * obtain a WRITER lock for exclusive access after it's done its
 534          * open-context work and then change the ds_owner to
 535          * dsl_reaper once destruction is assured.  So threads
 536          * may block here temporarily, until the "destructability" of
 537          * the dataset is determined.
 538          */
 539         ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
 540         mutex_enter(&ds->ds_lock);
 541         while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
 542                 rw_exit(&dp->dp_config_rwlock);
 543                 cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
 544                 if (DSL_DATASET_IS_DESTROYED(ds)) {
 545                         mutex_exit(&ds->ds_lock);
 546                         dsl_dataset_drop_ref(ds, tag);
 547                         rw_enter(&dp->dp_config_rwlock, RW_READER);
 548                         return (ENOENT);
 549                 }
 550                 /*
 551                  * The dp_config_rwlock lives above the ds_lock. And
 552                  * we need to check DSL_DATASET_IS_DESTROYED() while
 553                  * holding the ds_lock, so we have to drop and reacquire
 554                  * the ds_lock here.
 555                  */
 556                 mutex_exit(&ds->ds_lock);
 557                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 558                 mutex_enter(&ds->ds_lock);
 559         }
 560         mutex_exit(&ds->ds_lock);
 561         return (0);
 562 }
 563 
 564 int
 565 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 566     dsl_dataset_t **dsp)
 567 {
 568         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
 569 
 570         if (err)
 571                 return (err);
 572         return (dsl_dataset_hold_ref(*dsp, tag));
 573 }
 574 
 575 int
 576 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
 577     void *tag, dsl_dataset_t **dsp)
 578 {
 579         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
 580         if (err)
 581                 return (err);
 582         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 583                 dsl_dataset_rele(*dsp, tag);
 584                 *dsp = NULL;
 585                 return (EBUSY);
 586         }
 587         return (0);
 588 }
 589 
 590 int
 591 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 592 {
 593         dsl_dir_t *dd;
 594         dsl_pool_t *dp;
 595         const char *snapname;
 596         uint64_t obj;
 597         int err = 0;
 598 
 599         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 600         if (err)
 601                 return (err);
 602 
 603         dp = dd->dd_pool;
 604         obj = dd->dd_phys->dd_head_dataset_obj;
 605         rw_enter(&dp->dp_config_rwlock, RW_READER);
 606         if (obj)
 607                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
 608         else
 609                 err = ENOENT;
 610         if (err)
 611                 goto out;
 612 
 613         err = dsl_dataset_hold_ref(*dsp, tag);
 614 
 615         /* we may be looking for a snapshot */
 616         if (err == 0 && snapname != NULL) {
 617                 dsl_dataset_t *ds = NULL;
 618 
 619                 if (*snapname++ != '@') {
 620                         dsl_dataset_rele(*dsp, tag);
 621                         err = ENOENT;
 622                         goto out;
 623                 }
 624 
 625                 dprintf("looking for snapshot '%s'\n", snapname);
 626                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
 627                 if (err == 0)
 628                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
 629                 dsl_dataset_rele(*dsp, tag);
 630 
 631                 ASSERT3U((err == 0), ==, (ds != NULL));
 632 
 633                 if (ds) {
 634                         mutex_enter(&ds->ds_lock);
 635                         if (ds->ds_snapname[0] == 0)
 636                                 (void) strlcpy(ds->ds_snapname, snapname,
 637                                     sizeof (ds->ds_snapname));
 638                         mutex_exit(&ds->ds_lock);
 639                         err = dsl_dataset_hold_ref(ds, tag);
 640                         *dsp = err ? NULL : ds;
 641                 }
 642         }
 643 out:
 644         rw_exit(&dp->dp_config_rwlock);
 645         dsl_dir_close(dd, FTAG);
 646         return (err);
 647 }
 648 
 649 int
 650 dsl_dataset_own(const char *name, boolean_t inconsistentok,
 651     void *tag, dsl_dataset_t **dsp)
 652 {
 653         int err = dsl_dataset_hold(name, tag, dsp);
 654         if (err)
 655                 return (err);
 656         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 657                 dsl_dataset_rele(*dsp, tag);
 658                 return (EBUSY);
 659         }
 660         return (0);
 661 }
 662 
 663 void
 664 dsl_dataset_name(dsl_dataset_t *ds, char *name)
 665 {
 666         if (ds == NULL) {
 667                 (void) strcpy(name, "mos");
 668         } else {
 669                 dsl_dir_name(ds->ds_dir, name);
 670                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 671                 if (ds->ds_snapname[0]) {
 672                         (void) strcat(name, "@");
 673                         /*
 674                          * We use a "recursive" mutex so that we
 675                          * can call dprintf_ds() with ds_lock held.
 676                          */
 677                         if (!MUTEX_HELD(&ds->ds_lock)) {
 678                                 mutex_enter(&ds->ds_lock);
 679                                 (void) strcat(name, ds->ds_snapname);
 680                                 mutex_exit(&ds->ds_lock);
 681                         } else {
 682                                 (void) strcat(name, ds->ds_snapname);
 683                         }
 684                 }
 685         }
 686 }
 687 
 688 static int
 689 dsl_dataset_namelen(dsl_dataset_t *ds)
 690 {
 691         int result;
 692 
 693         if (ds == NULL) {
 694                 result = 3;     /* "mos" */
 695         } else {
 696                 result = dsl_dir_namelen(ds->ds_dir);
 697                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 698                 if (ds->ds_snapname[0]) {
 699                         ++result;       /* adding one for the @-sign */
 700                         if (!MUTEX_HELD(&ds->ds_lock)) {
 701                                 mutex_enter(&ds->ds_lock);
 702                                 result += strlen(ds->ds_snapname);
 703                                 mutex_exit(&ds->ds_lock);
 704                         } else {
 705                                 result += strlen(ds->ds_snapname);
 706                         }
 707                 }
 708         }
 709 
 710         return (result);
 711 }
 712 
 713 void
 714 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 715 {
 716         dmu_buf_rele(ds->ds_dbuf, tag);
 717 }
 718 
 719 void
 720 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 721 {
 722         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
 723                 rw_exit(&ds->ds_rwlock);
 724         }
 725         dsl_dataset_drop_ref(ds, tag);
 726 }
 727 
 728 void
 729 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
 730 {
 731         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
 732             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
 733 
 734         mutex_enter(&ds->ds_lock);
 735         ds->ds_owner = NULL;
 736         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
 737                 rw_exit(&ds->ds_rwlock);
 738                 cv_broadcast(&ds->ds_exclusive_cv);
 739         }
 740         mutex_exit(&ds->ds_lock);
 741         if (ds->ds_dbuf)
 742                 dsl_dataset_drop_ref(ds, tag);
 743         else
 744                 dsl_dataset_evict(NULL, ds);
 745 }
 746 
 747 boolean_t
 748 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
 749 {
 750         boolean_t gotit = FALSE;
 751 
 752         mutex_enter(&ds->ds_lock);
 753         if (ds->ds_owner == NULL &&
 754             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
 755                 ds->ds_owner = tag;
 756                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
 757                         rw_exit(&ds->ds_rwlock);
 758                 gotit = TRUE;
 759         }
 760         mutex_exit(&ds->ds_lock);
 761         return (gotit);
 762 }
 763 
 764 void
 765 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
 766 {
 767         ASSERT3P(owner, ==, ds->ds_owner);
 768         if (!RW_WRITE_HELD(&ds->ds_rwlock))
 769                 rw_enter(&ds->ds_rwlock, RW_WRITER);
 770 }
 771 
 772 uint64_t
 773 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
 774     uint64_t flags, dmu_tx_t *tx)
 775 {
 776         dsl_pool_t *dp = dd->dd_pool;
 777         dmu_buf_t *dbuf;
 778         dsl_dataset_phys_t *dsphys;
 779         uint64_t dsobj;
 780         objset_t *mos = dp->dp_meta_objset;
 781 
 782         if (origin == NULL)
 783                 origin = dp->dp_origin_snap;
 784 
 785         ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
 786         ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
 787         ASSERT(dmu_tx_is_syncing(tx));
 788         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 789 
 790         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
 791             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
 792         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 793         dmu_buf_will_dirty(dbuf, tx);
 794         dsphys = dbuf->db_data;
 795         bzero(dsphys, sizeof (dsl_dataset_phys_t));
 796         dsphys->ds_dir_obj = dd->dd_object;
 797         dsphys->ds_flags = flags;
 798         dsphys->ds_fsid_guid = unique_create();
 799         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
 800             sizeof (dsphys->ds_guid));
 801         dsphys->ds_snapnames_zapobj =
 802             zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
 803             DMU_OT_NONE, 0, tx);
 804         dsphys->ds_creation_time = gethrestime_sec();
 805         dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
 806 
 807         if (origin == NULL) {
 808                 dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
 809         } else {
 810                 dsl_dataset_t *ohds;
 811 
 812                 dsphys->ds_prev_snap_obj = origin->ds_object;
 813                 dsphys->ds_prev_snap_txg =
 814                     origin->ds_phys->ds_creation_txg;
 815                 dsphys->ds_referenced_bytes =
 816                     origin->ds_phys->ds_referenced_bytes;
 817                 dsphys->ds_compressed_bytes =
 818                     origin->ds_phys->ds_compressed_bytes;
 819                 dsphys->ds_uncompressed_bytes =
 820                     origin->ds_phys->ds_uncompressed_bytes;
 821                 dsphys->ds_bp = origin->ds_phys->ds_bp;
 822                 dsphys->ds_flags |= origin->ds_phys->ds_flags;
 823 
 824                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
 825                 origin->ds_phys->ds_num_children++;
 826 
 827                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
 828                     origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
 829                 dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
 830                     dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
 831                 dsl_dataset_rele(ohds, FTAG);
 832 
 833                 if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
 834                         if (origin->ds_phys->ds_next_clones_obj == 0) {
 835                                 origin->ds_phys->ds_next_clones_obj =
 836                                     zap_create(mos,
 837                                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
 838                         }
 839                         VERIFY(0 == zap_add_int(mos,
 840                             origin->ds_phys->ds_next_clones_obj,
 841                             dsobj, tx));
 842                 }
 843 
 844                 dmu_buf_will_dirty(dd->dd_dbuf, tx);
 845                 dd->dd_phys->dd_origin_obj = origin->ds_object;
 846                 if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
 847                         if (origin->ds_dir->dd_phys->dd_clones == 0) {
 848                                 dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
 849                                 origin->ds_dir->dd_phys->dd_clones =
 850                                     zap_create(mos,
 851                                     DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
 852                         }
 853                         VERIFY3U(0, ==, zap_add_int(mos,
 854                             origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
 855                 }
 856         }
 857 
 858         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
 859                 dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
 860 
 861         dmu_buf_rele(dbuf, FTAG);
 862 
 863         dmu_buf_will_dirty(dd->dd_dbuf, tx);
 864         dd->dd_phys->dd_head_dataset_obj = dsobj;
 865 
 866         return (dsobj);
 867 }
 868 
 869 uint64_t
 870 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
 871     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
 872 {
 873         dsl_pool_t *dp = pdd->dd_pool;
 874         uint64_t dsobj, ddobj;
 875         dsl_dir_t *dd;
 876 
 877         ASSERT(lastname[0] != '@');
 878 
 879         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
 880         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
 881 
 882         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
 883 
 884         dsl_deleg_set_create_perms(dd, tx, cr);
 885 
 886         dsl_dir_close(dd, FTAG);
 887 
 888         /*
 889          * If we are creating a clone, make sure we zero out any stale
 890          * data from the origin snapshots zil header.
 891          */
 892         if (origin != NULL) {
 893                 dsl_dataset_t *ds;
 894                 objset_t *os;
 895 
 896                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 897                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
 898                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
 899                 dsl_dataset_dirty(ds, tx);
 900                 dsl_dataset_rele(ds, FTAG);
 901         }
 902 
 903         return (dsobj);
 904 }
 905 
 906 /*
 907  * The snapshots must all be in the same pool.
 908  */
 909 int
 910 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer,
 911     nvlist_t *errlist)
 912 {
 913         int err;
 914         dsl_sync_task_t *dst;
 915         spa_t *spa;
 916         nvpair_t *pair;
 917         dsl_sync_task_group_t *dstg;
 918 
 919         pair = nvlist_next_nvpair(snaps, NULL);
 920         if (pair == NULL)
 921                 return (0);
 922 
 923         err = spa_open(nvpair_name(pair), &spa, FTAG);
 924         if (err)
 925                 return (err);
 926         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 927 
 928         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 929             pair = nvlist_next_nvpair(snaps, pair)) {
 930                 dsl_dataset_t *ds;
 931 
 932                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
 933                 if (err == 0) {
 934                         struct dsl_ds_destroyarg *dsda;
 935 
 936                         dsl_dataset_make_exclusive(ds, dstg);
 937                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
 938                             KM_SLEEP);
 939                         dsda->ds = ds;
 940                         dsda->defer = defer;
 941                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 942                             dsl_dataset_destroy_sync, dsda, dstg, 0);
 943                 } else if (err == ENOENT) {
 944                         err = 0;
 945                 } else {
 946                         fnvlist_add_int32(errlist, nvpair_name(pair), err);
 947                         break;
 948                 }
 949         }
 950 
 951         if (err == 0)
 952                 err = dsl_sync_task_group_wait(dstg);
 953 
 954         for (dst = list_head(&dstg->dstg_tasks); dst;
 955             dst = list_next(&dstg->dstg_tasks, dst)) {
 956                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
 957                 dsl_dataset_t *ds = dsda->ds;
 958 
 959                 /*
 960                  * Return the snapshots that triggered the error.
 961                  */
 962                 if (dst->dst_err != 0) {
 963                         char name[ZFS_MAXNAMELEN];
 964                         dsl_dataset_name(ds, name);
 965                         fnvlist_add_int32(errlist, name, dst->dst_err);
 966                 }
 967                 ASSERT3P(dsda->rm_origin, ==, NULL);
 968                 dsl_dataset_disown(ds, dstg);
 969                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 970         }
 971 
 972         dsl_sync_task_group_destroy(dstg);
 973         spa_close(spa, FTAG);
 974         return (err);
 975 
 976 }
 977 
 978 static boolean_t
 979 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
 980 {
 981         boolean_t might_destroy = B_FALSE;
 982 
 983         mutex_enter(&ds->ds_lock);
 984         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
 985             DS_IS_DEFER_DESTROY(ds))
 986                 might_destroy = B_TRUE;
 987         mutex_exit(&ds->ds_lock);
 988 
 989         return (might_destroy);
 990 }
 991 
 992 /*
 993  * If we're removing a clone, and these three conditions are true:
 994  *      1) the clone's origin has no other children
 995  *      2) the clone's origin has no user references
 996  *      3) the clone's origin has been marked for deferred destruction
 997  * Then, prepare to remove the origin as part of this sync task group.
 998  */
 999 static int
1000 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1001 {
1002         dsl_dataset_t *ds = dsda->ds;
1003         dsl_dataset_t *origin = ds->ds_prev;
1004 
1005         if (dsl_dataset_might_destroy_origin(origin)) {
1006                 char *name;
1007                 int namelen;
1008                 int error;
1009 
1010                 namelen = dsl_dataset_namelen(origin) + 1;
1011                 name = kmem_alloc(namelen, KM_SLEEP);
1012                 dsl_dataset_name(origin, name);
1013 #ifdef _KERNEL
1014                 error = zfs_unmount_snap(name, NULL);
1015                 if (error) {
1016                         kmem_free(name, namelen);
1017                         return (error);
1018                 }
1019 #endif
1020                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1021                 kmem_free(name, namelen);
1022                 if (error)
1023                         return (error);
1024                 dsda->rm_origin = origin;
1025                 dsl_dataset_make_exclusive(origin, tag);
1026         }
1027 
1028         return (0);
1029 }
1030 
1031 /*
1032  * ds must be opened as OWNER.  On return (whether successful or not),
1033  * ds will be closed and caller can no longer dereference it.
1034  */
1035 int
1036 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
1037 {
1038         int err;
1039         dsl_sync_task_group_t *dstg;
1040         objset_t *os;
1041         dsl_dir_t *dd;
1042         uint64_t obj;
1043         struct dsl_ds_destroyarg dsda = { 0 };
1044 
1045         dsda.ds = ds;
1046 
1047         if (dsl_dataset_is_snapshot(ds)) {
1048                 /* Destroying a snapshot is simpler */
1049                 dsl_dataset_make_exclusive(ds, tag);
1050 
1051                 dsda.defer = defer;
1052                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1053                     dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
1054                     &dsda, tag, 0);
1055                 ASSERT3P(dsda.rm_origin, ==, NULL);
1056                 goto out;
1057         } else if (defer) {
1058                 err = EINVAL;
1059                 goto out;
1060         }
1061 
1062         dd = ds->ds_dir;
1063 
1064         if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
1065             &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
1066                 /*
1067                  * Check for errors and mark this ds as inconsistent, in
1068                  * case we crash while freeing the objects.
1069                  */
1070                 err = dsl_sync_task_do(dd->dd_pool,
1071                     dsl_dataset_destroy_begin_check,
1072                     dsl_dataset_destroy_begin_sync, ds, NULL, 0);
1073                 if (err)
1074                         goto out;
1075 
1076                 err = dmu_objset_from_ds(ds, &os);
1077                 if (err)
1078                         goto out;
1079 
1080                 /*
1081                  * Remove all objects while in the open context so that
1082                  * there is less work to do in the syncing context.
1083                  */
1084                 for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
1085                     ds->ds_phys->ds_prev_snap_txg)) {
1086                         /*
1087                          * Ignore errors, if there is not enough disk space
1088                          * we will deal with it in dsl_dataset_destroy_sync().
1089                          */
1090                         (void) dmu_free_object(os, obj);
1091                 }
1092                 if (err != ESRCH)
1093                         goto out;
1094 
1095                 /*
1096                  * Sync out all in-flight IO.
1097                  */
1098                 txg_wait_synced(dd->dd_pool, 0);
1099 
1100                 /*
1101                  * If we managed to free all the objects in open
1102                  * context, the user space accounting should be zero.
1103                  */
1104                 if (ds->ds_phys->ds_bp.blk_fill == 0 &&
1105                     dmu_objset_userused_enabled(os)) {
1106                         uint64_t count;
1107 
1108                         ASSERT(zap_count(os, DMU_USERUSED_OBJECT,
1109                             &count) != 0 || count == 0);
1110                         ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT,
1111                             &count) != 0 || count == 0);
1112                 }
1113         }
1114 
1115         rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
1116         err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
1117         rw_exit(&dd->dd_pool->dp_config_rwlock);
1118 
1119         if (err)
1120                 goto out;
1121 
1122         /*
1123          * Blow away the dsl_dir + head dataset.
1124          */
1125         dsl_dataset_make_exclusive(ds, tag);
1126         /*
1127          * If we're removing a clone, we might also need to remove its
1128          * origin.
1129          */
1130         do {
1131                 dsda.need_prep = B_FALSE;
1132                 if (dsl_dir_is_clone(dd)) {
1133                         err = dsl_dataset_origin_rm_prep(&dsda, tag);
1134                         if (err) {
1135                                 dsl_dir_close(dd, FTAG);
1136                                 goto out;
1137                         }
1138                 }
1139 
1140                 dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
1141                 dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
1142                     dsl_dataset_destroy_sync, &dsda, tag, 0);
1143                 dsl_sync_task_create(dstg, dsl_dir_destroy_check,
1144                     dsl_dir_destroy_sync, dd, tag, 0);
1145                 err = dsl_sync_task_group_wait(dstg);
1146                 dsl_sync_task_group_destroy(dstg);
1147 
1148                 /*
1149                  * We could be racing against 'zfs release' or 'zfs destroy -d'
1150                  * on the origin snap, in which case we can get EBUSY if we
1151                  * needed to destroy the origin snap but were not ready to
1152                  * do so.
1153                  */
1154                 if (dsda.need_prep) {
1155                         ASSERT(err == EBUSY);
1156                         ASSERT(dsl_dir_is_clone(dd));
1157                         ASSERT(dsda.rm_origin == NULL);
1158                 }
1159         } while (dsda.need_prep);
1160 
1161         if (dsda.rm_origin != NULL)
1162                 dsl_dataset_disown(dsda.rm_origin, tag);
1163 
1164         /* if it is successful, dsl_dir_destroy_sync will close the dd */
1165         if (err)
1166                 dsl_dir_close(dd, FTAG);
1167 out:
1168         dsl_dataset_disown(ds, tag);
1169         return (err);
1170 }
1171 
1172 blkptr_t *
1173 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1174 {
1175         return (&ds->ds_phys->ds_bp);
1176 }
1177 
1178 void
1179 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1180 {
1181         ASSERT(dmu_tx_is_syncing(tx));
1182         /* If it's the meta-objset, set dp_meta_rootbp */
1183         if (ds == NULL) {
1184                 tx->tx_pool->dp_meta_rootbp = *bp;
1185         } else {
1186                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1187                 ds->ds_phys->ds_bp = *bp;
1188         }
1189 }
1190 
1191 spa_t *
1192 dsl_dataset_get_spa(dsl_dataset_t *ds)
1193 {
1194         return (ds->ds_dir->dd_pool->dp_spa);
1195 }
1196 
1197 void
1198 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1199 {
1200         dsl_pool_t *dp;
1201 
1202         if (ds == NULL) /* this is the meta-objset */
1203                 return;
1204 
1205         ASSERT(ds->ds_objset != NULL);
1206 
1207         if (ds->ds_phys->ds_next_snap_obj != 0)
1208                 panic("dirtying snapshot!");
1209 
1210         dp = ds->ds_dir->dd_pool;
1211 
1212         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1213                 /* up the hold count until we can be written out */
1214                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1215         }
1216 }
1217 
1218 boolean_t
1219 dsl_dataset_is_dirty(dsl_dataset_t *ds)
1220 {
1221         for (int t = 0; t < TXG_SIZE; t++) {
1222                 if (txg_list_member(&ds->ds_dir->dd_pool->dp_dirty_datasets,
1223                     ds, t))
1224                         return (B_TRUE);
1225         }
1226         return (B_FALSE);
1227 }
1228 
1229 /*
1230  * The unique space in the head dataset can be calculated by subtracting
1231  * the space used in the most recent snapshot, that is still being used
1232  * in this file system, from the space currently in use.  To figure out
1233  * the space in the most recent snapshot still in use, we need to take
1234  * the total space used in the snapshot and subtract out the space that
1235  * has been freed up since the snapshot was taken.
1236  */
1237 static void
1238 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1239 {
1240         uint64_t mrs_used;
1241         uint64_t dlused, dlcomp, dluncomp;
1242 
1243         ASSERT(!dsl_dataset_is_snapshot(ds));
1244 
1245         if (ds->ds_phys->ds_prev_snap_obj != 0)
1246                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1247         else
1248                 mrs_used = 0;
1249 
1250         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1251 
1252         ASSERT3U(dlused, <=, mrs_used);
1253         ds->ds_phys->ds_unique_bytes =
1254             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1255 
1256         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1257             SPA_VERSION_UNIQUE_ACCURATE)
1258                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1259 }
1260 
1261 struct killarg {
1262         dsl_dataset_t *ds;
1263         dmu_tx_t *tx;
1264 };
1265 
1266 /* ARGSUSED */
1267 static int
1268 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1269     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1270 {
1271         struct killarg *ka = arg;
1272         dmu_tx_t *tx = ka->tx;
1273 
1274         if (bp == NULL)
1275                 return (0);
1276 
1277         if (zb->zb_level == ZB_ZIL_LEVEL) {
1278                 ASSERT(zilog != NULL);
1279                 /*
1280                  * It's a block in the intent log.  It has no
1281                  * accounting, so just free it.
1282                  */
1283                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1284         } else {
1285                 ASSERT(zilog == NULL);
1286                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1287                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1288         }
1289 
1290         return (0);
1291 }
1292 
1293 /* ARGSUSED */
1294 static int
1295 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1296 {
1297         dsl_dataset_t *ds = arg1;
1298         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1299         uint64_t count;
1300         int err;
1301 
1302         /*
1303          * Can't delete a head dataset if there are snapshots of it.
1304          * (Except if the only snapshots are from the branch we cloned
1305          * from.)
1306          */
1307         if (ds->ds_prev != NULL &&
1308             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1309                 return (EBUSY);
1310 
1311         /*
1312          * This is really a dsl_dir thing, but check it here so that
1313          * we'll be less likely to leave this dataset inconsistent &
1314          * nearly destroyed.
1315          */
1316         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1317         if (err)
1318                 return (err);
1319         if (count != 0)
1320                 return (EEXIST);
1321 
1322         return (0);
1323 }
1324 
1325 /* ARGSUSED */
1326 static void
1327 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1328 {
1329         dsl_dataset_t *ds = arg1;
1330 
1331         /* Mark it as inconsistent on-disk, in case we crash */
1332         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1333         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1334 
1335         spa_history_log_internal_ds(ds, "destroy begin", tx, "");
1336 }
1337 
1338 static int
1339 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1340     dmu_tx_t *tx)
1341 {
1342         dsl_dataset_t *ds = dsda->ds;
1343         dsl_dataset_t *ds_prev = ds->ds_prev;
1344 
1345         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1346                 struct dsl_ds_destroyarg ndsda = {0};
1347 
1348                 /*
1349                  * If we're not prepared to remove the origin, don't remove
1350                  * the clone either.
1351                  */
1352                 if (dsda->rm_origin == NULL) {
1353                         dsda->need_prep = B_TRUE;
1354                         return (EBUSY);
1355                 }
1356 
1357                 ndsda.ds = ds_prev;
1358                 ndsda.is_origin_rm = B_TRUE;
1359                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1360         }
1361 
1362         /*
1363          * If we're not going to remove the origin after all,
1364          * undo the open context setup.
1365          */
1366         if (dsda->rm_origin != NULL) {
1367                 dsl_dataset_disown(dsda->rm_origin, tag);
1368                 dsda->rm_origin = NULL;
1369         }
1370 
1371         return (0);
1372 }
1373 
1374 /*
1375  * If you add new checks here, you may need to add
1376  * additional checks to the "temporary" case in
1377  * snapshot_check() in dmu_objset.c.
1378  */
1379 /* ARGSUSED */
1380 int
1381 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1382 {
1383         struct dsl_ds_destroyarg *dsda = arg1;
1384         dsl_dataset_t *ds = dsda->ds;
1385 
1386         /* we have an owner hold, so noone else can destroy us */
1387         ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1388 
1389         /*
1390          * Only allow deferred destroy on pools that support it.
1391          * NOTE: deferred destroy is only supported on snapshots.
1392          */
1393         if (dsda->defer) {
1394                 if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1395                     SPA_VERSION_USERREFS)
1396                         return (ENOTSUP);
1397                 ASSERT(dsl_dataset_is_snapshot(ds));
1398                 return (0);
1399         }
1400 
1401         /*
1402          * Can't delete a head dataset if there are snapshots of it.
1403          * (Except if the only snapshots are from the branch we cloned
1404          * from.)
1405          */
1406         if (ds->ds_prev != NULL &&
1407             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1408                 return (EBUSY);
1409 
1410         /*
1411          * If we made changes this txg, traverse_dsl_dataset won't find
1412          * them.  Try again.
1413          */
1414         if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1415                 return (EAGAIN);
1416 
1417         if (dsl_dataset_is_snapshot(ds)) {
1418                 /*
1419                  * If this snapshot has an elevated user reference count,
1420                  * we can't destroy it yet.
1421                  */
1422                 if (ds->ds_userrefs > 0 && !dsda->releasing)
1423                         return (EBUSY);
1424 
1425                 mutex_enter(&ds->ds_lock);
1426                 /*
1427                  * Can't delete a branch point. However, if we're destroying
1428                  * a clone and removing its origin due to it having a user
1429                  * hold count of 0 and having been marked for deferred destroy,
1430                  * it's OK for the origin to have a single clone.
1431                  */
1432                 if (ds->ds_phys->ds_num_children >
1433                     (dsda->is_origin_rm ? 2 : 1)) {
1434                         mutex_exit(&ds->ds_lock);
1435                         return (EEXIST);
1436                 }
1437                 mutex_exit(&ds->ds_lock);
1438         } else if (dsl_dir_is_clone(ds->ds_dir)) {
1439                 return (dsl_dataset_origin_check(dsda, arg2, tx));
1440         }
1441 
1442         /* XXX we should do some i/o error checking... */
1443         return (0);
1444 }
1445 
1446 struct refsarg {
1447         kmutex_t lock;
1448         boolean_t gone;
1449         kcondvar_t cv;
1450 };
1451 
1452 /* ARGSUSED */
1453 static void
1454 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1455 {
1456         struct refsarg *arg = argv;
1457 
1458         mutex_enter(&arg->lock);
1459         arg->gone = TRUE;
1460         cv_signal(&arg->cv);
1461         mutex_exit(&arg->lock);
1462 }
1463 
1464 static void
1465 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1466 {
1467         struct refsarg arg;
1468 
1469         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1470         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1471         arg.gone = FALSE;
1472         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1473             dsl_dataset_refs_gone);
1474         dmu_buf_rele(ds->ds_dbuf, tag);
1475         mutex_enter(&arg.lock);
1476         while (!arg.gone)
1477                 cv_wait(&arg.cv, &arg.lock);
1478         ASSERT(arg.gone);
1479         mutex_exit(&arg.lock);
1480         ds->ds_dbuf = NULL;
1481         ds->ds_phys = NULL;
1482         mutex_destroy(&arg.lock);
1483         cv_destroy(&arg.cv);
1484 }
1485 
1486 static void
1487 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1488 {
1489         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1490         uint64_t count;
1491         int err;
1492 
1493         ASSERT(ds->ds_phys->ds_num_children >= 2);
1494         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1495         /*
1496          * The err should not be ENOENT, but a bug in a previous version
1497          * of the code could cause upgrade_clones_cb() to not set
1498          * ds_next_snap_obj when it should, leading to a missing entry.
1499          * If we knew that the pool was created after
1500          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1501          * ENOENT.  However, at least we can check that we don't have
1502          * too many entries in the next_clones_obj even after failing to
1503          * remove this one.
1504          */
1505         if (err != ENOENT) {
1506                 VERIFY0(err);
1507         }
1508         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1509             &count));
1510         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1511 }
1512 
1513 static void
1514 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1515 {
1516         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1517         zap_cursor_t zc;
1518         zap_attribute_t za;
1519 
1520         /*
1521          * If it is the old version, dd_clones doesn't exist so we can't
1522          * find the clones, but deadlist_remove_key() is a no-op so it
1523          * doesn't matter.
1524          */
1525         if (ds->ds_dir->dd_phys->dd_clones == 0)
1526                 return;
1527 
1528         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1529             zap_cursor_retrieve(&zc, &za) == 0;
1530             zap_cursor_advance(&zc)) {
1531                 dsl_dataset_t *clone;
1532 
1533                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1534                     za.za_first_integer, FTAG, &clone));
1535                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1536                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1537                             mintxg, tx);
1538                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1539                 }
1540                 dsl_dataset_rele(clone, FTAG);
1541         }
1542         zap_cursor_fini(&zc);
1543 }
1544 
1545 struct process_old_arg {
1546         dsl_dataset_t *ds;
1547         dsl_dataset_t *ds_prev;
1548         boolean_t after_branch_point;
1549         zio_t *pio;
1550         uint64_t used, comp, uncomp;
1551 };
1552 
1553 static int
1554 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1555 {
1556         struct process_old_arg *poa = arg;
1557         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1558 
1559         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1560                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1561                 if (poa->ds_prev && !poa->after_branch_point &&
1562                     bp->blk_birth >
1563                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1564                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1565                             bp_get_dsize_sync(dp->dp_spa, bp);
1566                 }
1567         } else {
1568                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1569                 poa->comp += BP_GET_PSIZE(bp);
1570                 poa->uncomp += BP_GET_UCSIZE(bp);
1571                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1572         }
1573         return (0);
1574 }
1575 
1576 static void
1577 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1578     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1579 {
1580         struct process_old_arg poa = { 0 };
1581         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1582         objset_t *mos = dp->dp_meta_objset;
1583 
1584         ASSERT(ds->ds_deadlist.dl_oldfmt);
1585         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1586 
1587         poa.ds = ds;
1588         poa.ds_prev = ds_prev;
1589         poa.after_branch_point = after_branch_point;
1590         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1591         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1592             process_old_cb, &poa, tx));
1593         VERIFY0(zio_wait(poa.pio));
1594         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1595 
1596         /* change snapused */
1597         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1598             -poa.used, -poa.comp, -poa.uncomp, tx);
1599 
1600         /* swap next's deadlist to our deadlist */
1601         dsl_deadlist_close(&ds->ds_deadlist);
1602         dsl_deadlist_close(&ds_next->ds_deadlist);
1603         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1604             ds->ds_phys->ds_deadlist_obj);
1605         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1606         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1607             ds_next->ds_phys->ds_deadlist_obj);
1608 }
1609 
1610 static int
1611 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1612 {
1613         int err;
1614         struct killarg ka;
1615 
1616         /*
1617          * Free everything that we point to (that's born after
1618          * the previous snapshot, if we are a clone)
1619          *
1620          * NB: this should be very quick, because we already
1621          * freed all the objects in open context.
1622          */
1623         ka.ds = ds;
1624         ka.tx = tx;
1625         err = traverse_dataset(ds,
1626             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1627             kill_blkptr, &ka);
1628         ASSERT0(err);
1629         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1630 
1631         return (err);
1632 }
1633 
1634 void
1635 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1636 {
1637         struct dsl_ds_destroyarg *dsda = arg1;
1638         dsl_dataset_t *ds = dsda->ds;
1639         int err;
1640         int after_branch_point = FALSE;
1641         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1642         objset_t *mos = dp->dp_meta_objset;
1643         dsl_dataset_t *ds_prev = NULL;
1644         boolean_t wont_destroy;
1645         uint64_t obj;
1646 
1647         wont_destroy = (dsda->defer &&
1648             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1649 
1650         ASSERT(ds->ds_owner || wont_destroy);
1651         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1652         ASSERT(ds->ds_prev == NULL ||
1653             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1654         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1655 
1656         if (wont_destroy) {
1657                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1658                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1659                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1660                 spa_history_log_internal_ds(ds, "defer_destroy", tx, "");
1661                 return;
1662         }
1663 
1664         /* We need to log before removing it from the namespace. */
1665         spa_history_log_internal_ds(ds, "destroy", tx, "");
1666 
1667         /* signal any waiters that this dataset is going away */
1668         mutex_enter(&ds->ds_lock);
1669         ds->ds_owner = dsl_reaper;
1670         cv_broadcast(&ds->ds_exclusive_cv);
1671         mutex_exit(&ds->ds_lock);
1672 
1673         /* Remove our reservation */
1674         if (ds->ds_reserved != 0) {
1675                 dsl_prop_setarg_t psa;
1676                 uint64_t value = 0;
1677 
1678                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1679                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1680                     &value);
1681                 psa.psa_effective_value = 0;    /* predict default value */
1682 
1683                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1684                 ASSERT0(ds->ds_reserved);
1685         }
1686 
1687         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1688 
1689         dsl_scan_ds_destroyed(ds, tx);
1690 
1691         obj = ds->ds_object;
1692 
1693         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1694                 if (ds->ds_prev) {
1695                         ds_prev = ds->ds_prev;
1696                 } else {
1697                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1698                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1699                 }
1700                 after_branch_point =
1701                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1702 
1703                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1704                 if (after_branch_point &&
1705                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
1706                         remove_from_next_clones(ds_prev, obj, tx);
1707                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1708                                 VERIFY(0 == zap_add_int(mos,
1709                                     ds_prev->ds_phys->ds_next_clones_obj,
1710                                     ds->ds_phys->ds_next_snap_obj, tx));
1711                         }
1712                 }
1713                 if (after_branch_point &&
1714                     ds->ds_phys->ds_next_snap_obj == 0) {
1715                         /* This clone is toast. */
1716                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1717                         ds_prev->ds_phys->ds_num_children--;
1718 
1719                         /*
1720                          * If the clone's origin has no other clones, no
1721                          * user holds, and has been marked for deferred
1722                          * deletion, then we should have done the necessary
1723                          * destroy setup for it.
1724                          */
1725                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1726                             ds_prev->ds_userrefs == 0 &&
1727                             DS_IS_DEFER_DESTROY(ds_prev)) {
1728                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1729                         } else {
1730                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1731                         }
1732                 } else if (!after_branch_point) {
1733                         ds_prev->ds_phys->ds_next_snap_obj =
1734                             ds->ds_phys->ds_next_snap_obj;
1735                 }
1736         }
1737 
1738         if (dsl_dataset_is_snapshot(ds)) {
1739                 dsl_dataset_t *ds_next;
1740                 uint64_t old_unique;
1741                 uint64_t used = 0, comp = 0, uncomp = 0;
1742 
1743                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1744                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1745                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1746 
1747                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1748 
1749                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1750                 ds_next->ds_phys->ds_prev_snap_obj =
1751                     ds->ds_phys->ds_prev_snap_obj;
1752                 ds_next->ds_phys->ds_prev_snap_txg =
1753                     ds->ds_phys->ds_prev_snap_txg;
1754                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1755                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1756 
1757 
1758                 if (ds_next->ds_deadlist.dl_oldfmt) {
1759                         process_old_deadlist(ds, ds_prev, ds_next,
1760                             after_branch_point, tx);
1761                 } else {
1762                         /* Adjust prev's unique space. */
1763                         if (ds_prev && !after_branch_point) {
1764                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1765                                     ds_prev->ds_phys->ds_prev_snap_txg,
1766                                     ds->ds_phys->ds_prev_snap_txg,
1767                                     &used, &comp, &uncomp);
1768                                 ds_prev->ds_phys->ds_unique_bytes += used;
1769                         }
1770 
1771                         /* Adjust snapused. */
1772                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1773                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1774                             &used, &comp, &uncomp);
1775                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1776                             -used, -comp, -uncomp, tx);
1777 
1778                         /* Move blocks to be freed to pool's free list. */
1779                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1780                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1781                             tx);
1782                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1783                             DD_USED_HEAD, used, comp, uncomp, tx);
1784 
1785                         /* Merge our deadlist into next's and free it. */
1786                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1787                             ds->ds_phys->ds_deadlist_obj, tx);
1788                 }
1789                 dsl_deadlist_close(&ds->ds_deadlist);
1790                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1791 
1792                 /* Collapse range in clone heads */
1793                 dsl_dataset_remove_clones_key(ds,
1794                     ds->ds_phys->ds_creation_txg, tx);
1795 
1796                 if (dsl_dataset_is_snapshot(ds_next)) {
1797                         dsl_dataset_t *ds_nextnext;
1798 
1799                         /*
1800                          * Update next's unique to include blocks which
1801                          * were previously shared by only this snapshot
1802                          * and it.  Those blocks will be born after the
1803                          * prev snap and before this snap, and will have
1804                          * died after the next snap and before the one
1805                          * after that (ie. be on the snap after next's
1806                          * deadlist).
1807                          */
1808                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1809                             ds_next->ds_phys->ds_next_snap_obj,
1810                             FTAG, &ds_nextnext));
1811                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1812                             ds->ds_phys->ds_prev_snap_txg,
1813                             ds->ds_phys->ds_creation_txg,
1814                             &used, &comp, &uncomp);
1815                         ds_next->ds_phys->ds_unique_bytes += used;
1816                         dsl_dataset_rele(ds_nextnext, FTAG);
1817                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1818 
1819                         /* Collapse range in this head. */
1820                         dsl_dataset_t *hds;
1821                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1822                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1823                             FTAG, &hds));
1824                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1825                             ds->ds_phys->ds_creation_txg, tx);
1826                         dsl_dataset_rele(hds, FTAG);
1827 
1828                 } else {
1829                         ASSERT3P(ds_next->ds_prev, ==, ds);
1830                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1831                         ds_next->ds_prev = NULL;
1832                         if (ds_prev) {
1833                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1834                                     ds->ds_phys->ds_prev_snap_obj,
1835                                     ds_next, &ds_next->ds_prev));
1836                         }
1837 
1838                         dsl_dataset_recalc_head_uniq(ds_next);
1839 
1840                         /*
1841                          * Reduce the amount of our unconsmed refreservation
1842                          * being charged to our parent by the amount of
1843                          * new unique data we have gained.
1844                          */
1845                         if (old_unique < ds_next->ds_reserved) {
1846                                 int64_t mrsdelta;
1847                                 uint64_t new_unique =
1848                                     ds_next->ds_phys->ds_unique_bytes;
1849 
1850                                 ASSERT(old_unique <= new_unique);
1851                                 mrsdelta = MIN(new_unique - old_unique,
1852                                     ds_next->ds_reserved - old_unique);
1853                                 dsl_dir_diduse_space(ds->ds_dir,
1854                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1855                         }
1856                 }
1857                 dsl_dataset_rele(ds_next, FTAG);
1858         } else {
1859                 zfeature_info_t *async_destroy =
1860                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1861                 objset_t *os;
1862 
1863                 /*
1864                  * There's no next snapshot, so this is a head dataset.
1865                  * Destroy the deadlist.  Unless it's a clone, the
1866                  * deadlist should be empty.  (If it's a clone, it's
1867                  * safe to ignore the deadlist contents.)
1868                  */
1869                 dsl_deadlist_close(&ds->ds_deadlist);
1870                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1871                 ds->ds_phys->ds_deadlist_obj = 0;
1872 
1873                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
1874 
1875                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1876                         err = old_synchronous_dataset_destroy(ds, tx);
1877                 } else {
1878                         /*
1879                          * Move the bptree into the pool's list of trees to
1880                          * clean up and update space accounting information.
1881                          */
1882                         uint64_t used, comp, uncomp;
1883 
1884                         zil_destroy_sync(dmu_objset_zil(os), tx);
1885 
1886                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1887                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1888                                 dp->dp_bptree_obj = bptree_alloc(mos, tx);
1889                                 VERIFY(zap_add(mos,
1890                                     DMU_POOL_DIRECTORY_OBJECT,
1891                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1892                                     &dp->dp_bptree_obj, tx) == 0);
1893                         }
1894 
1895                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1896                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1897                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1898 
1899                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1900                             ds->ds_phys->ds_unique_bytes == used);
1901 
1902                         bptree_add(mos, dp->dp_bptree_obj,
1903                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1904                             used, comp, uncomp, tx);
1905                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1906                             -used, -comp, -uncomp, tx);
1907                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1908                             used, comp, uncomp, tx);
1909                 }
1910 
1911                 if (ds->ds_prev != NULL) {
1912                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1913                                 VERIFY3U(0, ==, zap_remove_int(mos,
1914                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1915                                     ds->ds_object, tx));
1916                         }
1917                         dsl_dataset_rele(ds->ds_prev, ds);
1918                         ds->ds_prev = ds_prev = NULL;
1919                 }
1920         }
1921 
1922         /*
1923          * This must be done after the dsl_traverse(), because it will
1924          * re-open the objset.
1925          */
1926         if (ds->ds_objset) {
1927                 dmu_objset_evict(ds->ds_objset);
1928                 ds->ds_objset = NULL;
1929         }
1930 
1931         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1932                 /* Erase the link in the dir */
1933                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1934                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1935                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1936                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1937                 ASSERT(err == 0);
1938         } else {
1939                 /* remove from snapshot namespace */
1940                 dsl_dataset_t *ds_head;
1941                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1942                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1943                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1944                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1945 #ifdef ZFS_DEBUG
1946                 {
1947                         uint64_t val;
1948 
1949                         err = dsl_dataset_snap_lookup(ds_head,
1950                             ds->ds_snapname, &val);
1951                         ASSERT0(err);
1952                         ASSERT3U(val, ==, obj);
1953                 }
1954 #endif
1955                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1956                 ASSERT(err == 0);
1957                 dsl_dataset_rele(ds_head, FTAG);
1958         }
1959 
1960         if (ds_prev && ds->ds_prev != ds_prev)
1961                 dsl_dataset_rele(ds_prev, FTAG);
1962 
1963         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1964 
1965         if (ds->ds_phys->ds_next_clones_obj != 0) {
1966                 uint64_t count;
1967                 ASSERT(0 == zap_count(mos,
1968                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1969                 VERIFY(0 == dmu_object_free(mos,
1970                     ds->ds_phys->ds_next_clones_obj, tx));
1971         }
1972         if (ds->ds_phys->ds_props_obj != 0)
1973                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1974         if (ds->ds_phys->ds_userrefs_obj != 0)
1975                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1976         dsl_dir_close(ds->ds_dir, ds);
1977         ds->ds_dir = NULL;
1978         dsl_dataset_drain_refs(ds, tag);
1979         VERIFY(0 == dmu_object_free(mos, obj, tx));
1980 
1981         if (dsda->rm_origin) {
1982                 /*
1983                  * Remove the origin of the clone we just destroyed.
1984                  */
1985                 struct dsl_ds_destroyarg ndsda = {0};
1986 
1987                 ndsda.ds = dsda->rm_origin;
1988                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
1989         }
1990 }
1991 
1992 static int
1993 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1994 {
1995         uint64_t asize;
1996 
1997         if (!dmu_tx_is_syncing(tx))
1998                 return (0);
1999 
2000         /*
2001          * If there's an fs-only reservation, any blocks that might become
2002          * owned by the snapshot dataset must be accommodated by space
2003          * outside of the reservation.
2004          */
2005         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2006         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2007         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2008                 return (ENOSPC);
2009 
2010         /*
2011          * Propagate any reserved space for this snapshot to other
2012          * snapshot checks in this sync group.
2013          */
2014         if (asize > 0)
2015                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2016 
2017         return (0);
2018 }
2019 
2020 /*
2021  * Check if adding additional snapshot(s) would exceed any snapshot limits.
2022  * Note that all snapshot limits up to the root dataset (i.e. the pool itself)
2023  * or the given ancestor must be satisfied. Note that it is valid for the
2024  * count to exceed the limit. This can happen if a snapshot is taken by an
2025  * administrative user in the global zone (e.g. a recursive snapshot by root).
2026  */
2027 int
2028 dsl_snapcount_check(dsl_dir_t *dd, uint64_t cnt, dsl_dir_t *ancestor)
2029 {
2030         uint64_t limit;
2031         int err = 0;
2032 
2033         /*
2034          * The limit is never enforced for the admin user in global zone.
2035          * If we're not in the global zone then we need to run this check in
2036          * open context, since thats when we know what zone we're in and
2037          * syncing is only performed in the global zone.
2038          */
2039         if (INGLOBALZONE(curproc))
2040                 return (0);
2041 
2042         /*
2043          * If renaming a dataset with no snapshots, count adjustment is 0.
2044          */
2045         if (cnt == 0)
2046                 return (0);
2047 
2048         /*
2049          * If an ancestor has been provided, stop checking the limit once we
2050          * hit that dir. We need this during rename so that we don't overcount
2051          * the check once we recurse up to the common ancestor.
2052          */
2053         if (ancestor == dd)
2054                 return (0);
2055 
2056         /*
2057          * If we hit an uninitialized node while recursing up the tree, we can
2058          * stop since we know the counts are not valid on this node and we
2059          * know we won't touch this node's counts.
2060          */
2061         if (dd->dd_phys->dd_filesystem_count == 0)
2062                 return (0);
2063 
2064         /*
2065          * If there's no value for this property, there's no need to enforce a
2066          * snapshot limit.
2067          */
2068         err = dsl_prop_get_dd(dd, zfs_prop_to_name(ZFS_PROP_SNAPSHOT_LIMIT),
2069             8, 1, &limit, NULL, B_FALSE);
2070         if (err == ENOENT)
2071                 return (0);
2072         else if (err != 0)
2073                 return (err);
2074 
2075 #ifdef _KERNEL
2076         extern void __dtrace_probe_zfs__ss__limit(uint64_t, uint64_t, char *);
2077         __dtrace_probe_zfs__ss__limit(
2078             (uint64_t)dd->dd_phys->dd_snapshot_count, (uint64_t)limit,
2079             dd->dd_myname);
2080 #endif
2081 
2082         if (limit != MAXLIMIT &&
2083             (dd->dd_phys->dd_snapshot_count + cnt) > limit)
2084                 return (EDQUOT);
2085 
2086         if (dd->dd_parent != NULL)
2087                 err = dsl_snapcount_check(dd->dd_parent, cnt, ancestor);
2088 
2089         return (err);
2090 }
2091 
2092 /*
2093  * Adjust the snapshot count for the specified dsl_dir_t and all parents.
2094  * When a new snapshot is created, increment the count on all parents, and when
2095  * a snapshot is destroyed, decrement the count.
2096  */
2097 void
2098 dsl_snapcount_adjust(dsl_dir_t *dd, dmu_tx_t *tx, int64_t delta,
2099     boolean_t first)
2100 {
2101         /*
2102          * If we hit an uninitialized node while recursing up the tree, we can
2103          * stop since we know the counts are not valid on this node and we
2104          * know we shouldn't touch this node's counts. An uninitialized count
2105          * on the node indicates that either the feature has not yet been
2106          * activated or there are no limits on this part of the tree.
2107          */
2108         if (dd->dd_phys->dd_filesystem_count == 0)
2109                 return;
2110 
2111         /*
2112          * The feature might have previously been active, so there could be
2113          * non-0 counts on the nodes, but it might now be inactive.
2114          *
2115          * On initial entry we need to check if this feature is active, but
2116          * we don't want to re-check this on each recursive call. Note: the
2117          * feature cannot be active if its not enabled. If the feature is not
2118          * active, don't touch the on-disk count fields.
2119          */
2120         if (first) {
2121                 dsl_dataset_t *ds = NULL;
2122                 spa_t *spa;
2123                 zfeature_info_t *quota_feat =
2124                     &spa_feature_table[SPA_FEATURE_FS_SS_LIMIT];
2125 
2126                 VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2127                     dd->dd_phys->dd_head_dataset_obj, FTAG, &ds));
2128                 spa = dsl_dataset_get_spa(ds);
2129                 dsl_dataset_rele(ds, FTAG);
2130                 if (!spa_feature_is_active(spa, quota_feat))
2131                         return;
2132         }
2133 
2134         /*
2135          * As with dsl_dataset_set_reservation_check(), wdon't want to run
2136          * this check in open context.
2137          */
2138         if (!dmu_tx_is_syncing(tx))
2139                 return;
2140 
2141         /* if renaming a dataset with no snapshots, count adjustment is 0 */
2142         if (delta == 0)
2143                 return;
2144 
2145         /*
2146          * If we hit an uninitialized node while recursing up the tree, we can
2147          * stop since we know the counts are not valid on this node and we
2148          * know we shouldn't touch this node's counts.
2149          */
2150         if (dd->dd_phys->dd_filesystem_count == 0)
2151                 return;
2152 
2153         /* Increment count for parent */
2154         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2155 
2156         mutex_enter(&dd->dd_lock);
2157 
2158         dd->dd_phys->dd_snapshot_count += delta;
2159 
2160         /* Roll up this additional count into our ancestors */
2161         if (dd->dd_parent != NULL)
2162                 dsl_snapcount_adjust(dd->dd_parent, tx, delta, B_FALSE);
2163 
2164         mutex_exit(&dd->dd_lock);
2165 }
2166 
2167 int
2168 dsl_dataset_snapshot_check(dsl_dataset_t *ds, const char *snapname,
2169     uint64_t cnt, dmu_tx_t *tx)
2170 {
2171         int err;
2172         uint64_t value;
2173 
2174         /*
2175          * We don't allow multiple snapshots of the same txg.  If there
2176          * is already one, try again.
2177          */
2178         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2179                 return (EAGAIN);
2180 
2181         /*
2182          * Check for conflicting snapshot name.
2183          */
2184         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2185         if (err == 0)
2186                 return (EEXIST);
2187         if (err != ENOENT)
2188                 return (err);
2189 
2190         /*
2191          * Check that the dataset's name is not too long.  Name consists
2192          * of the dataset's length + 1 for the @-sign + snapshot name's length
2193          */
2194         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2195                 return (ENAMETOOLONG);
2196 
2197         err = dsl_snapcount_check(ds->ds_dir, cnt, NULL);
2198         if (err)
2199                 return (err);
2200 
2201         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2202         if (err)
2203                 return (err);
2204 
2205         ds->ds_trysnap_txg = tx->tx_txg;
2206         return (0);
2207 }
2208 
2209 void
2210 dsl_dataset_snapshot_sync(dsl_dataset_t *ds, const char *snapname,
2211     dmu_tx_t *tx)
2212 {
2213         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2214         dmu_buf_t *dbuf;
2215         dsl_dataset_phys_t *dsphys;
2216         uint64_t dsobj, crtxg;
2217         objset_t *mos = dp->dp_meta_objset;
2218         int err;
2219 
2220         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2221 
2222         dsl_snapcount_adjust(ds->ds_dir, tx, 1, B_TRUE);
2223 
2224         /*
2225          * The origin's ds_creation_txg has to be < TXG_INITIAL
2226          */
2227         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2228                 crtxg = 1;
2229         else
2230                 crtxg = tx->tx_txg;
2231 
2232         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2233             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2234         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2235         dmu_buf_will_dirty(dbuf, tx);
2236         dsphys = dbuf->db_data;
2237         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2238         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2239         dsphys->ds_fsid_guid = unique_create();
2240         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2241             sizeof (dsphys->ds_guid));
2242         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2243         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2244         dsphys->ds_next_snap_obj = ds->ds_object;
2245         dsphys->ds_num_children = 1;
2246         dsphys->ds_creation_time = gethrestime_sec();
2247         dsphys->ds_creation_txg = crtxg;
2248         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2249         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2250         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2251         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2252         dsphys->ds_flags = ds->ds_phys->ds_flags;
2253         dsphys->ds_bp = ds->ds_phys->ds_bp;
2254         dmu_buf_rele(dbuf, FTAG);
2255 
2256         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2257         if (ds->ds_prev) {
2258                 uint64_t next_clones_obj =
2259                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2260                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2261                     ds->ds_object ||
2262                     ds->ds_prev->ds_phys->ds_num_children > 1);
2263                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2264                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2265                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2266                             ds->ds_prev->ds_phys->ds_creation_txg);
2267                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2268                 } else if (next_clones_obj != 0) {
2269                         remove_from_next_clones(ds->ds_prev,
2270                             dsphys->ds_next_snap_obj, tx);
2271                         VERIFY3U(0, ==, zap_add_int(mos,
2272                             next_clones_obj, dsobj, tx));
2273                 }
2274         }
2275 
2276         /*
2277          * If we have a reference-reservation on this dataset, we will
2278          * need to increase the amount of refreservation being charged
2279          * since our unique space is going to zero.
2280          */
2281         if (ds->ds_reserved) {
2282                 int64_t delta;
2283                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2284                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2285                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2286                     delta, 0, 0, tx);
2287         }
2288 
2289         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2290         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2291             ds->ds_dir->dd_myname, snapname, dsobj,
2292             ds->ds_phys->ds_prev_snap_txg);
2293         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2294             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2295         dsl_deadlist_close(&ds->ds_deadlist);
2296         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2297         dsl_deadlist_add_key(&ds->ds_deadlist,
2298             ds->ds_phys->ds_prev_snap_txg, tx);
2299 
2300         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2301         ds->ds_phys->ds_prev_snap_obj = dsobj;
2302         ds->ds_phys->ds_prev_snap_txg = crtxg;
2303         ds->ds_phys->ds_unique_bytes = 0;
2304         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2305                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2306 
2307         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2308             snapname, 8, 1, &dsobj, tx);
2309         ASSERT(err == 0);
2310 
2311         if (ds->ds_prev)
2312                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2313         VERIFY(0 == dsl_dataset_get_ref(dp,
2314             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2315 
2316         dsl_scan_ds_snapshotted(ds, tx);
2317 
2318         dsl_dir_snap_cmtime_update(ds->ds_dir);
2319 
2320         spa_history_log_internal_ds(ds->ds_prev, "snapshot", tx, "");
2321 }
2322 
2323 void
2324 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2325 {
2326         ASSERT(dmu_tx_is_syncing(tx));
2327         ASSERT(ds->ds_objset != NULL);
2328         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2329 
2330         /*
2331          * in case we had to change ds_fsid_guid when we opened it,
2332          * sync it out now.
2333          */
2334         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2335         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2336 
2337         dmu_objset_sync(ds->ds_objset, zio, tx);
2338 }
2339 
2340 static void
2341 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2342 {
2343         uint64_t count = 0;
2344         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2345         zap_cursor_t zc;
2346         zap_attribute_t za;
2347         nvlist_t *propval;
2348         nvlist_t *val;
2349 
2350         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2351         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2352         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2353 
2354         /*
2355          * There may me missing entries in ds_next_clones_obj
2356          * due to a bug in a previous version of the code.
2357          * Only trust it if it has the right number of entries.
2358          */
2359         if (ds->ds_phys->ds_next_clones_obj != 0) {
2360                 ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2361                     &count));
2362         }
2363         if (count != ds->ds_phys->ds_num_children - 1) {
2364                 goto fail;
2365         }
2366         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2367             zap_cursor_retrieve(&zc, &za) == 0;
2368             zap_cursor_advance(&zc)) {
2369                 dsl_dataset_t *clone;
2370                 char buf[ZFS_MAXNAMELEN];
2371                 /*
2372                  * Even though we hold the dp_config_rwlock, the dataset
2373                  * may fail to open, returning ENOENT.  If there is a
2374                  * thread concurrently attempting to destroy this
2375                  * dataset, it will have the ds_rwlock held for
2376                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2377                  * dsl_dataset_hold_ref() will fail its
2378                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2379                  * dp_config_rwlock, and wait for the destroy progress
2380                  * and signal ds_exclusive_cv.  If the destroy was
2381                  * successful, we will see that
2382                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2383                  */
2384                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2385                     za.za_first_integer, FTAG, &clone) != 0)
2386                         continue;
2387                 dsl_dir_name(clone->ds_dir, buf);
2388                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2389                 dsl_dataset_rele(clone, FTAG);
2390         }
2391         zap_cursor_fini(&zc);
2392         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2393         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2394             propval) == 0);
2395 fail:
2396         nvlist_free(val);
2397         nvlist_free(propval);
2398         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2399 }
2400 
2401 void
2402 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2403 {
2404         uint64_t refd, avail, uobjs, aobjs, ratio;
2405 
2406         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2407             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2408             ds->ds_phys->ds_compressed_bytes);
2409 
2410         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2411 
2412         if (dsl_dataset_is_snapshot(ds)) {
2413                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2414                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2415                     ds->ds_phys->ds_unique_bytes);
2416                 get_clones_stat(ds, nv);
2417         } else {
2418                 dsl_dir_stats(ds->ds_dir, nv);
2419         }
2420 
2421         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2422         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2423         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2424 
2425         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2426             ds->ds_phys->ds_creation_time);
2427         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2428             ds->ds_phys->ds_creation_txg);
2429         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2430             ds->ds_quota);
2431         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2432             ds->ds_reserved);
2433         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2434             ds->ds_phys->ds_guid);
2435         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2436             ds->ds_phys->ds_unique_bytes);
2437         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2438             ds->ds_object);
2439         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2440             ds->ds_userrefs);
2441         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2442             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2443 
2444         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2445                 uint64_t written, comp, uncomp;
2446                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2447                 dsl_dataset_t *prev;
2448 
2449                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2450                 int err = dsl_dataset_hold_obj(dp,
2451                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2452                 rw_exit(&dp->dp_config_rwlock);
2453                 if (err == 0) {
2454                         err = dsl_dataset_space_written(prev, ds, &written,
2455                             &comp, &uncomp);
2456                         dsl_dataset_rele(prev, FTAG);
2457                         if (err == 0) {
2458                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2459                                     written);
2460                         }
2461                 }
2462         }
2463 }
2464 
2465 void
2466 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2467 {
2468         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2469         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2470         stat->dds_guid = ds->ds_phys->ds_guid;
2471         stat->dds_origin[0] = '\0';
2472         if (dsl_dataset_is_snapshot(ds)) {
2473                 stat->dds_is_snapshot = B_TRUE;
2474                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2475         } else {
2476                 stat->dds_is_snapshot = B_FALSE;
2477                 stat->dds_num_clones = 0;
2478 
2479                 rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2480                 if (dsl_dir_is_clone(ds->ds_dir)) {
2481                         dsl_dataset_t *ods;
2482 
2483                         VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2484                             ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2485                         dsl_dataset_name(ods, stat->dds_origin);
2486                         dsl_dataset_drop_ref(ods, FTAG);
2487                 }
2488                 rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2489         }
2490 }
2491 
2492 uint64_t
2493 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2494 {
2495         return (ds->ds_fsid_guid);
2496 }
2497 
2498 void
2499 dsl_dataset_space(dsl_dataset_t *ds,
2500     uint64_t *refdbytesp, uint64_t *availbytesp,
2501     uint64_t *usedobjsp, uint64_t *availobjsp)
2502 {
2503         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2504         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2505         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2506                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2507         if (ds->ds_quota != 0) {
2508                 /*
2509                  * Adjust available bytes according to refquota
2510                  */
2511                 if (*refdbytesp < ds->ds_quota)
2512                         *availbytesp = MIN(*availbytesp,
2513                             ds->ds_quota - *refdbytesp);
2514                 else
2515                         *availbytesp = 0;
2516         }
2517         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2518         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2519 }
2520 
2521 boolean_t
2522 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2523 {
2524         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2525 
2526         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2527             dsl_pool_sync_context(dp));
2528         if (ds->ds_prev == NULL)
2529                 return (B_FALSE);
2530         if (ds->ds_phys->ds_bp.blk_birth >
2531             ds->ds_prev->ds_phys->ds_creation_txg) {
2532                 objset_t *os, *os_prev;
2533                 /*
2534                  * It may be that only the ZIL differs, because it was
2535                  * reset in the head.  Don't count that as being
2536                  * modified.
2537                  */
2538                 if (dmu_objset_from_ds(ds, &os) != 0)
2539                         return (B_TRUE);
2540                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2541                         return (B_TRUE);
2542                 return (bcmp(&os->os_phys->os_meta_dnode,
2543                     &os_prev->os_phys->os_meta_dnode,
2544                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2545         }
2546         return (B_FALSE);
2547 }
2548 
2549 /* ARGSUSED */
2550 static int
2551 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2552 {
2553         dsl_dataset_t *ds = arg1;
2554         char *newsnapname = arg2;
2555         dsl_dir_t *dd = ds->ds_dir;
2556         dsl_dataset_t *hds;
2557         uint64_t val;
2558         int err;
2559 
2560         err = dsl_dataset_hold_obj(dd->dd_pool,
2561             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2562         if (err)
2563                 return (err);
2564 
2565         /* new name better not be in use */
2566         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2567         dsl_dataset_rele(hds, FTAG);
2568 
2569         if (err == 0)
2570                 err = EEXIST;
2571         else if (err == ENOENT)
2572                 err = 0;
2573 
2574         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2575         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2576                 err = ENAMETOOLONG;
2577 
2578         return (err);
2579 }
2580 
2581 static void
2582 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2583 {
2584         dsl_dataset_t *ds = arg1;
2585         const char *newsnapname = arg2;
2586         dsl_dir_t *dd = ds->ds_dir;
2587         objset_t *mos = dd->dd_pool->dp_meta_objset;
2588         dsl_dataset_t *hds;
2589         int err;
2590 
2591         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2592 
2593         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2594             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2595 
2596         VERIFY(0 == dsl_dataset_get_snapname(ds));
2597         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2598         ASSERT0(err);
2599         mutex_enter(&ds->ds_lock);
2600         (void) strcpy(ds->ds_snapname, newsnapname);
2601         mutex_exit(&ds->ds_lock);
2602         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2603             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2604         ASSERT0(err);
2605 
2606         spa_history_log_internal_ds(ds, "rename", tx,
2607             "-> @%s", newsnapname);
2608         dsl_dataset_rele(hds, FTAG);
2609 }
2610 
2611 struct renamesnaparg {
2612         dsl_sync_task_group_t *dstg;
2613         char failed[MAXPATHLEN];
2614         char *oldsnap;
2615         char *newsnap;
2616 };
2617 
2618 static int
2619 dsl_snapshot_rename_one(const char *name, void *arg)
2620 {
2621         struct renamesnaparg *ra = arg;
2622         dsl_dataset_t *ds = NULL;
2623         char *snapname;
2624         int err;
2625 
2626         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2627         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2628 
2629         /*
2630          * For recursive snapshot renames the parent won't be changing
2631          * so we just pass name for both the to/from argument.
2632          */
2633         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2634         if (err != 0) {
2635                 strfree(snapname);
2636                 return (err == ENOENT ? 0 : err);
2637         }
2638 
2639 #ifdef _KERNEL
2640         /*
2641          * For all filesystems undergoing rename, we'll need to unmount it.
2642          */
2643         (void) zfs_unmount_snap(snapname, NULL);
2644 #endif
2645         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2646         strfree(snapname);
2647         if (err != 0)
2648                 return (err == ENOENT ? 0 : err);
2649 
2650         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2651             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2652 
2653         return (0);
2654 }
2655 
2656 static int
2657 dsl_recursive_rename(char *oldname, const char *newname)
2658 {
2659         int err;
2660         struct renamesnaparg *ra;
2661         dsl_sync_task_t *dst;
2662         spa_t *spa;
2663         char *cp, *fsname = spa_strdup(oldname);
2664         int len = strlen(oldname) + 1;
2665 
2666         /* truncate the snapshot name to get the fsname */
2667         cp = strchr(fsname, '@');
2668         *cp = '\0';
2669 
2670         err = spa_open(fsname, &spa, FTAG);
2671         if (err) {
2672                 kmem_free(fsname, len);
2673                 return (err);
2674         }
2675         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2676         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2677 
2678         ra->oldsnap = strchr(oldname, '@') + 1;
2679         ra->newsnap = strchr(newname, '@') + 1;
2680         *ra->failed = '\0';
2681 
2682         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2683             DS_FIND_CHILDREN);
2684         kmem_free(fsname, len);
2685 
2686         if (err == 0) {
2687                 err = dsl_sync_task_group_wait(ra->dstg);
2688         }
2689 
2690         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2691             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2692                 dsl_dataset_t *ds = dst->dst_arg1;
2693                 if (dst->dst_err) {
2694                         dsl_dir_name(ds->ds_dir, ra->failed);
2695                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2696                         (void) strlcat(ra->failed, ra->newsnap,
2697                             sizeof (ra->failed));
2698                 }
2699                 dsl_dataset_rele(ds, ra->dstg);
2700         }
2701 
2702         if (err)
2703                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2704 
2705         dsl_sync_task_group_destroy(ra->dstg);
2706         kmem_free(ra, sizeof (struct renamesnaparg));
2707         spa_close(spa, FTAG);
2708         return (err);
2709 }
2710 
2711 static int
2712 dsl_valid_rename(const char *oldname, void *arg)
2713 {
2714         int delta = *(int *)arg;
2715 
2716         if (strlen(oldname) + delta >= MAXNAMELEN)
2717                 return (ENAMETOOLONG);
2718 
2719         return (0);
2720 }
2721 
2722 #pragma weak dmu_objset_rename = dsl_dataset_rename
2723 int
2724 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2725 {
2726         dsl_dir_t *dd;
2727         dsl_dataset_t *ds;
2728         const char *tail;
2729         int err;
2730 
2731         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2732         if (err)
2733                 return (err);
2734 
2735         if (tail == NULL) {
2736                 int delta = strlen(newname) - strlen(oldname);
2737 
2738                 /* if we're growing, validate child name lengths */
2739                 if (delta > 0)
2740                         err = dmu_objset_find(oldname, dsl_valid_rename,
2741                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2742 
2743                 if (err == 0)
2744                         err = dsl_dir_rename(dd, newname);
2745                 dsl_dir_close(dd, FTAG);
2746                 return (err);
2747         }
2748 
2749         if (tail[0] != '@') {
2750                 /* the name ended in a nonexistent component */
2751                 dsl_dir_close(dd, FTAG);
2752                 return (ENOENT);
2753         }
2754 
2755         dsl_dir_close(dd, FTAG);
2756 
2757         /* new name must be snapshot in same filesystem */
2758         tail = strchr(newname, '@');
2759         if (tail == NULL)
2760                 return (EINVAL);
2761         tail++;
2762         if (strncmp(oldname, newname, tail - newname) != 0)
2763                 return (EXDEV);
2764 
2765         if (recursive) {
2766                 err = dsl_recursive_rename(oldname, newname);
2767         } else {
2768                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2769                 if (err)
2770                         return (err);
2771 
2772                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2773                     dsl_dataset_snapshot_rename_check,
2774                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2775 
2776                 dsl_dataset_rele(ds, FTAG);
2777         }
2778 
2779         return (err);
2780 }
2781 
2782 struct promotenode {
2783         list_node_t link;
2784         dsl_dataset_t *ds;
2785 };
2786 
2787 struct promotearg {
2788         list_t shared_snaps, origin_snaps, clone_snaps;
2789         dsl_dataset_t *origin_origin;
2790         uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2791         char *err_ds;
2792 };
2793 
2794 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2795 static boolean_t snaplist_unstable(list_t *l);
2796 
2797 static int
2798 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2799 {
2800         dsl_dataset_t *hds = arg1;
2801         struct promotearg *pa = arg2;
2802         struct promotenode *snap = list_head(&pa->shared_snaps);
2803         dsl_dataset_t *origin_ds = snap->ds;
2804         int err;
2805         uint64_t unused;
2806 
2807         /* Check that it is a real clone */
2808         if (!dsl_dir_is_clone(hds->ds_dir))
2809                 return (EINVAL);
2810 
2811         /* Since this is so expensive, don't do the preliminary check */
2812         if (!dmu_tx_is_syncing(tx))
2813                 return (0);
2814 
2815         if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2816                 return (EXDEV);
2817 
2818         /* compute origin's new unique space */
2819         snap = list_tail(&pa->clone_snaps);
2820         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2821         dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2822             origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2823             &pa->unique, &unused, &unused);
2824 
2825         /*
2826          * Walk the snapshots that we are moving
2827          *
2828          * Compute space to transfer.  Consider the incremental changes
2829          * to used for each snapshot:
2830          * (my used) = (prev's used) + (blocks born) - (blocks killed)
2831          * So each snapshot gave birth to:
2832          * (blocks born) = (my used) - (prev's used) + (blocks killed)
2833          * So a sequence would look like:
2834          * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2835          * Which simplifies to:
2836          * uN + kN + kN-1 + ... + k1 + k0
2837          * Note however, if we stop before we reach the ORIGIN we get:
2838          * uN + kN + kN-1 + ... + kM - uM-1
2839          */
2840         pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2841         pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2842         pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2843         for (snap = list_head(&pa->shared_snaps); snap;
2844             snap = list_next(&pa->shared_snaps, snap)) {
2845                 uint64_t val, dlused, dlcomp, dluncomp;
2846                 dsl_dataset_t *ds = snap->ds;
2847 
2848                 /* Check that the snapshot name does not conflict */
2849                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2850                 err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2851                 if (err == 0) {
2852                         err = EEXIST;
2853                         goto out;
2854                 }
2855                 if (err != ENOENT)
2856                         goto out;
2857 
2858                 /* The very first snapshot does not have a deadlist */
2859                 if (ds->ds_phys->ds_prev_snap_obj == 0)
2860                         continue;
2861 
2862                 dsl_deadlist_space(&ds->ds_deadlist,
2863                     &dlused, &dlcomp, &dluncomp);
2864                 pa->used += dlused;
2865                 pa->comp += dlcomp;
2866                 pa->uncomp += dluncomp;
2867         }
2868 
2869         /*
2870          * If we are a clone of a clone then we never reached ORIGIN,
2871          * so we need to subtract out the clone origin's used space.
2872          */
2873         if (pa->origin_origin) {
2874                 pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2875                 pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2876                 pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2877         }
2878 
2879         /* Check that there is enough space and limit headroom here */
2880         err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2881             origin_ds->ds_dir, pa->used, tx);
2882         if (err)
2883                 return (err);
2884 
2885         /*
2886          * Compute the amounts of space that will be used by snapshots
2887          * after the promotion (for both origin and clone).  For each,
2888          * it is the amount of space that will be on all of their
2889          * deadlists (that was not born before their new origin).
2890          */
2891         if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2892                 uint64_t space;
2893 
2894                 /*
2895                  * Note, typically this will not be a clone of a clone,
2896                  * so dd_origin_txg will be < TXG_INITIAL, so
2897                  * these snaplist_space() -> dsl_deadlist_space_range()
2898                  * calls will be fast because they do not have to
2899                  * iterate over all bps.
2900                  */
2901                 snap = list_head(&pa->origin_snaps);
2902                 err = snaplist_space(&pa->shared_snaps,
2903                     snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2904                 if (err)
2905                         return (err);
2906 
2907                 err = snaplist_space(&pa->clone_snaps,
2908                     snap->ds->ds_dir->dd_origin_txg, &space);
2909                 if (err)
2910                         return (err);
2911                 pa->cloneusedsnap += space;
2912         }
2913         if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2914                 err = snaplist_space(&pa->origin_snaps,
2915                     origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2916                 if (err)
2917                         return (err);
2918         }
2919 
2920         return (0);
2921 out:
2922         pa->err_ds =  snap->ds->ds_snapname;
2923         return (err);
2924 }
2925 
2926 static void
2927 dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2928 {
2929         dsl_dataset_t *hds = arg1;
2930         struct promotearg *pa = arg2;
2931         struct promotenode *snap = list_head(&pa->shared_snaps);
2932         dsl_dataset_t *origin_ds = snap->ds;
2933         dsl_dataset_t *origin_head;
2934         dsl_dir_t *dd = hds->ds_dir;
2935         dsl_pool_t *dp = hds->ds_dir->dd_pool;
2936         dsl_dir_t *odd = NULL;
2937         uint64_t oldnext_obj;
2938         int64_t delta;
2939 
2940         ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2941 
2942         snap = list_head(&pa->origin_snaps);
2943         origin_head = snap->ds;
2944 
2945         /*
2946          * We need to explicitly open odd, since origin_ds's dd will be
2947          * changing.
2948          */
2949         VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2950             NULL, FTAG, &odd));
2951 
2952         /* change origin's next snap */
2953         dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2954         oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2955         snap = list_tail(&pa->clone_snaps);
2956         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2957         origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2958 
2959         /* change the origin's next clone */
2960         if (origin_ds->ds_phys->ds_next_clones_obj) {
2961                 remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2962                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2963                     origin_ds->ds_phys->ds_next_clones_obj,
2964                     oldnext_obj, tx));
2965         }
2966 
2967         /* change origin */
2968         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2969         ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2970         dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2971         dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2972         dmu_buf_will_dirty(odd->dd_dbuf, tx);
2973         odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2974         origin_head->ds_dir->dd_origin_txg =
2975             origin_ds->ds_phys->ds_creation_txg;
2976 
2977         /* change dd_clone entries */
2978         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2979                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2980                     odd->dd_phys->dd_clones, hds->ds_object, tx));
2981                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2982                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2983                     hds->ds_object, tx));
2984 
2985                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2986                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2987                     origin_head->ds_object, tx));
2988                 if (dd->dd_phys->dd_clones == 0) {
2989                         dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2990                             DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2991                 }
2992                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2993                     dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2994 
2995         }
2996 
2997         /* move snapshots to this dir */
2998         for (snap = list_head(&pa->shared_snaps); snap;
2999             snap = list_next(&pa->shared_snaps, snap)) {
3000                 dsl_dataset_t *ds = snap->ds;
3001 
3002                 /* unregister props as dsl_dir is changing */
3003                 if (ds->ds_objset) {
3004                         dmu_objset_evict(ds->ds_objset);
3005                         ds->ds_objset = NULL;
3006                 }
3007                 /* move snap name entry */
3008                 VERIFY(0 == dsl_dataset_get_snapname(ds));
3009                 VERIFY(0 == dsl_dataset_snap_remove(origin_head,
3010                     ds->ds_snapname, tx));
3011                 VERIFY(0 == zap_add(dp->dp_meta_objset,
3012                     hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
3013                     8, 1, &ds->ds_object, tx));
3014                 dsl_snapcount_adjust(hds->ds_dir, tx, 1, B_TRUE);
3015 
3016                 /* change containing dsl_dir */
3017                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3018                 ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
3019                 ds->ds_phys->ds_dir_obj = dd->dd_object;
3020                 ASSERT3P(ds->ds_dir, ==, odd);
3021                 dsl_dir_close(ds->ds_dir, ds);
3022                 VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
3023                     NULL, ds, &ds->ds_dir));
3024 
3025                 /* move any clone references */
3026                 if (ds->ds_phys->ds_next_clones_obj &&
3027                     spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
3028                         zap_cursor_t zc;
3029                         zap_attribute_t za;
3030 
3031                         for (zap_cursor_init(&zc, dp->dp_meta_objset,
3032                             ds->ds_phys->ds_next_clones_obj);
3033                             zap_cursor_retrieve(&zc, &za) == 0;
3034                             zap_cursor_advance(&zc)) {
3035                                 dsl_dataset_t *cnds;
3036                                 uint64_t o;
3037 
3038                                 if (za.za_first_integer == oldnext_obj) {
3039                                         /*
3040                                          * We've already moved the
3041                                          * origin's reference.
3042                                          */
3043                                         continue;
3044                                 }
3045 
3046                                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
3047                                     za.za_first_integer, FTAG, &cnds));
3048                                 o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
3049 
3050                                 VERIFY3U(zap_remove_int(dp->dp_meta_objset,
3051                                     odd->dd_phys->dd_clones, o, tx), ==, 0);
3052                                 VERIFY3U(zap_add_int(dp->dp_meta_objset,
3053                                     dd->dd_phys->dd_clones, o, tx), ==, 0);
3054                                 dsl_dataset_rele(cnds, FTAG);
3055                         }
3056                         zap_cursor_fini(&zc);
3057                 }
3058 
3059                 ASSERT0(dsl_prop_numcb(ds));
3060         }
3061 
3062         /*
3063          * Change space accounting.
3064          * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
3065          * both be valid, or both be 0 (resulting in delta == 0).  This
3066          * is true for each of {clone,origin} independently.
3067          */
3068 
3069         delta = pa->cloneusedsnap -
3070             dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
3071         ASSERT3S(delta, >=, 0);
3072         ASSERT3U(pa->used, >=, delta);
3073         dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
3074         dsl_dir_diduse_space(dd, DD_USED_HEAD,
3075             pa->used - delta, pa->comp, pa->uncomp, tx);
3076 
3077         delta = pa->originusedsnap -
3078             odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
3079         ASSERT3S(delta, <=, 0);
3080         ASSERT3U(pa->used, >=, -delta);
3081         dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
3082         dsl_dir_diduse_space(odd, DD_USED_HEAD,
3083             -pa->used - delta, -pa->comp, -pa->uncomp, tx);
3084 
3085         origin_ds->ds_phys->ds_unique_bytes = pa->unique;
3086 
3087         /* log history record */
3088         spa_history_log_internal_ds(hds, "promote", tx, "");
3089 
3090         dsl_dir_close(odd, FTAG);
3091 }
3092 
3093 static char *snaplist_tag = "snaplist";
3094 /*
3095  * Make a list of dsl_dataset_t's for the snapshots between first_obj
3096  * (exclusive) and last_obj (inclusive).  The list will be in reverse
3097  * order (last_obj will be the list_head()).  If first_obj == 0, do all
3098  * snapshots back to this dataset's origin.
3099  */
3100 static int
3101 snaplist_make(dsl_pool_t *dp, boolean_t own,
3102     uint64_t first_obj, uint64_t last_obj, list_t *l)
3103 {
3104         uint64_t obj = last_obj;
3105 
3106         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
3107 
3108         list_create(l, sizeof (struct promotenode),
3109             offsetof(struct promotenode, link));
3110 
3111         while (obj != first_obj) {
3112                 dsl_dataset_t *ds;
3113                 struct promotenode *snap;
3114                 int err;
3115 
3116                 if (own) {
3117                         err = dsl_dataset_own_obj(dp, obj,
3118                             0, snaplist_tag, &ds);
3119                         if (err == 0)
3120                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
3121                 } else {
3122                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
3123                 }
3124                 if (err == ENOENT) {
3125                         /* lost race with snapshot destroy */
3126                         struct promotenode *last = list_tail(l);
3127                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3128                         obj = last->ds->ds_phys->ds_prev_snap_obj;
3129                         continue;
3130                 } else if (err) {
3131                         return (err);
3132                 }
3133 
3134                 if (first_obj == 0)
3135                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3136 
3137                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3138                 snap->ds = ds;
3139                 list_insert_tail(l, snap);
3140                 obj = ds->ds_phys->ds_prev_snap_obj;
3141         }
3142 
3143         return (0);
3144 }
3145 
3146 static int
3147 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3148 {
3149         struct promotenode *snap;
3150 
3151         *spacep = 0;
3152         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3153                 uint64_t used, comp, uncomp;
3154                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3155                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3156                 *spacep += used;
3157         }
3158         return (0);
3159 }
3160 
3161 static void
3162 snaplist_destroy(list_t *l, boolean_t own)
3163 {
3164         struct promotenode *snap;
3165 
3166         if (!l || !list_link_active(&l->list_head))
3167                 return;
3168 
3169         while ((snap = list_tail(l)) != NULL) {
3170                 list_remove(l, snap);
3171                 if (own)
3172                         dsl_dataset_disown(snap->ds, snaplist_tag);
3173                 else
3174                         dsl_dataset_rele(snap->ds, snaplist_tag);
3175                 kmem_free(snap, sizeof (struct promotenode));
3176         }
3177         list_destroy(l);
3178 }
3179 
3180 /*
3181  * Promote a clone.  Nomenclature note:
3182  * "clone" or "cds": the original clone which is being promoted
3183  * "origin" or "ods": the snapshot which is originally clone's origin
3184  * "origin head" or "ohds": the dataset which is the head
3185  * (filesystem/volume) for the origin
3186  * "origin origin": the origin of the origin's filesystem (typically
3187  * NULL, indicating that the clone is not a clone of a clone).
3188  */
3189 int
3190 dsl_dataset_promote(const char *name, char *conflsnap)
3191 {
3192         dsl_dataset_t *ds;
3193         dsl_dir_t *dd;
3194         dsl_pool_t *dp;
3195         dmu_object_info_t doi;
3196         struct promotearg pa = { 0 };
3197         struct promotenode *snap;
3198         int err;
3199 
3200         err = dsl_dataset_hold(name, FTAG, &ds);
3201         if (err)
3202                 return (err);
3203         dd = ds->ds_dir;
3204         dp = dd->dd_pool;
3205 
3206         err = dmu_object_info(dp->dp_meta_objset,
3207             ds->ds_phys->ds_snapnames_zapobj, &doi);
3208         if (err) {
3209                 dsl_dataset_rele(ds, FTAG);
3210                 return (err);
3211         }
3212 
3213         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3214                 dsl_dataset_rele(ds, FTAG);
3215                 return (EINVAL);
3216         }
3217 
3218         /*
3219          * We are going to inherit all the snapshots taken before our
3220          * origin (i.e., our new origin will be our parent's origin).
3221          * Take ownership of them so that we can rename them into our
3222          * namespace.
3223          */
3224         rw_enter(&dp->dp_config_rwlock, RW_READER);
3225 
3226         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3227             &pa.shared_snaps);
3228         if (err != 0)
3229                 goto out;
3230 
3231         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3232         if (err != 0)
3233                 goto out;
3234 
3235         snap = list_head(&pa.shared_snaps);
3236         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3237         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3238             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3239         if (err != 0)
3240                 goto out;
3241 
3242         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3243                 err = dsl_dataset_hold_obj(dp,
3244                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3245                     FTAG, &pa.origin_origin);
3246                 if (err != 0)
3247                         goto out;
3248         }
3249 
3250 out:
3251         rw_exit(&dp->dp_config_rwlock);
3252 
3253         /*
3254          * Add in 128x the snapnames zapobj size, since we will be moving
3255          * a bunch of snapnames to the promoted ds, and dirtying their
3256          * bonus buffers.
3257          */
3258         if (err == 0) {
3259                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3260                     dsl_dataset_promote_sync, ds, &pa,
3261                     2 + 2 * doi.doi_physical_blocks_512);
3262                 if (err && pa.err_ds && conflsnap)
3263                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3264         }
3265 
3266         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3267         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3268         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3269         if (pa.origin_origin)
3270                 dsl_dataset_rele(pa.origin_origin, FTAG);
3271         dsl_dataset_rele(ds, FTAG);
3272         return (err);
3273 }
3274 
3275 struct cloneswaparg {
3276         dsl_dataset_t *cds; /* clone dataset */
3277         dsl_dataset_t *ohds; /* origin's head dataset */
3278         boolean_t force;
3279         int64_t unused_refres_delta; /* change in unconsumed refreservation */
3280 };
3281 
3282 /* ARGSUSED */
3283 static int
3284 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3285 {
3286         struct cloneswaparg *csa = arg1;
3287 
3288         /* they should both be heads */
3289         if (dsl_dataset_is_snapshot(csa->cds) ||
3290             dsl_dataset_is_snapshot(csa->ohds))
3291                 return (EINVAL);
3292 
3293         /* the branch point should be just before them */
3294         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3295                 return (EINVAL);
3296 
3297         /* cds should be the clone (unless they are unrelated) */
3298         if (csa->cds->ds_prev != NULL &&
3299             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3300             csa->ohds->ds_object !=
3301             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3302                 return (EINVAL);
3303 
3304         /* the clone should be a child of the origin */
3305         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3306                 return (EINVAL);
3307 
3308         /* ohds shouldn't be modified unless 'force' */
3309         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3310                 return (ETXTBSY);
3311 
3312         /* adjust amount of any unconsumed refreservation */
3313         csa->unused_refres_delta =
3314             (int64_t)MIN(csa->ohds->ds_reserved,
3315             csa->ohds->ds_phys->ds_unique_bytes) -
3316             (int64_t)MIN(csa->ohds->ds_reserved,
3317             csa->cds->ds_phys->ds_unique_bytes);
3318 
3319         if (csa->unused_refres_delta > 0 &&
3320             csa->unused_refres_delta >
3321             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3322                 return (ENOSPC);
3323 
3324         if (csa->ohds->ds_quota != 0 &&
3325             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3326                 return (EDQUOT);
3327 
3328         return (0);
3329 }
3330 
3331 /* ARGSUSED */
3332 static void
3333 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3334 {
3335         struct cloneswaparg *csa = arg1;
3336         dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3337 
3338         ASSERT(csa->cds->ds_reserved == 0);
3339         ASSERT(csa->ohds->ds_quota == 0 ||
3340             csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3341 
3342         dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3343         dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3344 
3345         if (csa->cds->ds_objset != NULL) {
3346                 dmu_objset_evict(csa->cds->ds_objset);
3347                 csa->cds->ds_objset = NULL;
3348         }
3349 
3350         if (csa->ohds->ds_objset != NULL) {
3351                 dmu_objset_evict(csa->ohds->ds_objset);
3352                 csa->ohds->ds_objset = NULL;
3353         }
3354 
3355         /*
3356          * Reset origin's unique bytes, if it exists.
3357          */
3358         if (csa->cds->ds_prev) {
3359                 dsl_dataset_t *origin = csa->cds->ds_prev;
3360                 uint64_t comp, uncomp;
3361 
3362                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
3363                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3364                     origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3365                     &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3366         }
3367 
3368         /* swap blkptrs */
3369         {
3370                 blkptr_t tmp;
3371                 tmp = csa->ohds->ds_phys->ds_bp;
3372                 csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3373                 csa->cds->ds_phys->ds_bp = tmp;
3374         }
3375 
3376         /* set dd_*_bytes */
3377         {
3378                 int64_t dused, dcomp, duncomp;
3379                 uint64_t cdl_used, cdl_comp, cdl_uncomp;
3380                 uint64_t odl_used, odl_comp, odl_uncomp;
3381 
3382                 ASSERT3U(csa->cds->ds_dir->dd_phys->
3383                     dd_used_breakdown[DD_USED_SNAP], ==, 0);
3384 
3385                 dsl_deadlist_space(&csa->cds->ds_deadlist,
3386                     &cdl_used, &cdl_comp, &cdl_uncomp);
3387                 dsl_deadlist_space(&csa->ohds->ds_deadlist,
3388                     &odl_used, &odl_comp, &odl_uncomp);
3389 
3390                 dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3391                     (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3392                 dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3393                     (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3394                 duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3395                     cdl_uncomp -
3396                     (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3397 
3398                 dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3399                     dused, dcomp, duncomp, tx);
3400                 dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3401                     -dused, -dcomp, -duncomp, tx);
3402 
3403                 /*
3404                  * The difference in the space used by snapshots is the
3405                  * difference in snapshot space due to the head's
3406                  * deadlist (since that's the only thing that's
3407                  * changing that affects the snapused).
3408                  */
3409                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3410                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3411                     &cdl_used, &cdl_comp, &cdl_uncomp);
3412                 dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3413                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3414                     &odl_used, &odl_comp, &odl_uncomp);
3415                 dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3416                     DD_USED_HEAD, DD_USED_SNAP, tx);
3417         }
3418 
3419         /* swap ds_*_bytes */
3420         SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3421             csa->cds->ds_phys->ds_referenced_bytes);
3422         SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3423             csa->cds->ds_phys->ds_compressed_bytes);
3424         SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3425             csa->cds->ds_phys->ds_uncompressed_bytes);
3426         SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3427             csa->cds->ds_phys->ds_unique_bytes);
3428 
3429         /* apply any parent delta for change in unconsumed refreservation */
3430         dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3431             csa->unused_refres_delta, 0, 0, tx);
3432 
3433         /*
3434          * Swap deadlists.
3435          */
3436         dsl_deadlist_close(&csa->cds->ds_deadlist);
3437         dsl_deadlist_close(&csa->ohds->ds_deadlist);
3438         SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3439             csa->cds->ds_phys->ds_deadlist_obj);
3440         dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3441             csa->cds->ds_phys->ds_deadlist_obj);
3442         dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3443             csa->ohds->ds_phys->ds_deadlist_obj);
3444 
3445         dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3446 
3447         spa_history_log_internal_ds(csa->cds, "clone swap", tx,
3448             "parent=%s", csa->ohds->ds_dir->dd_myname);
3449 }
3450 
3451 /*
3452  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3453  * recv" into an existing fs to swizzle the file system to the new
3454  * version, and by "zfs rollback".  Can also be used to swap two
3455  * independent head datasets if neither has any snapshots.
3456  */
3457 int
3458 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3459     boolean_t force)
3460 {
3461         struct cloneswaparg csa;
3462         int error;
3463 
3464         ASSERT(clone->ds_owner);
3465         ASSERT(origin_head->ds_owner);
3466 retry:
3467         /*
3468          * Need exclusive access for the swap. If we're swapping these
3469          * datasets back after an error, we already hold the locks.
3470          */
3471         if (!RW_WRITE_HELD(&clone->ds_rwlock))
3472                 rw_enter(&clone->ds_rwlock, RW_WRITER);
3473         if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3474             !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3475                 rw_exit(&clone->ds_rwlock);
3476                 rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3477                 if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3478                         rw_exit(&origin_head->ds_rwlock);
3479                         goto retry;
3480                 }
3481         }
3482         csa.cds = clone;
3483         csa.ohds = origin_head;
3484         csa.force = force;
3485         error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3486             dsl_dataset_clone_swap_check,
3487             dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3488         return (error);
3489 }
3490 
3491 /*
3492  * Given a pool name and a dataset object number in that pool,
3493  * return the name of that dataset.
3494  */
3495 int
3496 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3497 {
3498         spa_t *spa;
3499         dsl_pool_t *dp;
3500         dsl_dataset_t *ds;
3501         int error;
3502 
3503         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3504                 return (error);
3505         dp = spa_get_dsl(spa);
3506         rw_enter(&dp->dp_config_rwlock, RW_READER);
3507         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3508                 dsl_dataset_name(ds, buf);
3509                 dsl_dataset_rele(ds, FTAG);
3510         }
3511         rw_exit(&dp->dp_config_rwlock);
3512         spa_close(spa, FTAG);
3513 
3514         return (error);
3515 }
3516 
3517 int
3518 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3519     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3520 {
3521         int error = 0;
3522 
3523         ASSERT3S(asize, >, 0);
3524 
3525         /*
3526          * *ref_rsrv is the portion of asize that will come from any
3527          * unconsumed refreservation space.
3528          */
3529         *ref_rsrv = 0;
3530 
3531         mutex_enter(&ds->ds_lock);
3532         /*
3533          * Make a space adjustment for reserved bytes.
3534          */
3535         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3536                 ASSERT3U(*used, >=,
3537                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3538                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3539                 *ref_rsrv =
3540                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3541         }
3542 
3543         if (!check_quota || ds->ds_quota == 0) {
3544                 mutex_exit(&ds->ds_lock);
3545                 return (0);
3546         }
3547         /*
3548          * If they are requesting more space, and our current estimate
3549          * is over quota, they get to try again unless the actual
3550          * on-disk is over quota and there are no pending changes (which
3551          * may free up space for us).
3552          */
3553         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3554                 if (inflight > 0 ||
3555                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3556                         error = ERESTART;
3557                 else
3558                         error = EDQUOT;
3559         }
3560         mutex_exit(&ds->ds_lock);
3561 
3562         return (error);
3563 }
3564 
3565 /* ARGSUSED */
3566 static int
3567 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3568 {
3569         dsl_dataset_t *ds = arg1;
3570         dsl_prop_setarg_t *psa = arg2;
3571         int err;
3572 
3573         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3574                 return (ENOTSUP);
3575 
3576         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3577                 return (err);
3578 
3579         if (psa->psa_effective_value == 0)
3580                 return (0);
3581 
3582         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3583             psa->psa_effective_value < ds->ds_reserved)
3584                 return (ENOSPC);
3585 
3586         return (0);
3587 }
3588 
3589 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3590 
3591 void
3592 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3593 {
3594         dsl_dataset_t *ds = arg1;
3595         dsl_prop_setarg_t *psa = arg2;
3596         uint64_t effective_value = psa->psa_effective_value;
3597 
3598         dsl_prop_set_sync(ds, psa, tx);
3599         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3600 
3601         if (ds->ds_quota != effective_value) {
3602                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3603                 ds->ds_quota = effective_value;
3604         }
3605 }
3606 
3607 int
3608 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3609 {
3610         dsl_dataset_t *ds;
3611         dsl_prop_setarg_t psa;
3612         int err;
3613 
3614         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3615 
3616         err = dsl_dataset_hold(dsname, FTAG, &ds);
3617         if (err)
3618                 return (err);
3619 
3620         /*
3621          * If someone removes a file, then tries to set the quota, we
3622          * want to make sure the file freeing takes effect.
3623          */
3624         txg_wait_open(ds->ds_dir->dd_pool, 0);
3625 
3626         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3627             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3628             ds, &psa, 0);
3629 
3630         dsl_dataset_rele(ds, FTAG);
3631         return (err);
3632 }
3633 
3634 static int
3635 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3636 {
3637         dsl_dataset_t *ds = arg1;
3638         dsl_prop_setarg_t *psa = arg2;
3639         uint64_t effective_value;
3640         uint64_t unique;
3641         int err;
3642 
3643         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3644             SPA_VERSION_REFRESERVATION)
3645                 return (ENOTSUP);
3646 
3647         if (dsl_dataset_is_snapshot(ds))
3648                 return (EINVAL);
3649 
3650         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3651                 return (err);
3652 
3653         effective_value = psa->psa_effective_value;
3654 
3655         /*
3656          * If we are doing the preliminary check in open context, the
3657          * space estimates may be inaccurate.
3658          */
3659         if (!dmu_tx_is_syncing(tx))
3660                 return (0);
3661 
3662         mutex_enter(&ds->ds_lock);
3663         if (!DS_UNIQUE_IS_ACCURATE(ds))
3664                 dsl_dataset_recalc_head_uniq(ds);
3665         unique = ds->ds_phys->ds_unique_bytes;
3666         mutex_exit(&ds->ds_lock);
3667 
3668         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3669                 uint64_t delta = MAX(unique, effective_value) -
3670                     MAX(unique, ds->ds_reserved);
3671 
3672                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3673                         return (ENOSPC);
3674                 if (ds->ds_quota > 0 &&
3675                     effective_value > ds->ds_quota)
3676                         return (ENOSPC);
3677         }
3678 
3679         return (0);
3680 }
3681 
3682 static void
3683 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3684 {
3685         dsl_dataset_t *ds = arg1;
3686         dsl_prop_setarg_t *psa = arg2;
3687         uint64_t effective_value = psa->psa_effective_value;
3688         uint64_t unique;
3689         int64_t delta;
3690 
3691         dsl_prop_set_sync(ds, psa, tx);
3692         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3693 
3694         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3695 
3696         mutex_enter(&ds->ds_dir->dd_lock);
3697         mutex_enter(&ds->ds_lock);
3698         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3699         unique = ds->ds_phys->ds_unique_bytes;
3700         delta = MAX(0, (int64_t)(effective_value - unique)) -
3701             MAX(0, (int64_t)(ds->ds_reserved - unique));
3702         ds->ds_reserved = effective_value;
3703         mutex_exit(&ds->ds_lock);
3704 
3705         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3706         mutex_exit(&ds->ds_dir->dd_lock);
3707 }
3708 
3709 int
3710 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3711     uint64_t reservation)
3712 {
3713         dsl_dataset_t *ds;
3714         dsl_prop_setarg_t psa;
3715         int err;
3716 
3717         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3718             &reservation);
3719 
3720         err = dsl_dataset_hold(dsname, FTAG, &ds);
3721         if (err)
3722                 return (err);
3723 
3724         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3725             dsl_dataset_set_reservation_check,
3726             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3727 
3728         dsl_dataset_rele(ds, FTAG);
3729         return (err);
3730 }
3731 
3732 typedef struct zfs_hold_cleanup_arg {
3733         dsl_pool_t *dp;
3734         uint64_t dsobj;
3735         char htag[MAXNAMELEN];
3736 } zfs_hold_cleanup_arg_t;
3737 
3738 static void
3739 dsl_dataset_user_release_onexit(void *arg)
3740 {
3741         zfs_hold_cleanup_arg_t *ca = arg;
3742 
3743         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3744             B_TRUE);
3745         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3746 }
3747 
3748 void
3749 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3750     minor_t minor)
3751 {
3752         zfs_hold_cleanup_arg_t *ca;
3753 
3754         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3755         ca->dp = ds->ds_dir->dd_pool;
3756         ca->dsobj = ds->ds_object;
3757         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3758         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3759             dsl_dataset_user_release_onexit, ca, NULL));
3760 }
3761 
3762 /*
3763  * If you add new checks here, you may need to add
3764  * additional checks to the "temporary" case in
3765  * snapshot_check() in dmu_objset.c.
3766  */
3767 static int
3768 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3769 {
3770         dsl_dataset_t *ds = arg1;
3771         struct dsl_ds_holdarg *ha = arg2;
3772         const char *htag = ha->htag;
3773         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3774         int error = 0;
3775 
3776         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3777                 return (ENOTSUP);
3778 
3779         if (!dsl_dataset_is_snapshot(ds))
3780                 return (EINVAL);
3781 
3782         /* tags must be unique */
3783         mutex_enter(&ds->ds_lock);
3784         if (ds->ds_phys->ds_userrefs_obj) {
3785                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3786                     8, 1, tx);
3787                 if (error == 0)
3788                         error = EEXIST;
3789                 else if (error == ENOENT)
3790                         error = 0;
3791         }
3792         mutex_exit(&ds->ds_lock);
3793 
3794         if (error == 0 && ha->temphold &&
3795             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3796                 error = E2BIG;
3797 
3798         return (error);
3799 }
3800 
3801 void
3802 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3803 {
3804         dsl_dataset_t *ds = arg1;
3805         struct dsl_ds_holdarg *ha = arg2;
3806         const char *htag = ha->htag;
3807         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3808         objset_t *mos = dp->dp_meta_objset;
3809         uint64_t now = gethrestime_sec();
3810         uint64_t zapobj;
3811 
3812         mutex_enter(&ds->ds_lock);
3813         if (ds->ds_phys->ds_userrefs_obj == 0) {
3814                 /*
3815                  * This is the first user hold for this dataset.  Create
3816                  * the userrefs zap object.
3817                  */
3818                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3819                 zapobj = ds->ds_phys->ds_userrefs_obj =
3820                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3821         } else {
3822                 zapobj = ds->ds_phys->ds_userrefs_obj;
3823         }
3824         ds->ds_userrefs++;
3825         mutex_exit(&ds->ds_lock);
3826 
3827         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3828 
3829         if (ha->temphold) {
3830                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3831                     htag, &now, tx));
3832         }
3833 
3834         spa_history_log_internal_ds(ds, "hold", tx,
3835             "tag = %s temp = %d holds now = %llu",
3836             htag, (int)ha->temphold, ds->ds_userrefs);
3837 }
3838 
3839 static int
3840 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3841 {
3842         struct dsl_ds_holdarg *ha = arg;
3843         dsl_dataset_t *ds;
3844         int error;
3845         char *name;
3846 
3847         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3848         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3849         error = dsl_dataset_hold(name, ha->dstg, &ds);
3850         strfree(name);
3851         if (error == 0) {
3852                 ha->gotone = B_TRUE;
3853                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3854                     dsl_dataset_user_hold_sync, ds, ha, 0);
3855         } else if (error == ENOENT && ha->recursive) {
3856                 error = 0;
3857         } else {
3858                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3859         }
3860         return (error);
3861 }
3862 
3863 int
3864 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3865     boolean_t temphold)
3866 {
3867         struct dsl_ds_holdarg *ha;
3868         int error;
3869 
3870         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3871         ha->htag = htag;
3872         ha->temphold = temphold;
3873         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3874             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3875             ds, ha, 0);
3876         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3877 
3878         return (error);
3879 }
3880 
3881 int
3882 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3883     boolean_t recursive, boolean_t temphold, int cleanup_fd)
3884 {
3885         struct dsl_ds_holdarg *ha;
3886         dsl_sync_task_t *dst;
3887         spa_t *spa;
3888         int error;
3889         minor_t minor = 0;
3890 
3891         if (cleanup_fd != -1) {
3892                 /* Currently we only support cleanup-on-exit of tempholds. */
3893                 if (!temphold)
3894                         return (EINVAL);
3895                 error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3896                 if (error)
3897                         return (error);
3898         }
3899 
3900         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3901 
3902         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3903 
3904         error = spa_open(dsname, &spa, FTAG);
3905         if (error) {
3906                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3907                 if (cleanup_fd != -1)
3908                         zfs_onexit_fd_rele(cleanup_fd);
3909                 return (error);
3910         }
3911 
3912         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3913         ha->htag = htag;
3914         ha->snapname = snapname;
3915         ha->recursive = recursive;
3916         ha->temphold = temphold;
3917 
3918         if (recursive) {
3919                 error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3920                     ha, DS_FIND_CHILDREN);
3921         } else {
3922                 error = dsl_dataset_user_hold_one(dsname, ha);
3923         }
3924         if (error == 0)
3925                 error = dsl_sync_task_group_wait(ha->dstg);
3926 
3927         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3928             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3929                 dsl_dataset_t *ds = dst->dst_arg1;
3930 
3931                 if (dst->dst_err) {
3932                         dsl_dataset_name(ds, ha->failed);
3933                         *strchr(ha->failed, '@') = '\0';
3934                 } else if (error == 0 && minor != 0 && temphold) {
3935                         /*
3936                          * If this hold is to be released upon process exit,
3937                          * register that action now.
3938                          */
3939                         dsl_register_onexit_hold_cleanup(ds, htag, minor);
3940                 }
3941                 dsl_dataset_rele(ds, ha->dstg);
3942         }
3943 
3944         if (error == 0 && recursive && !ha->gotone)
3945                 error = ENOENT;
3946 
3947         if (error)
3948                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3949 
3950         dsl_sync_task_group_destroy(ha->dstg);
3951 
3952         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3953         spa_close(spa, FTAG);
3954         if (cleanup_fd != -1)
3955                 zfs_onexit_fd_rele(cleanup_fd);
3956         return (error);
3957 }
3958 
3959 struct dsl_ds_releasearg {
3960         dsl_dataset_t *ds;
3961         const char *htag;
3962         boolean_t own;          /* do we own or just hold ds? */
3963 };
3964 
3965 static int
3966 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3967     boolean_t *might_destroy)
3968 {
3969         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3970         uint64_t zapobj;
3971         uint64_t tmp;
3972         int error;
3973 
3974         *might_destroy = B_FALSE;
3975 
3976         mutex_enter(&ds->ds_lock);
3977         zapobj = ds->ds_phys->ds_userrefs_obj;
3978         if (zapobj == 0) {
3979                 /* The tag can't possibly exist */
3980                 mutex_exit(&ds->ds_lock);
3981                 return (ESRCH);
3982         }
3983 
3984         /* Make sure the tag exists */
3985         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3986         if (error) {
3987                 mutex_exit(&ds->ds_lock);
3988                 if (error == ENOENT)
3989                         error = ESRCH;
3990                 return (error);
3991         }
3992 
3993         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3994             DS_IS_DEFER_DESTROY(ds))
3995                 *might_destroy = B_TRUE;
3996 
3997         mutex_exit(&ds->ds_lock);
3998         return (0);
3999 }
4000 
4001 static int
4002 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
4003 {
4004         struct dsl_ds_releasearg *ra = arg1;
4005         dsl_dataset_t *ds = ra->ds;
4006         boolean_t might_destroy;
4007         int error;
4008 
4009         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
4010                 return (ENOTSUP);
4011 
4012         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
4013         if (error)
4014                 return (error);
4015 
4016         if (might_destroy) {
4017                 struct dsl_ds_destroyarg dsda = {0};
4018 
4019                 if (dmu_tx_is_syncing(tx)) {
4020                         /*
4021                          * If we're not prepared to remove the snapshot,
4022                          * we can't allow the release to happen right now.
4023                          */
4024                         if (!ra->own)
4025                                 return (EBUSY);
4026                 }
4027                 dsda.ds = ds;
4028                 dsda.releasing = B_TRUE;
4029                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
4030         }
4031 
4032         return (0);
4033 }
4034 
4035 static void
4036 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
4037 {
4038         struct dsl_ds_releasearg *ra = arg1;
4039         dsl_dataset_t *ds = ra->ds;
4040         dsl_pool_t *dp = ds->ds_dir->dd_pool;
4041         objset_t *mos = dp->dp_meta_objset;
4042         uint64_t zapobj;
4043         uint64_t refs;
4044         int error;
4045 
4046         mutex_enter(&ds->ds_lock);
4047         ds->ds_userrefs--;
4048         refs = ds->ds_userrefs;
4049         mutex_exit(&ds->ds_lock);
4050         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
4051         VERIFY(error == 0 || error == ENOENT);
4052         zapobj = ds->ds_phys->ds_userrefs_obj;
4053         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
4054 
4055         spa_history_log_internal_ds(ds, "release", tx,
4056             "tag = %s refs now = %lld", ra->htag, (longlong_t)refs);
4057 
4058         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
4059             DS_IS_DEFER_DESTROY(ds)) {
4060                 struct dsl_ds_destroyarg dsda = {0};
4061 
4062                 ASSERT(ra->own);
4063                 dsda.ds = ds;
4064                 dsda.releasing = B_TRUE;
4065                 /* We already did the destroy_check */
4066                 dsl_dataset_destroy_sync(&dsda, tag, tx);
4067         }
4068 }
4069 
4070 static int
4071 dsl_dataset_user_release_one(const char *dsname, void *arg)
4072 {
4073         struct dsl_ds_holdarg *ha = arg;
4074         struct dsl_ds_releasearg *ra;
4075         dsl_dataset_t *ds;
4076         int error;
4077         void *dtag = ha->dstg;
4078         char *name;
4079         boolean_t own = B_FALSE;
4080         boolean_t might_destroy;
4081 
4082         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
4083         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4084         error = dsl_dataset_hold(name, dtag, &ds);
4085         strfree(name);
4086         if (error == ENOENT && ha->recursive)
4087                 return (0);
4088         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4089         if (error)
4090                 return (error);
4091 
4092         ha->gotone = B_TRUE;
4093 
4094         ASSERT(dsl_dataset_is_snapshot(ds));
4095 
4096         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
4097         if (error) {
4098                 dsl_dataset_rele(ds, dtag);
4099                 return (error);
4100         }
4101 
4102         if (might_destroy) {
4103 #ifdef _KERNEL
4104                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4105                 error = zfs_unmount_snap(name, NULL);
4106                 strfree(name);
4107                 if (error) {
4108                         dsl_dataset_rele(ds, dtag);
4109                         return (error);
4110                 }
4111 #endif
4112                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
4113                         dsl_dataset_rele(ds, dtag);
4114                         return (EBUSY);
4115                 } else {
4116                         own = B_TRUE;
4117                         dsl_dataset_make_exclusive(ds, dtag);
4118                 }
4119         }
4120 
4121         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
4122         ra->ds = ds;
4123         ra->htag = ha->htag;
4124         ra->own = own;
4125         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
4126             dsl_dataset_user_release_sync, ra, dtag, 0);
4127 
4128         return (0);
4129 }
4130 
4131 int
4132 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
4133     boolean_t recursive)
4134 {
4135         struct dsl_ds_holdarg *ha;
4136         dsl_sync_task_t *dst;
4137         spa_t *spa;
4138         int error;
4139 
4140 top:
4141         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4142 
4143         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4144 
4145         error = spa_open(dsname, &spa, FTAG);
4146         if (error) {
4147                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4148                 return (error);
4149         }
4150 
4151         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4152         ha->htag = htag;
4153         ha->snapname = snapname;
4154         ha->recursive = recursive;
4155         if (recursive) {
4156                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4157                     ha, DS_FIND_CHILDREN);
4158         } else {
4159                 error = dsl_dataset_user_release_one(dsname, ha);
4160         }
4161         if (error == 0)
4162                 error = dsl_sync_task_group_wait(ha->dstg);
4163 
4164         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4165             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4166                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4167                 dsl_dataset_t *ds = ra->ds;
4168 
4169                 if (dst->dst_err)
4170                         dsl_dataset_name(ds, ha->failed);
4171 
4172                 if (ra->own)
4173                         dsl_dataset_disown(ds, ha->dstg);
4174                 else
4175                         dsl_dataset_rele(ds, ha->dstg);
4176 
4177                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4178         }
4179 
4180         if (error == 0 && recursive && !ha->gotone)
4181                 error = ENOENT;
4182 
4183         if (error && error != EBUSY)
4184                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4185 
4186         dsl_sync_task_group_destroy(ha->dstg);
4187         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4188         spa_close(spa, FTAG);
4189 
4190         /*
4191          * We can get EBUSY if we were racing with deferred destroy and
4192          * dsl_dataset_user_release_check() hadn't done the necessary
4193          * open context setup.  We can also get EBUSY if we're racing
4194          * with destroy and that thread is the ds_owner.  Either way
4195          * the busy condition should be transient, and we should retry
4196          * the release operation.
4197          */
4198         if (error == EBUSY)
4199                 goto top;
4200 
4201         return (error);
4202 }
4203 
4204 /*
4205  * Called at spa_load time (with retry == B_FALSE) to release a stale
4206  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4207  */
4208 int
4209 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4210     boolean_t retry)
4211 {
4212         dsl_dataset_t *ds;
4213         char *snap;
4214         char *name;
4215         int namelen;
4216         int error;
4217 
4218         do {
4219                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4220                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4221                 rw_exit(&dp->dp_config_rwlock);
4222                 if (error)
4223                         return (error);
4224                 namelen = dsl_dataset_namelen(ds)+1;
4225                 name = kmem_alloc(namelen, KM_SLEEP);
4226                 dsl_dataset_name(ds, name);
4227                 dsl_dataset_rele(ds, FTAG);
4228 
4229                 snap = strchr(name, '@');
4230                 *snap = '\0';
4231                 ++snap;
4232                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4233                 kmem_free(name, namelen);
4234 
4235                 /*
4236                  * The object can't have been destroyed because we have a hold,
4237                  * but it might have been renamed, resulting in ENOENT.  Retry
4238                  * if we've been requested to do so.
4239                  *
4240                  * It would be nice if we could use the dsobj all the way
4241                  * through and avoid ENOENT entirely.  But we might need to
4242                  * unmount the snapshot, and there's currently no way to lookup
4243                  * a vfsp using a ZFS object id.
4244                  */
4245         } while ((error == ENOENT) && retry);
4246 
4247         return (error);
4248 }
4249 
4250 int
4251 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4252 {
4253         dsl_dataset_t *ds;
4254         int err;
4255 
4256         err = dsl_dataset_hold(dsname, FTAG, &ds);
4257         if (err)
4258                 return (err);
4259 
4260         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4261         if (ds->ds_phys->ds_userrefs_obj != 0) {
4262                 zap_attribute_t *za;
4263                 zap_cursor_t zc;
4264 
4265                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4266                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4267                     ds->ds_phys->ds_userrefs_obj);
4268                     zap_cursor_retrieve(&zc, za) == 0;
4269                     zap_cursor_advance(&zc)) {
4270                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4271                             za->za_first_integer));
4272                 }
4273                 zap_cursor_fini(&zc);
4274                 kmem_free(za, sizeof (zap_attribute_t));
4275         }
4276         dsl_dataset_rele(ds, FTAG);
4277         return (0);
4278 }
4279 
4280 /*
4281  * Note, this function is used as the callback for dmu_objset_find().  We
4282  * always return 0 so that we will continue to find and process
4283  * inconsistent datasets, even if we encounter an error trying to
4284  * process one of them.
4285  */
4286 /* ARGSUSED */
4287 int
4288 dsl_destroy_inconsistent(const char *dsname, void *arg)
4289 {
4290         dsl_dataset_t *ds;
4291 
4292         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4293                 if (DS_IS_INCONSISTENT(ds))
4294                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4295                 else
4296                         dsl_dataset_disown(ds, FTAG);
4297         }
4298         return (0);
4299 }
4300 
4301 /*
4302  * Return (in *usedp) the amount of space written in new that is not
4303  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4304  * a snapshot before new, in new's filesystem (or its origin).  If not then
4305  * fail and return EINVAL.
4306  *
4307  * The written space is calculated by considering two components:  First, we
4308  * ignore any freed space, and calculate the written as new's used space
4309  * minus old's used space.  Next, we add in the amount of space that was freed
4310  * between the two snapshots, thus reducing new's used space relative to old's.
4311  * Specifically, this is the space that was born before old->ds_creation_txg,
4312  * and freed before new (ie. on new's deadlist or a previous deadlist).
4313  *
4314  * space freed                         [---------------------]
4315  * snapshots                       ---O-------O--------O-------O------
4316  *                                         oldsnap            new
4317  */
4318 int
4319 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4320     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4321 {
4322         int err = 0;
4323         uint64_t snapobj;
4324         dsl_pool_t *dp = new->ds_dir->dd_pool;
4325 
4326         *usedp = 0;
4327         *usedp += new->ds_phys->ds_referenced_bytes;
4328         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4329 
4330         *compp = 0;
4331         *compp += new->ds_phys->ds_compressed_bytes;
4332         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4333 
4334         *uncompp = 0;
4335         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4336         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4337 
4338         rw_enter(&dp->dp_config_rwlock, RW_READER);
4339         snapobj = new->ds_object;
4340         while (snapobj != oldsnap->ds_object) {
4341                 dsl_dataset_t *snap;
4342                 uint64_t used, comp, uncomp;
4343 
4344                 if (snapobj == new->ds_object) {
4345                         snap = new;
4346                 } else {
4347                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4348                         if (err != 0)
4349                                 break;
4350                 }
4351 
4352                 if (snap->ds_phys->ds_prev_snap_txg ==
4353                     oldsnap->ds_phys->ds_creation_txg) {
4354                         /*
4355                          * The blocks in the deadlist can not be born after
4356                          * ds_prev_snap_txg, so get the whole deadlist space,
4357                          * which is more efficient (especially for old-format
4358                          * deadlists).  Unfortunately the deadlist code
4359                          * doesn't have enough information to make this
4360                          * optimization itself.
4361                          */
4362                         dsl_deadlist_space(&snap->ds_deadlist,
4363                             &used, &comp, &uncomp);
4364                 } else {
4365                         dsl_deadlist_space_range(&snap->ds_deadlist,
4366                             0, oldsnap->ds_phys->ds_creation_txg,
4367                             &used, &comp, &uncomp);
4368                 }
4369                 *usedp += used;
4370                 *compp += comp;
4371                 *uncompp += uncomp;
4372 
4373                 /*
4374                  * If we get to the beginning of the chain of snapshots
4375                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4376                  * was not a snapshot of/before new.
4377                  */
4378                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4379                 if (snap != new)
4380                         dsl_dataset_rele(snap, FTAG);
4381                 if (snapobj == 0) {
4382                         err = EINVAL;
4383                         break;
4384                 }
4385 
4386         }
4387         rw_exit(&dp->dp_config_rwlock);
4388         return (err);
4389 }
4390 
4391 /*
4392  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4393  * lastsnap, and all snapshots in between are deleted.
4394  *
4395  * blocks that would be freed            [---------------------------]
4396  * snapshots                       ---O-------O--------O-------O--------O
4397  *                                        firstsnap        lastsnap
4398  *
4399  * This is the set of blocks that were born after the snap before firstsnap,
4400  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4401  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4402  * We calculate this by iterating over the relevant deadlists (from the snap
4403  * after lastsnap, backward to the snap after firstsnap), summing up the
4404  * space on the deadlist that was born after the snap before firstsnap.
4405  */
4406 int
4407 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4408     dsl_dataset_t *lastsnap,
4409     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4410 {
4411         int err = 0;
4412         uint64_t snapobj;
4413         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4414 
4415         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4416         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4417 
4418         /*
4419          * Check that the snapshots are in the same dsl_dir, and firstsnap
4420          * is before lastsnap.
4421          */
4422         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4423             firstsnap->ds_phys->ds_creation_txg >
4424             lastsnap->ds_phys->ds_creation_txg)
4425                 return (EINVAL);
4426 
4427         *usedp = *compp = *uncompp = 0;
4428 
4429         rw_enter(&dp->dp_config_rwlock, RW_READER);
4430         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4431         while (snapobj != firstsnap->ds_object) {
4432                 dsl_dataset_t *ds;
4433                 uint64_t used, comp, uncomp;
4434 
4435                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4436                 if (err != 0)
4437                         break;
4438 
4439                 dsl_deadlist_space_range(&ds->ds_deadlist,
4440                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4441                     &used, &comp, &uncomp);
4442                 *usedp += used;
4443                 *compp += comp;
4444                 *uncompp += uncomp;
4445 
4446                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4447                 ASSERT3U(snapobj, !=, 0);
4448                 dsl_dataset_rele(ds, FTAG);
4449         }
4450         rw_exit(&dp->dp_config_rwlock);
4451         return (err);
4452 }