1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  25  */
  26 
  27 #include <sys/dmu_objset.h>
  28 #include <sys/dsl_dataset.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_prop.h>
  31 #include <sys/dsl_synctask.h>
  32 #include <sys/dmu_traverse.h>
  33 #include <sys/dmu_impl.h>
  34 #include <sys/dmu_tx.h>
  35 #include <sys/arc.h>
  36 #include <sys/zio.h>
  37 #include <sys/zap.h>
  38 #include <sys/zfeature.h>
  39 #include <sys/unique.h>
  40 #include <sys/zfs_context.h>
  41 #include <sys/zfs_ioctl.h>
  42 #include <sys/spa.h>
  43 #include <sys/zfs_znode.h>
  44 #include <sys/zfs_onexit.h>
  45 #include <sys/zvol.h>
  46 #include <sys/dsl_scan.h>
  47 #include <sys/dsl_deadlist.h>
  48 #include "zfs_prop.h"
  49 
  50 static char *dsl_reaper = "the grim reaper";
  51 
  52 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
  53 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
  54 static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
  55 
  56 #define SWITCH64(x, y) \
  57         { \
  58                 uint64_t __tmp = (x); \
  59                 (x) = (y); \
  60                 (y) = __tmp; \
  61         }
  62 
  63 #define DS_REF_MAX      (1ULL << 62)
  64 
  65 #define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE
  66 
  67 #define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
  68 
  69 
  70 /*
  71  * Figure out how much of this delta should be propogated to the dsl_dir
  72  * layer.  If there's a refreservation, that space has already been
  73  * partially accounted for in our ancestors.
  74  */
  75 static int64_t
  76 parent_delta(dsl_dataset_t *ds, int64_t delta)
  77 {
  78         uint64_t old_bytes, new_bytes;
  79 
  80         if (ds->ds_reserved == 0)
  81                 return (delta);
  82 
  83         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
  84         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
  85 
  86         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
  87         return (new_bytes - old_bytes);
  88 }
  89 
  90 void
  91 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
  92 {
  93         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
  94         int compressed = BP_GET_PSIZE(bp);
  95         int uncompressed = BP_GET_UCSIZE(bp);
  96         int64_t delta;
  97 
  98         dprintf_bp(bp, "ds=%p", ds);
  99 
 100         ASSERT(dmu_tx_is_syncing(tx));
 101         /* It could have been compressed away to nothing */
 102         if (BP_IS_HOLE(bp))
 103                 return;
 104         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
 105         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
 106         if (ds == NULL) {
 107                 dsl_pool_mos_diduse_space(tx->tx_pool,
 108                     used, compressed, uncompressed);
 109                 return;
 110         }
 111         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 112 
 113         mutex_enter(&ds->ds_dir->dd_lock);
 114         mutex_enter(&ds->ds_lock);
 115         delta = parent_delta(ds, used);
 116         ds->ds_phys->ds_referenced_bytes += used;
 117         ds->ds_phys->ds_compressed_bytes += compressed;
 118         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
 119         ds->ds_phys->ds_unique_bytes += used;
 120         mutex_exit(&ds->ds_lock);
 121         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
 122             compressed, uncompressed, tx);
 123         dsl_dir_transfer_space(ds->ds_dir, used - delta,
 124             DD_USED_REFRSRV, DD_USED_HEAD, tx);
 125         mutex_exit(&ds->ds_dir->dd_lock);
 126 }
 127 
 128 int
 129 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
 130     boolean_t async)
 131 {
 132         if (BP_IS_HOLE(bp))
 133                 return (0);
 134 
 135         ASSERT(dmu_tx_is_syncing(tx));
 136         ASSERT(bp->blk_birth <= tx->tx_txg);
 137 
 138         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
 139         int compressed = BP_GET_PSIZE(bp);
 140         int uncompressed = BP_GET_UCSIZE(bp);
 141 
 142         ASSERT(used > 0);
 143         if (ds == NULL) {
 144                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 145                 dsl_pool_mos_diduse_space(tx->tx_pool,
 146                     -used, -compressed, -uncompressed);
 147                 return (used);
 148         }
 149         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
 150 
 151         ASSERT(!dsl_dataset_is_snapshot(ds));
 152         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 153 
 154         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
 155                 int64_t delta;
 156 
 157                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
 158                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 159 
 160                 mutex_enter(&ds->ds_dir->dd_lock);
 161                 mutex_enter(&ds->ds_lock);
 162                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
 163                     !DS_UNIQUE_IS_ACCURATE(ds));
 164                 delta = parent_delta(ds, -used);
 165                 ds->ds_phys->ds_unique_bytes -= used;
 166                 mutex_exit(&ds->ds_lock);
 167                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
 168                     delta, -compressed, -uncompressed, tx);
 169                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
 170                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
 171                 mutex_exit(&ds->ds_dir->dd_lock);
 172         } else {
 173                 dprintf_bp(bp, "putting on dead list: %s", "");
 174                 if (async) {
 175                         /*
 176                          * We are here as part of zio's write done callback,
 177                          * which means we're a zio interrupt thread.  We can't
 178                          * call dsl_deadlist_insert() now because it may block
 179                          * waiting for I/O.  Instead, put bp on the deferred
 180                          * queue and let dsl_pool_sync() finish the job.
 181                          */
 182                         bplist_append(&ds->ds_pending_deadlist, bp);
 183                 } else {
 184                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
 185                 }
 186                 ASSERT3U(ds->ds_prev->ds_object, ==,
 187                     ds->ds_phys->ds_prev_snap_obj);
 188                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
 189                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
 190                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
 191                     ds->ds_object && bp->blk_birth >
 192                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
 193                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 194                         mutex_enter(&ds->ds_prev->ds_lock);
 195                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
 196                         mutex_exit(&ds->ds_prev->ds_lock);
 197                 }
 198                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
 199                         dsl_dir_transfer_space(ds->ds_dir, used,
 200                             DD_USED_HEAD, DD_USED_SNAP, tx);
 201                 }
 202         }
 203         mutex_enter(&ds->ds_lock);
 204         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
 205         ds->ds_phys->ds_referenced_bytes -= used;
 206         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
 207         ds->ds_phys->ds_compressed_bytes -= compressed;
 208         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
 209         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
 210         mutex_exit(&ds->ds_lock);
 211 
 212         return (used);
 213 }
 214 
 215 uint64_t
 216 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
 217 {
 218         uint64_t trysnap = 0;
 219 
 220         if (ds == NULL)
 221                 return (0);
 222         /*
 223          * The snapshot creation could fail, but that would cause an
 224          * incorrect FALSE return, which would only result in an
 225          * overestimation of the amount of space that an operation would
 226          * consume, which is OK.
 227          *
 228          * There's also a small window where we could miss a pending
 229          * snapshot, because we could set the sync task in the quiescing
 230          * phase.  So this should only be used as a guess.
 231          */
 232         if (ds->ds_trysnap_txg >
 233             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
 234                 trysnap = ds->ds_trysnap_txg;
 235         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 236 }
 237 
 238 boolean_t
 239 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
 240     uint64_t blk_birth)
 241 {
 242         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
 243                 return (B_FALSE);
 244 
 245         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
 246 
 247         return (B_TRUE);
 248 }
 249 
 250 /* ARGSUSED */
 251 static void
 252 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
 253 {
 254         dsl_dataset_t *ds = dsv;
 255 
 256         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 257 
 258         unique_remove(ds->ds_fsid_guid);
 259 
 260         if (ds->ds_objset != NULL)
 261                 dmu_objset_evict(ds->ds_objset);
 262 
 263         if (ds->ds_prev) {
 264                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 265                 ds->ds_prev = NULL;
 266         }
 267 
 268         bplist_destroy(&ds->ds_pending_deadlist);
 269         if (db != NULL) {
 270                 dsl_deadlist_close(&ds->ds_deadlist);
 271         } else {
 272                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
 273                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
 274         }
 275         if (ds->ds_dir)
 276                 dsl_dir_close(ds->ds_dir, ds);
 277 
 278         ASSERT(!list_link_active(&ds->ds_synced_link));
 279 
 280         mutex_destroy(&ds->ds_lock);
 281         mutex_destroy(&ds->ds_recvlock);
 282         mutex_destroy(&ds->ds_opening_lock);
 283         rw_destroy(&ds->ds_rwlock);
 284         cv_destroy(&ds->ds_exclusive_cv);
 285 
 286         kmem_free(ds, sizeof (dsl_dataset_t));
 287 }
 288 
 289 static int
 290 dsl_dataset_get_snapname(dsl_dataset_t *ds)
 291 {
 292         dsl_dataset_phys_t *headphys;
 293         int err;
 294         dmu_buf_t *headdbuf;
 295         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 296         objset_t *mos = dp->dp_meta_objset;
 297 
 298         if (ds->ds_snapname[0])
 299                 return (0);
 300         if (ds->ds_phys->ds_next_snap_obj == 0)
 301                 return (0);
 302 
 303         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
 304             FTAG, &headdbuf);
 305         if (err)
 306                 return (err);
 307         headphys = headdbuf->db_data;
 308         err = zap_value_search(dp->dp_meta_objset,
 309             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
 310         dmu_buf_rele(headdbuf, FTAG);
 311         return (err);
 312 }
 313 
 314 static int
 315 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 316 {
 317         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 318         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 319         matchtype_t mt;
 320         int err;
 321 
 322         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 323                 mt = MT_FIRST;
 324         else
 325                 mt = MT_EXACT;
 326 
 327         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 328             value, mt, NULL, 0, NULL);
 329         if (err == ENOTSUP && mt == MT_FIRST)
 330                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
 331         return (err);
 332 }
 333 
 334 static int
 335 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx,
 336     boolean_t adj_cnt)
 337 {
 338         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 339         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 340         matchtype_t mt;
 341         int err;
 342 
 343         dsl_dir_snap_cmtime_update(ds->ds_dir);
 344 
 345         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 346                 mt = MT_FIRST;
 347         else
 348                 mt = MT_EXACT;
 349 
 350         err = zap_remove_norm(mos, snapobj, name, mt, tx);
 351         if (err == ENOTSUP && mt == MT_FIRST)
 352                 err = zap_remove(mos, snapobj, name, tx);
 353 
 354         if (err == 0 && adj_cnt)
 355                 dsl_snapcount_adjust(ds->ds_dir, tx, -1, B_TRUE);
 356 
 357         return (err);
 358 }
 359 
 360 static int
 361 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 362     dsl_dataset_t **dsp)
 363 {
 364         objset_t *mos = dp->dp_meta_objset;
 365         dmu_buf_t *dbuf;
 366         dsl_dataset_t *ds;
 367         int err;
 368         dmu_object_info_t doi;
 369 
 370         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
 371             dsl_pool_sync_context(dp));
 372 
 373         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
 374         if (err)
 375                 return (err);
 376 
 377         /* Make sure dsobj has the correct object type. */
 378         dmu_object_info_from_db(dbuf, &doi);
 379         if (doi.doi_type != DMU_OT_DSL_DATASET)
 380                 return (EINVAL);
 381 
 382         ds = dmu_buf_get_user(dbuf);
 383         if (ds == NULL) {
 384                 dsl_dataset_t *winner;
 385 
 386                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
 387                 ds->ds_dbuf = dbuf;
 388                 ds->ds_object = dsobj;
 389                 ds->ds_phys = dbuf->db_data;
 390 
 391                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
 392                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
 393                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 394                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
 395 
 396                 rw_init(&ds->ds_rwlock, 0, 0, 0);
 397                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 398 
 399                 bplist_create(&ds->ds_pending_deadlist);
 400                 dsl_deadlist_open(&ds->ds_deadlist,
 401                     mos, ds->ds_phys->ds_deadlist_obj);
 402 
 403                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
 404                     offsetof(dmu_sendarg_t, dsa_link));
 405 
 406                 if (err == 0) {
 407                         err = dsl_dir_open_obj(dp,
 408                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
 409                 }
 410                 if (err) {
 411                         mutex_destroy(&ds->ds_lock);
 412                         mutex_destroy(&ds->ds_recvlock);
 413                         mutex_destroy(&ds->ds_opening_lock);
 414                         rw_destroy(&ds->ds_rwlock);
 415                         cv_destroy(&ds->ds_exclusive_cv);
 416                         bplist_destroy(&ds->ds_pending_deadlist);
 417                         dsl_deadlist_close(&ds->ds_deadlist);
 418                         kmem_free(ds, sizeof (dsl_dataset_t));
 419                         dmu_buf_rele(dbuf, tag);
 420                         return (err);
 421                 }
 422 
 423                 if (!dsl_dataset_is_snapshot(ds)) {
 424                         ds->ds_snapname[0] = '\0';
 425                         if (ds->ds_phys->ds_prev_snap_obj) {
 426                                 err = dsl_dataset_get_ref(dp,
 427                                     ds->ds_phys->ds_prev_snap_obj,
 428                                     ds, &ds->ds_prev);
 429                         }
 430                 } else {
 431                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
 432                                 err = dsl_dataset_get_snapname(ds);
 433                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
 434                                 err = zap_count(
 435                                     ds->ds_dir->dd_pool->dp_meta_objset,
 436                                     ds->ds_phys->ds_userrefs_obj,
 437                                     &ds->ds_userrefs);
 438                         }
 439                 }
 440 
 441                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
 442                         /*
 443                          * In sync context, we're called with either no lock
 444                          * or with the write lock.  If we're not syncing,
 445                          * we're always called with the read lock held.
 446                          */
 447                         boolean_t need_lock =
 448                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
 449                             dsl_pool_sync_context(dp);
 450 
 451                         if (need_lock)
 452                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 453 
 454                         err = dsl_prop_get_ds(ds,
 455                             "refreservation", sizeof (uint64_t), 1,
 456                             &ds->ds_reserved, NULL);
 457                         if (err == 0) {
 458                                 err = dsl_prop_get_ds(ds,
 459                                     "refquota", sizeof (uint64_t), 1,
 460                                     &ds->ds_quota, NULL);
 461                         }
 462 
 463                         if (need_lock)
 464                                 rw_exit(&dp->dp_config_rwlock);
 465                 } else {
 466                         ds->ds_reserved = ds->ds_quota = 0;
 467                 }
 468 
 469                 if (err == 0) {
 470                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
 471                             dsl_dataset_evict);
 472                 }
 473                 if (err || winner) {
 474                         bplist_destroy(&ds->ds_pending_deadlist);
 475                         dsl_deadlist_close(&ds->ds_deadlist);
 476                         if (ds->ds_prev)
 477                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 478                         dsl_dir_close(ds->ds_dir, ds);
 479                         mutex_destroy(&ds->ds_lock);
 480                         mutex_destroy(&ds->ds_recvlock);
 481                         mutex_destroy(&ds->ds_opening_lock);
 482                         rw_destroy(&ds->ds_rwlock);
 483                         cv_destroy(&ds->ds_exclusive_cv);
 484                         kmem_free(ds, sizeof (dsl_dataset_t));
 485                         if (err) {
 486                                 dmu_buf_rele(dbuf, tag);
 487                                 return (err);
 488                         }
 489                         ds = winner;
 490                 } else {
 491                         ds->ds_fsid_guid =
 492                             unique_insert(ds->ds_phys->ds_fsid_guid);
 493                 }
 494         }
 495         ASSERT3P(ds->ds_dbuf, ==, dbuf);
 496         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
 497         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
 498             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
 499             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
 500         mutex_enter(&ds->ds_lock);
 501         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 502                 mutex_exit(&ds->ds_lock);
 503                 dmu_buf_rele(ds->ds_dbuf, tag);
 504                 return (ENOENT);
 505         }
 506         mutex_exit(&ds->ds_lock);
 507         *dsp = ds;
 508         return (0);
 509 }
 510 
 511 static int
 512 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
 513 {
 514         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 515 
 516         /*
 517          * In syncing context we don't want the rwlock lock: there
 518          * may be an existing writer waiting for sync phase to
 519          * finish.  We don't need to worry about such writers, since
 520          * sync phase is single-threaded, so the writer can't be
 521          * doing anything while we are active.
 522          */
 523         if (dsl_pool_sync_context(dp)) {
 524                 ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 525                 return (0);
 526         }
 527 
 528         /*
 529          * Normal users will hold the ds_rwlock as a READER until they
 530          * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
 531          * drop their READER lock after they set the ds_owner field.
 532          *
 533          * If the dataset is being destroyed, the destroy thread will
 534          * obtain a WRITER lock for exclusive access after it's done its
 535          * open-context work and then change the ds_owner to
 536          * dsl_reaper once destruction is assured.  So threads
 537          * may block here temporarily, until the "destructability" of
 538          * the dataset is determined.
 539          */
 540         ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
 541         mutex_enter(&ds->ds_lock);
 542         while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
 543                 rw_exit(&dp->dp_config_rwlock);
 544                 cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
 545                 if (DSL_DATASET_IS_DESTROYED(ds)) {
 546                         mutex_exit(&ds->ds_lock);
 547                         dsl_dataset_drop_ref(ds, tag);
 548                         rw_enter(&dp->dp_config_rwlock, RW_READER);
 549                         return (ENOENT);
 550                 }
 551                 /*
 552                  * The dp_config_rwlock lives above the ds_lock. And
 553                  * we need to check DSL_DATASET_IS_DESTROYED() while
 554                  * holding the ds_lock, so we have to drop and reacquire
 555                  * the ds_lock here.
 556                  */
 557                 mutex_exit(&ds->ds_lock);
 558                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 559                 mutex_enter(&ds->ds_lock);
 560         }
 561         mutex_exit(&ds->ds_lock);
 562         return (0);
 563 }
 564 
 565 int
 566 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 567     dsl_dataset_t **dsp)
 568 {
 569         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
 570 
 571         if (err)
 572                 return (err);
 573         return (dsl_dataset_hold_ref(*dsp, tag));
 574 }
 575 
 576 int
 577 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
 578     void *tag, dsl_dataset_t **dsp)
 579 {
 580         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
 581         if (err)
 582                 return (err);
 583         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 584                 dsl_dataset_rele(*dsp, tag);
 585                 *dsp = NULL;
 586                 return (EBUSY);
 587         }
 588         return (0);
 589 }
 590 
 591 int
 592 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 593 {
 594         dsl_dir_t *dd;
 595         dsl_pool_t *dp;
 596         const char *snapname;
 597         uint64_t obj;
 598         int err = 0;
 599 
 600         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 601         if (err)
 602                 return (err);
 603 
 604         dp = dd->dd_pool;
 605         obj = dd->dd_phys->dd_head_dataset_obj;
 606         rw_enter(&dp->dp_config_rwlock, RW_READER);
 607         if (obj)
 608                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
 609         else
 610                 err = ENOENT;
 611         if (err)
 612                 goto out;
 613 
 614         err = dsl_dataset_hold_ref(*dsp, tag);
 615 
 616         /* we may be looking for a snapshot */
 617         if (err == 0 && snapname != NULL) {
 618                 dsl_dataset_t *ds = NULL;
 619 
 620                 if (*snapname++ != '@') {
 621                         dsl_dataset_rele(*dsp, tag);
 622                         err = ENOENT;
 623                         goto out;
 624                 }
 625 
 626                 dprintf("looking for snapshot '%s'\n", snapname);
 627                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
 628                 if (err == 0)
 629                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
 630                 dsl_dataset_rele(*dsp, tag);
 631 
 632                 ASSERT3U((err == 0), ==, (ds != NULL));
 633 
 634                 if (ds) {
 635                         mutex_enter(&ds->ds_lock);
 636                         if (ds->ds_snapname[0] == 0)
 637                                 (void) strlcpy(ds->ds_snapname, snapname,
 638                                     sizeof (ds->ds_snapname));
 639                         mutex_exit(&ds->ds_lock);
 640                         err = dsl_dataset_hold_ref(ds, tag);
 641                         *dsp = err ? NULL : ds;
 642                 }
 643         }
 644 out:
 645         rw_exit(&dp->dp_config_rwlock);
 646         dsl_dir_close(dd, FTAG);
 647         return (err);
 648 }
 649 
 650 int
 651 dsl_dataset_own(const char *name, boolean_t inconsistentok,
 652     void *tag, dsl_dataset_t **dsp)
 653 {
 654         int err = dsl_dataset_hold(name, tag, dsp);
 655         if (err)
 656                 return (err);
 657         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 658                 dsl_dataset_rele(*dsp, tag);
 659                 return (EBUSY);
 660         }
 661         return (0);
 662 }
 663 
 664 void
 665 dsl_dataset_name(dsl_dataset_t *ds, char *name)
 666 {
 667         if (ds == NULL) {
 668                 (void) strcpy(name, "mos");
 669         } else {
 670                 dsl_dir_name(ds->ds_dir, name);
 671                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 672                 if (ds->ds_snapname[0]) {
 673                         (void) strcat(name, "@");
 674                         /*
 675                          * We use a "recursive" mutex so that we
 676                          * can call dprintf_ds() with ds_lock held.
 677                          */
 678                         if (!MUTEX_HELD(&ds->ds_lock)) {
 679                                 mutex_enter(&ds->ds_lock);
 680                                 (void) strcat(name, ds->ds_snapname);
 681                                 mutex_exit(&ds->ds_lock);
 682                         } else {
 683                                 (void) strcat(name, ds->ds_snapname);
 684                         }
 685                 }
 686         }
 687 }
 688 
 689 static int
 690 dsl_dataset_namelen(dsl_dataset_t *ds)
 691 {
 692         int result;
 693 
 694         if (ds == NULL) {
 695                 result = 3;     /* "mos" */
 696         } else {
 697                 result = dsl_dir_namelen(ds->ds_dir);
 698                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 699                 if (ds->ds_snapname[0]) {
 700                         ++result;       /* adding one for the @-sign */
 701                         if (!MUTEX_HELD(&ds->ds_lock)) {
 702                                 mutex_enter(&ds->ds_lock);
 703                                 result += strlen(ds->ds_snapname);
 704                                 mutex_exit(&ds->ds_lock);
 705                         } else {
 706                                 result += strlen(ds->ds_snapname);
 707                         }
 708                 }
 709         }
 710 
 711         return (result);
 712 }
 713 
 714 void
 715 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 716 {
 717         dmu_buf_rele(ds->ds_dbuf, tag);
 718 }
 719 
 720 void
 721 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 722 {
 723         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
 724                 rw_exit(&ds->ds_rwlock);
 725         }
 726         dsl_dataset_drop_ref(ds, tag);
 727 }
 728 
 729 void
 730 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
 731 {
 732         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
 733             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
 734 
 735         mutex_enter(&ds->ds_lock);
 736         ds->ds_owner = NULL;
 737         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
 738                 rw_exit(&ds->ds_rwlock);
 739                 cv_broadcast(&ds->ds_exclusive_cv);
 740         }
 741         mutex_exit(&ds->ds_lock);
 742         if (ds->ds_dbuf)
 743                 dsl_dataset_drop_ref(ds, tag);
 744         else
 745                 dsl_dataset_evict(NULL, ds);
 746 }
 747 
 748 boolean_t
 749 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
 750 {
 751         boolean_t gotit = FALSE;
 752 
 753         mutex_enter(&ds->ds_lock);
 754         if (ds->ds_owner == NULL &&
 755             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
 756                 ds->ds_owner = tag;
 757                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
 758                         rw_exit(&ds->ds_rwlock);
 759                 gotit = TRUE;
 760         }
 761         mutex_exit(&ds->ds_lock);
 762         return (gotit);
 763 }
 764 
 765 void
 766 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
 767 {
 768         ASSERT3P(owner, ==, ds->ds_owner);
 769         if (!RW_WRITE_HELD(&ds->ds_rwlock))
 770                 rw_enter(&ds->ds_rwlock, RW_WRITER);
 771 }
 772 
 773 uint64_t
 774 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
 775     uint64_t flags, dmu_tx_t *tx)
 776 {
 777         dsl_pool_t *dp = dd->dd_pool;
 778         dmu_buf_t *dbuf;
 779         dsl_dataset_phys_t *dsphys;
 780         uint64_t dsobj;
 781         objset_t *mos = dp->dp_meta_objset;
 782 
 783         if (origin == NULL)
 784                 origin = dp->dp_origin_snap;
 785 
 786         ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
 787         ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
 788         ASSERT(dmu_tx_is_syncing(tx));
 789         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 790 
 791         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
 792             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
 793         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 794         dmu_buf_will_dirty(dbuf, tx);
 795         dsphys = dbuf->db_data;
 796         bzero(dsphys, sizeof (dsl_dataset_phys_t));
 797         dsphys->ds_dir_obj = dd->dd_object;
 798         dsphys->ds_flags = flags;
 799         dsphys->ds_fsid_guid = unique_create();
 800         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
 801             sizeof (dsphys->ds_guid));
 802         dsphys->ds_snapnames_zapobj =
 803             zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
 804             DMU_OT_NONE, 0, tx);
 805         dsphys->ds_creation_time = gethrestime_sec();
 806         dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
 807 
 808         if (origin == NULL) {
 809                 dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
 810         } else {
 811                 dsl_dataset_t *ohds;
 812 
 813                 dsphys->ds_prev_snap_obj = origin->ds_object;
 814                 dsphys->ds_prev_snap_txg =
 815                     origin->ds_phys->ds_creation_txg;
 816                 dsphys->ds_referenced_bytes =
 817                     origin->ds_phys->ds_referenced_bytes;
 818                 dsphys->ds_compressed_bytes =
 819                     origin->ds_phys->ds_compressed_bytes;
 820                 dsphys->ds_uncompressed_bytes =
 821                     origin->ds_phys->ds_uncompressed_bytes;
 822                 dsphys->ds_bp = origin->ds_phys->ds_bp;
 823                 dsphys->ds_flags |= origin->ds_phys->ds_flags;
 824 
 825                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
 826                 origin->ds_phys->ds_num_children++;
 827 
 828                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
 829                     origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
 830                 dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
 831                     dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
 832                 dsl_dataset_rele(ohds, FTAG);
 833 
 834                 if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
 835                         if (origin->ds_phys->ds_next_clones_obj == 0) {
 836                                 origin->ds_phys->ds_next_clones_obj =
 837                                     zap_create(mos,
 838                                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
 839                         }
 840                         VERIFY(0 == zap_add_int(mos,
 841                             origin->ds_phys->ds_next_clones_obj,
 842                             dsobj, tx));
 843                 }
 844 
 845                 dmu_buf_will_dirty(dd->dd_dbuf, tx);
 846                 dd->dd_phys->dd_origin_obj = origin->ds_object;
 847                 if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
 848                         if (origin->ds_dir->dd_phys->dd_clones == 0) {
 849                                 dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
 850                                 origin->ds_dir->dd_phys->dd_clones =
 851                                     zap_create(mos,
 852                                     DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
 853                         }
 854                         VERIFY3U(0, ==, zap_add_int(mos,
 855                             origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
 856                 }
 857         }
 858 
 859         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
 860                 dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
 861 
 862         dmu_buf_rele(dbuf, FTAG);
 863 
 864         dmu_buf_will_dirty(dd->dd_dbuf, tx);
 865         dd->dd_phys->dd_head_dataset_obj = dsobj;
 866 
 867         return (dsobj);
 868 }
 869 
 870 uint64_t
 871 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
 872     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
 873 {
 874         dsl_pool_t *dp = pdd->dd_pool;
 875         uint64_t dsobj, ddobj;
 876         dsl_dir_t *dd;
 877 
 878         ASSERT(lastname[0] != '@');
 879 
 880         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
 881         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
 882 
 883         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
 884 
 885         dsl_deleg_set_create_perms(dd, tx, cr);
 886 
 887         dsl_dir_close(dd, FTAG);
 888 
 889         /*
 890          * If we are creating a clone, make sure we zero out any stale
 891          * data from the origin snapshots zil header.
 892          */
 893         if (origin != NULL) {
 894                 dsl_dataset_t *ds;
 895                 objset_t *os;
 896 
 897                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 898                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
 899                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
 900                 dsl_dataset_dirty(ds, tx);
 901                 dsl_dataset_rele(ds, FTAG);
 902         }
 903 
 904         return (dsobj);
 905 }
 906 
 907 /*
 908  * The snapshots must all be in the same pool.
 909  */
 910 int
 911 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer,
 912     nvlist_t *errlist)
 913 {
 914         int err;
 915         dsl_sync_task_t *dst;
 916         spa_t *spa;
 917         nvpair_t *pair;
 918         dsl_sync_task_group_t *dstg;
 919 
 920         pair = nvlist_next_nvpair(snaps, NULL);
 921         if (pair == NULL)
 922                 return (0);
 923 
 924         err = spa_open(nvpair_name(pair), &spa, FTAG);
 925         if (err)
 926                 return (err);
 927         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 928 
 929         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 930             pair = nvlist_next_nvpair(snaps, pair)) {
 931                 dsl_dataset_t *ds;
 932 
 933                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
 934                 if (err == 0) {
 935                         struct dsl_ds_destroyarg *dsda;
 936 
 937                         dsl_dataset_make_exclusive(ds, dstg);
 938                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
 939                             KM_SLEEP);
 940                         dsda->ds = ds;
 941                         dsda->defer = defer;
 942                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 943                             dsl_dataset_destroy_sync, dsda, dstg, 0);
 944                 } else if (err == ENOENT) {
 945                         err = 0;
 946                 } else {
 947                         fnvlist_add_int32(errlist, nvpair_name(pair), err);
 948                         break;
 949                 }
 950         }
 951 
 952         if (err == 0)
 953                 err = dsl_sync_task_group_wait(dstg);
 954 
 955         for (dst = list_head(&dstg->dstg_tasks); dst;
 956             dst = list_next(&dstg->dstg_tasks, dst)) {
 957                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
 958                 dsl_dataset_t *ds = dsda->ds;
 959 
 960                 /*
 961                  * Return the snapshots that triggered the error.
 962                  */
 963                 if (dst->dst_err != 0) {
 964                         char name[ZFS_MAXNAMELEN];
 965                         dsl_dataset_name(ds, name);
 966                         fnvlist_add_int32(errlist, name, dst->dst_err);
 967                 }
 968                 ASSERT3P(dsda->rm_origin, ==, NULL);
 969                 dsl_dataset_disown(ds, dstg);
 970                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 971         }
 972 
 973         dsl_sync_task_group_destroy(dstg);
 974         spa_close(spa, FTAG);
 975         return (err);
 976 
 977 }
 978 
 979 static boolean_t
 980 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
 981 {
 982         boolean_t might_destroy = B_FALSE;
 983 
 984         mutex_enter(&ds->ds_lock);
 985         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
 986             DS_IS_DEFER_DESTROY(ds))
 987                 might_destroy = B_TRUE;
 988         mutex_exit(&ds->ds_lock);
 989 
 990         return (might_destroy);
 991 }
 992 
 993 /*
 994  * If we're removing a clone, and these three conditions are true:
 995  *      1) the clone's origin has no other children
 996  *      2) the clone's origin has no user references
 997  *      3) the clone's origin has been marked for deferred destruction
 998  * Then, prepare to remove the origin as part of this sync task group.
 999  */
1000 static int
1001 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1002 {
1003         dsl_dataset_t *ds = dsda->ds;
1004         dsl_dataset_t *origin = ds->ds_prev;
1005 
1006         if (dsl_dataset_might_destroy_origin(origin)) {
1007                 char *name;
1008                 int namelen;
1009                 int error;
1010 
1011                 namelen = dsl_dataset_namelen(origin) + 1;
1012                 name = kmem_alloc(namelen, KM_SLEEP);
1013                 dsl_dataset_name(origin, name);
1014 #ifdef _KERNEL
1015                 error = zfs_unmount_snap(name, NULL);
1016                 if (error) {
1017                         kmem_free(name, namelen);
1018                         return (error);
1019                 }
1020 #endif
1021                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1022                 kmem_free(name, namelen);
1023                 if (error)
1024                         return (error);
1025                 dsda->rm_origin = origin;
1026                 dsl_dataset_make_exclusive(origin, tag);
1027         }
1028 
1029         return (0);
1030 }
1031 
1032 /*
1033  * ds must be opened as OWNER.  On return (whether successful or not),
1034  * ds will be closed and caller can no longer dereference it.
1035  */
1036 int
1037 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
1038 {
1039         int err;
1040         dsl_sync_task_group_t *dstg;
1041         objset_t *os;
1042         dsl_dir_t *dd;
1043         uint64_t obj;
1044         struct dsl_ds_destroyarg dsda = { 0 };
1045 
1046         dsda.ds = ds;
1047 
1048         if (dsl_dataset_is_snapshot(ds)) {
1049                 /* Destroying a snapshot is simpler */
1050                 dsl_dataset_make_exclusive(ds, tag);
1051 
1052                 dsda.defer = defer;
1053                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1054                     dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
1055                     &dsda, tag, 0);
1056                 ASSERT3P(dsda.rm_origin, ==, NULL);
1057                 goto out;
1058         } else if (defer) {
1059                 err = EINVAL;
1060                 goto out;
1061         }
1062 
1063         dd = ds->ds_dir;
1064 
1065         if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
1066             &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
1067                 /*
1068                  * Check for errors and mark this ds as inconsistent, in
1069                  * case we crash while freeing the objects.
1070                  */
1071                 err = dsl_sync_task_do(dd->dd_pool,
1072                     dsl_dataset_destroy_begin_check,
1073                     dsl_dataset_destroy_begin_sync, ds, NULL, 0);
1074                 if (err)
1075                         goto out;
1076 
1077                 err = dmu_objset_from_ds(ds, &os);
1078                 if (err)
1079                         goto out;
1080 
1081                 /*
1082                  * Remove all objects while in the open context so that
1083                  * there is less work to do in the syncing context.
1084                  */
1085                 for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
1086                     ds->ds_phys->ds_prev_snap_txg)) {
1087                         /*
1088                          * Ignore errors, if there is not enough disk space
1089                          * we will deal with it in dsl_dataset_destroy_sync().
1090                          */
1091                         (void) dmu_free_object(os, obj);
1092                 }
1093                 if (err != ESRCH)
1094                         goto out;
1095 
1096                 /*
1097                  * Sync out all in-flight IO.
1098                  */
1099                 txg_wait_synced(dd->dd_pool, 0);
1100 
1101                 /*
1102                  * If we managed to free all the objects in open
1103                  * context, the user space accounting should be zero.
1104                  */
1105                 if (ds->ds_phys->ds_bp.blk_fill == 0 &&
1106                     dmu_objset_userused_enabled(os)) {
1107                         uint64_t count;
1108 
1109                         ASSERT(zap_count(os, DMU_USERUSED_OBJECT,
1110                             &count) != 0 || count == 0);
1111                         ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT,
1112                             &count) != 0 || count == 0);
1113                 }
1114         }
1115 
1116         rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
1117         err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
1118         rw_exit(&dd->dd_pool->dp_config_rwlock);
1119 
1120         if (err)
1121                 goto out;
1122 
1123         /*
1124          * Blow away the dsl_dir + head dataset.
1125          */
1126         dsl_dataset_make_exclusive(ds, tag);
1127         /*
1128          * If we're removing a clone, we might also need to remove its
1129          * origin.
1130          */
1131         do {
1132                 dsda.need_prep = B_FALSE;
1133                 if (dsl_dir_is_clone(dd)) {
1134                         err = dsl_dataset_origin_rm_prep(&dsda, tag);
1135                         if (err) {
1136                                 dsl_dir_close(dd, FTAG);
1137                                 goto out;
1138                         }
1139                 }
1140 
1141                 dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
1142                 dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
1143                     dsl_dataset_destroy_sync, &dsda, tag, 0);
1144                 dsl_sync_task_create(dstg, dsl_dir_destroy_check,
1145                     dsl_dir_destroy_sync, dd, FTAG, 0);
1146                 err = dsl_sync_task_group_wait(dstg);
1147                 dsl_sync_task_group_destroy(dstg);
1148 
1149                 /*
1150                  * We could be racing against 'zfs release' or 'zfs destroy -d'
1151                  * on the origin snap, in which case we can get EBUSY if we
1152                  * needed to destroy the origin snap but were not ready to
1153                  * do so.
1154                  */
1155                 if (dsda.need_prep) {
1156                         ASSERT(err == EBUSY);
1157                         ASSERT(dsl_dir_is_clone(dd));
1158                         ASSERT(dsda.rm_origin == NULL);
1159                 }
1160         } while (dsda.need_prep);
1161 
1162         if (dsda.rm_origin != NULL)
1163                 dsl_dataset_disown(dsda.rm_origin, tag);
1164 
1165         /* if it is successful, dsl_dir_destroy_sync will close the dd */
1166         if (err)
1167                 dsl_dir_close(dd, FTAG);
1168 out:
1169         dsl_dataset_disown(ds, tag);
1170         return (err);
1171 }
1172 
1173 blkptr_t *
1174 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1175 {
1176         return (&ds->ds_phys->ds_bp);
1177 }
1178 
1179 void
1180 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1181 {
1182         ASSERT(dmu_tx_is_syncing(tx));
1183         /* If it's the meta-objset, set dp_meta_rootbp */
1184         if (ds == NULL) {
1185                 tx->tx_pool->dp_meta_rootbp = *bp;
1186         } else {
1187                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1188                 ds->ds_phys->ds_bp = *bp;
1189         }
1190 }
1191 
1192 spa_t *
1193 dsl_dataset_get_spa(dsl_dataset_t *ds)
1194 {
1195         return (ds->ds_dir->dd_pool->dp_spa);
1196 }
1197 
1198 void
1199 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1200 {
1201         dsl_pool_t *dp;
1202 
1203         if (ds == NULL) /* this is the meta-objset */
1204                 return;
1205 
1206         ASSERT(ds->ds_objset != NULL);
1207 
1208         if (ds->ds_phys->ds_next_snap_obj != 0)
1209                 panic("dirtying snapshot!");
1210 
1211         dp = ds->ds_dir->dd_pool;
1212 
1213         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1214                 /* up the hold count until we can be written out */
1215                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1216         }
1217 }
1218 
1219 boolean_t
1220 dsl_dataset_is_dirty(dsl_dataset_t *ds)
1221 {
1222         for (int t = 0; t < TXG_SIZE; t++) {
1223                 if (txg_list_member(&ds->ds_dir->dd_pool->dp_dirty_datasets,
1224                     ds, t))
1225                         return (B_TRUE);
1226         }
1227         return (B_FALSE);
1228 }
1229 
1230 /*
1231  * The unique space in the head dataset can be calculated by subtracting
1232  * the space used in the most recent snapshot, that is still being used
1233  * in this file system, from the space currently in use.  To figure out
1234  * the space in the most recent snapshot still in use, we need to take
1235  * the total space used in the snapshot and subtract out the space that
1236  * has been freed up since the snapshot was taken.
1237  */
1238 static void
1239 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1240 {
1241         uint64_t mrs_used;
1242         uint64_t dlused, dlcomp, dluncomp;
1243 
1244         ASSERT(!dsl_dataset_is_snapshot(ds));
1245 
1246         if (ds->ds_phys->ds_prev_snap_obj != 0)
1247                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1248         else
1249                 mrs_used = 0;
1250 
1251         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1252 
1253         ASSERT3U(dlused, <=, mrs_used);
1254         ds->ds_phys->ds_unique_bytes =
1255             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1256 
1257         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1258             SPA_VERSION_UNIQUE_ACCURATE)
1259                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1260 }
1261 
1262 struct killarg {
1263         dsl_dataset_t *ds;
1264         dmu_tx_t *tx;
1265 };
1266 
1267 /* ARGSUSED */
1268 static int
1269 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1270     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1271 {
1272         struct killarg *ka = arg;
1273         dmu_tx_t *tx = ka->tx;
1274 
1275         if (bp == NULL)
1276                 return (0);
1277 
1278         if (zb->zb_level == ZB_ZIL_LEVEL) {
1279                 ASSERT(zilog != NULL);
1280                 /*
1281                  * It's a block in the intent log.  It has no
1282                  * accounting, so just free it.
1283                  */
1284                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1285         } else {
1286                 ASSERT(zilog == NULL);
1287                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1288                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1289         }
1290 
1291         return (0);
1292 }
1293 
1294 /* ARGSUSED */
1295 static int
1296 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1297 {
1298         dsl_dataset_t *ds = arg1;
1299         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1300         uint64_t count;
1301         int err;
1302 
1303         /*
1304          * Can't delete a head dataset if there are snapshots of it.
1305          * (Except if the only snapshots are from the branch we cloned
1306          * from.)
1307          */
1308         if (ds->ds_prev != NULL &&
1309             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1310                 return (EBUSY);
1311 
1312         /*
1313          * This is really a dsl_dir thing, but check it here so that
1314          * we'll be less likely to leave this dataset inconsistent &
1315          * nearly destroyed.
1316          */
1317         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1318         if (err)
1319                 return (err);
1320         if (count != 0)
1321                 return (EEXIST);
1322 
1323         return (0);
1324 }
1325 
1326 /* ARGSUSED */
1327 static void
1328 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1329 {
1330         dsl_dataset_t *ds = arg1;
1331 
1332         /* Mark it as inconsistent on-disk, in case we crash */
1333         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1334         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1335 
1336         spa_history_log_internal_ds(ds, "destroy begin", tx, "");
1337 }
1338 
1339 static int
1340 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1341     dmu_tx_t *tx)
1342 {
1343         dsl_dataset_t *ds = dsda->ds;
1344         dsl_dataset_t *ds_prev = ds->ds_prev;
1345 
1346         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1347                 struct dsl_ds_destroyarg ndsda = {0};
1348 
1349                 /*
1350                  * If we're not prepared to remove the origin, don't remove
1351                  * the clone either.
1352                  */
1353                 if (dsda->rm_origin == NULL) {
1354                         dsda->need_prep = B_TRUE;
1355                         return (EBUSY);
1356                 }
1357 
1358                 ndsda.ds = ds_prev;
1359                 ndsda.is_origin_rm = B_TRUE;
1360                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1361         }
1362 
1363         /*
1364          * If we're not going to remove the origin after all,
1365          * undo the open context setup.
1366          */
1367         if (dsda->rm_origin != NULL) {
1368                 dsl_dataset_disown(dsda->rm_origin, tag);
1369                 dsda->rm_origin = NULL;
1370         }
1371 
1372         return (0);
1373 }
1374 
1375 /*
1376  * If you add new checks here, you may need to add
1377  * additional checks to the "temporary" case in
1378  * snapshot_check() in dmu_objset.c.
1379  */
1380 /* ARGSUSED */
1381 int
1382 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1383 {
1384         struct dsl_ds_destroyarg *dsda = arg1;
1385         dsl_dataset_t *ds = dsda->ds;
1386 
1387         /* we have an owner hold, so noone else can destroy us */
1388         ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1389 
1390         /*
1391          * Only allow deferred destroy on pools that support it.
1392          * NOTE: deferred destroy is only supported on snapshots.
1393          */
1394         if (dsda->defer) {
1395                 if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1396                     SPA_VERSION_USERREFS)
1397                         return (ENOTSUP);
1398                 ASSERT(dsl_dataset_is_snapshot(ds));
1399                 return (0);
1400         }
1401 
1402         /*
1403          * Can't delete a head dataset if there are snapshots of it.
1404          * (Except if the only snapshots are from the branch we cloned
1405          * from.)
1406          */
1407         if (ds->ds_prev != NULL &&
1408             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1409                 return (EBUSY);
1410 
1411         /*
1412          * If we made changes this txg, traverse_dsl_dataset won't find
1413          * them.  Try again.
1414          */
1415         if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1416                 return (EAGAIN);
1417 
1418         if (dsl_dataset_is_snapshot(ds)) {
1419                 /*
1420                  * If this snapshot has an elevated user reference count,
1421                  * we can't destroy it yet.
1422                  */
1423                 if (ds->ds_userrefs > 0 && !dsda->releasing)
1424                         return (EBUSY);
1425 
1426                 mutex_enter(&ds->ds_lock);
1427                 /*
1428                  * Can't delete a branch point. However, if we're destroying
1429                  * a clone and removing its origin due to it having a user
1430                  * hold count of 0 and having been marked for deferred destroy,
1431                  * it's OK for the origin to have a single clone.
1432                  */
1433                 if (ds->ds_phys->ds_num_children >
1434                     (dsda->is_origin_rm ? 2 : 1)) {
1435                         mutex_exit(&ds->ds_lock);
1436                         return (EEXIST);
1437                 }
1438                 mutex_exit(&ds->ds_lock);
1439         } else if (dsl_dir_is_clone(ds->ds_dir)) {
1440                 return (dsl_dataset_origin_check(dsda, arg2, tx));
1441         }
1442 
1443         /* XXX we should do some i/o error checking... */
1444         return (0);
1445 }
1446 
1447 struct refsarg {
1448         kmutex_t lock;
1449         boolean_t gone;
1450         kcondvar_t cv;
1451 };
1452 
1453 /* ARGSUSED */
1454 static void
1455 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1456 {
1457         struct refsarg *arg = argv;
1458 
1459         mutex_enter(&arg->lock);
1460         arg->gone = TRUE;
1461         cv_signal(&arg->cv);
1462         mutex_exit(&arg->lock);
1463 }
1464 
1465 static void
1466 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1467 {
1468         struct refsarg arg;
1469 
1470         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1471         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1472         arg.gone = FALSE;
1473         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1474             dsl_dataset_refs_gone);
1475         dmu_buf_rele(ds->ds_dbuf, tag);
1476         mutex_enter(&arg.lock);
1477         while (!arg.gone)
1478                 cv_wait(&arg.cv, &arg.lock);
1479         ASSERT(arg.gone);
1480         mutex_exit(&arg.lock);
1481         ds->ds_dbuf = NULL;
1482         ds->ds_phys = NULL;
1483         mutex_destroy(&arg.lock);
1484         cv_destroy(&arg.cv);
1485 }
1486 
1487 static void
1488 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1489 {
1490         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1491         uint64_t count;
1492         int err;
1493 
1494         ASSERT(ds->ds_phys->ds_num_children >= 2);
1495         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1496         /*
1497          * The err should not be ENOENT, but a bug in a previous version
1498          * of the code could cause upgrade_clones_cb() to not set
1499          * ds_next_snap_obj when it should, leading to a missing entry.
1500          * If we knew that the pool was created after
1501          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1502          * ENOENT.  However, at least we can check that we don't have
1503          * too many entries in the next_clones_obj even after failing to
1504          * remove this one.
1505          */
1506         if (err != ENOENT) {
1507                 VERIFY0(err);
1508         }
1509         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1510             &count));
1511         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1512 }
1513 
1514 static void
1515 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1516 {
1517         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1518         zap_cursor_t zc;
1519         zap_attribute_t za;
1520 
1521         /*
1522          * If it is the old version, dd_clones doesn't exist so we can't
1523          * find the clones, but deadlist_remove_key() is a no-op so it
1524          * doesn't matter.
1525          */
1526         if (ds->ds_dir->dd_phys->dd_clones == 0)
1527                 return;
1528 
1529         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1530             zap_cursor_retrieve(&zc, &za) == 0;
1531             zap_cursor_advance(&zc)) {
1532                 dsl_dataset_t *clone;
1533 
1534                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1535                     za.za_first_integer, FTAG, &clone));
1536                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1537                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1538                             mintxg, tx);
1539                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1540                 }
1541                 dsl_dataset_rele(clone, FTAG);
1542         }
1543         zap_cursor_fini(&zc);
1544 }
1545 
1546 struct process_old_arg {
1547         dsl_dataset_t *ds;
1548         dsl_dataset_t *ds_prev;
1549         boolean_t after_branch_point;
1550         zio_t *pio;
1551         uint64_t used, comp, uncomp;
1552 };
1553 
1554 static int
1555 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1556 {
1557         struct process_old_arg *poa = arg;
1558         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1559 
1560         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1561                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1562                 if (poa->ds_prev && !poa->after_branch_point &&
1563                     bp->blk_birth >
1564                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1565                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1566                             bp_get_dsize_sync(dp->dp_spa, bp);
1567                 }
1568         } else {
1569                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1570                 poa->comp += BP_GET_PSIZE(bp);
1571                 poa->uncomp += BP_GET_UCSIZE(bp);
1572                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1573         }
1574         return (0);
1575 }
1576 
1577 static void
1578 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1579     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1580 {
1581         struct process_old_arg poa = { 0 };
1582         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1583         objset_t *mos = dp->dp_meta_objset;
1584 
1585         ASSERT(ds->ds_deadlist.dl_oldfmt);
1586         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1587 
1588         poa.ds = ds;
1589         poa.ds_prev = ds_prev;
1590         poa.after_branch_point = after_branch_point;
1591         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1592         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1593             process_old_cb, &poa, tx));
1594         VERIFY0(zio_wait(poa.pio));
1595         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1596 
1597         /* change snapused */
1598         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1599             -poa.used, -poa.comp, -poa.uncomp, tx);
1600 
1601         /* swap next's deadlist to our deadlist */
1602         dsl_deadlist_close(&ds->ds_deadlist);
1603         dsl_deadlist_close(&ds_next->ds_deadlist);
1604         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1605             ds->ds_phys->ds_deadlist_obj);
1606         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1607         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1608             ds_next->ds_phys->ds_deadlist_obj);
1609 }
1610 
1611 static int
1612 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1613 {
1614         int err;
1615         struct killarg ka;
1616 
1617         /*
1618          * Free everything that we point to (that's born after
1619          * the previous snapshot, if we are a clone)
1620          *
1621          * NB: this should be very quick, because we already
1622          * freed all the objects in open context.
1623          */
1624         ka.ds = ds;
1625         ka.tx = tx;
1626         err = traverse_dataset(ds,
1627             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1628             kill_blkptr, &ka);
1629         ASSERT0(err);
1630         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1631 
1632         return (err);
1633 }
1634 
1635 void
1636 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1637 {
1638         struct dsl_ds_destroyarg *dsda = arg1;
1639         dsl_dataset_t *ds = dsda->ds;
1640         int err;
1641         int after_branch_point = FALSE;
1642         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1643         objset_t *mos = dp->dp_meta_objset;
1644         dsl_dataset_t *ds_prev = NULL;
1645         boolean_t wont_destroy;
1646         uint64_t obj;
1647 
1648         wont_destroy = (dsda->defer &&
1649             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1650 
1651         ASSERT(ds->ds_owner || wont_destroy);
1652         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1653         ASSERT(ds->ds_prev == NULL ||
1654             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1655         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1656 
1657         if (wont_destroy) {
1658                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1659                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1660                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1661                 spa_history_log_internal_ds(ds, "defer_destroy", tx, "");
1662                 return;
1663         }
1664 
1665         /* We need to log before removing it from the namespace. */
1666         spa_history_log_internal_ds(ds, "destroy", tx, "");
1667 
1668         /* signal any waiters that this dataset is going away */
1669         mutex_enter(&ds->ds_lock);
1670         ds->ds_owner = dsl_reaper;
1671         cv_broadcast(&ds->ds_exclusive_cv);
1672         mutex_exit(&ds->ds_lock);
1673 
1674         /* Remove our reservation */
1675         if (ds->ds_reserved != 0) {
1676                 dsl_prop_setarg_t psa;
1677                 uint64_t value = 0;
1678 
1679                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1680                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1681                     &value);
1682                 psa.psa_effective_value = 0;    /* predict default value */
1683 
1684                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1685                 ASSERT0(ds->ds_reserved);
1686         }
1687 
1688         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1689 
1690         dsl_scan_ds_destroyed(ds, tx);
1691 
1692         obj = ds->ds_object;
1693 
1694         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1695                 if (ds->ds_prev) {
1696                         ds_prev = ds->ds_prev;
1697                 } else {
1698                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1699                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1700                 }
1701                 after_branch_point =
1702                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1703 
1704                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1705                 if (after_branch_point &&
1706                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
1707                         remove_from_next_clones(ds_prev, obj, tx);
1708                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1709                                 VERIFY(0 == zap_add_int(mos,
1710                                     ds_prev->ds_phys->ds_next_clones_obj,
1711                                     ds->ds_phys->ds_next_snap_obj, tx));
1712                         }
1713                 }
1714                 if (after_branch_point &&
1715                     ds->ds_phys->ds_next_snap_obj == 0) {
1716                         /* This clone is toast. */
1717                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1718                         ds_prev->ds_phys->ds_num_children--;
1719 
1720                         /*
1721                          * If the clone's origin has no other clones, no
1722                          * user holds, and has been marked for deferred
1723                          * deletion, then we should have done the necessary
1724                          * destroy setup for it.
1725                          */
1726                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1727                             ds_prev->ds_userrefs == 0 &&
1728                             DS_IS_DEFER_DESTROY(ds_prev)) {
1729                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1730                         } else {
1731                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1732                         }
1733                 } else if (!after_branch_point) {
1734                         ds_prev->ds_phys->ds_next_snap_obj =
1735                             ds->ds_phys->ds_next_snap_obj;
1736                 }
1737         }
1738 
1739         if (dsl_dataset_is_snapshot(ds)) {
1740                 dsl_dataset_t *ds_next;
1741                 uint64_t old_unique;
1742                 uint64_t used = 0, comp = 0, uncomp = 0;
1743 
1744                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1745                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1746                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1747 
1748                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1749 
1750                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1751                 ds_next->ds_phys->ds_prev_snap_obj =
1752                     ds->ds_phys->ds_prev_snap_obj;
1753                 ds_next->ds_phys->ds_prev_snap_txg =
1754                     ds->ds_phys->ds_prev_snap_txg;
1755                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1756                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1757 
1758 
1759                 if (ds_next->ds_deadlist.dl_oldfmt) {
1760                         process_old_deadlist(ds, ds_prev, ds_next,
1761                             after_branch_point, tx);
1762                 } else {
1763                         /* Adjust prev's unique space. */
1764                         if (ds_prev && !after_branch_point) {
1765                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1766                                     ds_prev->ds_phys->ds_prev_snap_txg,
1767                                     ds->ds_phys->ds_prev_snap_txg,
1768                                     &used, &comp, &uncomp);
1769                                 ds_prev->ds_phys->ds_unique_bytes += used;
1770                         }
1771 
1772                         /* Adjust snapused. */
1773                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1774                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1775                             &used, &comp, &uncomp);
1776                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1777                             -used, -comp, -uncomp, tx);
1778 
1779                         /* Move blocks to be freed to pool's free list. */
1780                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1781                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1782                             tx);
1783                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1784                             DD_USED_HEAD, used, comp, uncomp, tx);
1785 
1786                         /* Merge our deadlist into next's and free it. */
1787                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1788                             ds->ds_phys->ds_deadlist_obj, tx);
1789                 }
1790                 dsl_deadlist_close(&ds->ds_deadlist);
1791                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1792 
1793                 /* Collapse range in clone heads */
1794                 dsl_dataset_remove_clones_key(ds,
1795                     ds->ds_phys->ds_creation_txg, tx);
1796 
1797                 if (dsl_dataset_is_snapshot(ds_next)) {
1798                         dsl_dataset_t *ds_nextnext;
1799 
1800                         /*
1801                          * Update next's unique to include blocks which
1802                          * were previously shared by only this snapshot
1803                          * and it.  Those blocks will be born after the
1804                          * prev snap and before this snap, and will have
1805                          * died after the next snap and before the one
1806                          * after that (ie. be on the snap after next's
1807                          * deadlist).
1808                          */
1809                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1810                             ds_next->ds_phys->ds_next_snap_obj,
1811                             FTAG, &ds_nextnext));
1812                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1813                             ds->ds_phys->ds_prev_snap_txg,
1814                             ds->ds_phys->ds_creation_txg,
1815                             &used, &comp, &uncomp);
1816                         ds_next->ds_phys->ds_unique_bytes += used;
1817                         dsl_dataset_rele(ds_nextnext, FTAG);
1818                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1819 
1820                         /* Collapse range in this head. */
1821                         dsl_dataset_t *hds;
1822                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1823                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1824                             FTAG, &hds));
1825                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1826                             ds->ds_phys->ds_creation_txg, tx);
1827                         dsl_dataset_rele(hds, FTAG);
1828 
1829                 } else {
1830                         ASSERT3P(ds_next->ds_prev, ==, ds);
1831                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1832                         ds_next->ds_prev = NULL;
1833                         if (ds_prev) {
1834                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1835                                     ds->ds_phys->ds_prev_snap_obj,
1836                                     ds_next, &ds_next->ds_prev));
1837                         }
1838 
1839                         dsl_dataset_recalc_head_uniq(ds_next);
1840 
1841                         /*
1842                          * Reduce the amount of our unconsmed refreservation
1843                          * being charged to our parent by the amount of
1844                          * new unique data we have gained.
1845                          */
1846                         if (old_unique < ds_next->ds_reserved) {
1847                                 int64_t mrsdelta;
1848                                 uint64_t new_unique =
1849                                     ds_next->ds_phys->ds_unique_bytes;
1850 
1851                                 ASSERT(old_unique <= new_unique);
1852                                 mrsdelta = MIN(new_unique - old_unique,
1853                                     ds_next->ds_reserved - old_unique);
1854                                 dsl_dir_diduse_space(ds->ds_dir,
1855                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1856                         }
1857                 }
1858                 dsl_dataset_rele(ds_next, FTAG);
1859         } else {
1860                 zfeature_info_t *async_destroy =
1861                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1862                 objset_t *os;
1863 
1864                 /*
1865                  * There's no next snapshot, so this is a head dataset.
1866                  * Destroy the deadlist.  Unless it's a clone, the
1867                  * deadlist should be empty.  (If it's a clone, it's
1868                  * safe to ignore the deadlist contents.)
1869                  */
1870                 dsl_deadlist_close(&ds->ds_deadlist);
1871                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1872                 ds->ds_phys->ds_deadlist_obj = 0;
1873 
1874                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
1875 
1876                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1877                         err = old_synchronous_dataset_destroy(ds, tx);
1878                 } else {
1879                         /*
1880                          * Move the bptree into the pool's list of trees to
1881                          * clean up and update space accounting information.
1882                          */
1883                         uint64_t used, comp, uncomp;
1884 
1885                         zil_destroy_sync(dmu_objset_zil(os), tx);
1886 
1887                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1888                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1889                                 dp->dp_bptree_obj = bptree_alloc(mos, tx);
1890                                 VERIFY(zap_add(mos,
1891                                     DMU_POOL_DIRECTORY_OBJECT,
1892                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1893                                     &dp->dp_bptree_obj, tx) == 0);
1894                         }
1895 
1896                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1897                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1898                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1899 
1900                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1901                             ds->ds_phys->ds_unique_bytes == used);
1902 
1903                         bptree_add(mos, dp->dp_bptree_obj,
1904                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1905                             used, comp, uncomp, tx);
1906                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1907                             -used, -comp, -uncomp, tx);
1908                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1909                             used, comp, uncomp, tx);
1910                 }
1911 
1912                 if (ds->ds_prev != NULL) {
1913                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1914                                 VERIFY3U(0, ==, zap_remove_int(mos,
1915                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1916                                     ds->ds_object, tx));
1917                         }
1918                         dsl_dataset_rele(ds->ds_prev, ds);
1919                         ds->ds_prev = ds_prev = NULL;
1920                 }
1921         }
1922 
1923         /*
1924          * This must be done after the dsl_traverse(), because it will
1925          * re-open the objset.
1926          */
1927         if (ds->ds_objset) {
1928                 dmu_objset_evict(ds->ds_objset);
1929                 ds->ds_objset = NULL;
1930         }
1931 
1932         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1933                 /* Erase the link in the dir */
1934                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1935                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1936                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1937                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1938                 ASSERT(err == 0);
1939         } else {
1940                 /* remove from snapshot namespace */
1941                 dsl_dataset_t *ds_head;
1942                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1943                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1944                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1945                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1946 #ifdef ZFS_DEBUG
1947                 {
1948                         uint64_t val;
1949 
1950                         err = dsl_dataset_snap_lookup(ds_head,
1951                             ds->ds_snapname, &val);
1952                         ASSERT0(err);
1953                         ASSERT3U(val, ==, obj);
1954                 }
1955 #endif
1956                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx,
1957                     B_TRUE);
1958                 ASSERT(err == 0);
1959                 dsl_dataset_rele(ds_head, FTAG);
1960         }
1961 
1962         if (ds_prev && ds->ds_prev != ds_prev)
1963                 dsl_dataset_rele(ds_prev, FTAG);
1964 
1965         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1966 
1967         if (ds->ds_phys->ds_next_clones_obj != 0) {
1968                 uint64_t count;
1969                 ASSERT(0 == zap_count(mos,
1970                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1971                 VERIFY(0 == dmu_object_free(mos,
1972                     ds->ds_phys->ds_next_clones_obj, tx));
1973         }
1974         if (ds->ds_phys->ds_props_obj != 0)
1975                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1976         if (ds->ds_phys->ds_userrefs_obj != 0)
1977                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1978         dsl_dir_close(ds->ds_dir, ds);
1979         ds->ds_dir = NULL;
1980         dsl_dataset_drain_refs(ds, tag);
1981         VERIFY(0 == dmu_object_free(mos, obj, tx));
1982 
1983         if (dsda->rm_origin) {
1984                 /*
1985                  * Remove the origin of the clone we just destroyed.
1986                  */
1987                 struct dsl_ds_destroyarg ndsda = {0};
1988 
1989                 ndsda.ds = dsda->rm_origin;
1990                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
1991         }
1992 }
1993 
1994 static int
1995 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1996 {
1997         uint64_t asize;
1998 
1999         if (!dmu_tx_is_syncing(tx))
2000                 return (0);
2001 
2002         /*
2003          * If there's an fs-only reservation, any blocks that might become
2004          * owned by the snapshot dataset must be accommodated by space
2005          * outside of the reservation.
2006          */
2007         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2008         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2009         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2010                 return (ENOSPC);
2011 
2012         /*
2013          * Propagate any reserved space for this snapshot to other
2014          * snapshot checks in this sync group.
2015          */
2016         if (asize > 0)
2017                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2018 
2019         return (0);
2020 }
2021 
2022 /*
2023  * Check if adding additional snapshot(s) would exceed any snapshot limits.
2024  * Note that all snapshot limits up to the root dataset (i.e. the pool itself)
2025  * or the given ancestor must be satisfied. Note that it is valid for the
2026  * count to exceed the limit. This can happen if a snapshot is taken by an
2027  * administrative user in the global zone (e.g. a recursive snapshot by root).
2028  */
2029 int
2030 dsl_snapcount_check(dsl_dir_t *dd, uint64_t cnt, dsl_dir_t *ancestor,
2031     cred_t *cr)
2032 {
2033         uint64_t limit;
2034         int err = 0;
2035 
2036         VERIFY(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
2037 
2038         /* If we're allowed to change the limit, don't enforce the limit. */
2039         if (dsl_secpolicy_write_prop(dd, ZFS_PROP_SNAPSHOT_LIMIT, cr) == 0)
2040                 return (0);
2041 
2042         /*
2043          * If renaming a dataset with no snapshots, count adjustment is 0.
2044          */
2045         if (cnt == 0)
2046                 return (0);
2047 
2048         /*
2049          * If an ancestor has been provided, stop checking the limit once we
2050          * hit that dir. We need this during rename so that we don't overcount
2051          * the check once we recurse up to the common ancestor.
2052          */
2053         if (ancestor == dd)
2054                 return (0);
2055 
2056         /*
2057          * If we hit an uninitialized node while recursing up the tree, we can
2058          * stop since we know the counts are not valid on this node and we
2059          * know we won't touch this node's counts. We also know that the counts
2060          * on the nodes above this one are uninitialized and that there cannot
2061          * be a limit set on any of those nodes.
2062          */
2063         if (dd->dd_phys->dd_filesystem_count == 0)
2064                 return (0);
2065 
2066         err = dsl_prop_get_dd(dd, zfs_prop_to_name(ZFS_PROP_SNAPSHOT_LIMIT),
2067             8, 1, &limit, NULL, B_FALSE);
2068         if (err != 0)
2069                 return (err);
2070 
2071         /* Is there a snapshot limit which we've hit? */
2072         if ((dd->dd_phys->dd_snapshot_count + cnt) > limit)
2073                 return (EDQUOT);
2074 
2075         if (dd->dd_parent != NULL)
2076                 err = dsl_snapcount_check(dd->dd_parent, cnt, ancestor, cr);
2077 
2078         return (err);
2079 }
2080 
2081 /*
2082  * Adjust the snapshot count for the specified dsl_dir_t and all parents.
2083  * When a new snapshot is created, increment the count on all parents, and when
2084  * a snapshot is destroyed, decrement the count.
2085  */
2086 void
2087 dsl_snapcount_adjust(dsl_dir_t *dd, dmu_tx_t *tx, int64_t delta,
2088     boolean_t first)
2089 {
2090         if (first) {
2091                 VERIFY(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
2092                 VERIFY(dmu_tx_is_syncing(tx));
2093         }
2094 
2095         /*
2096          * If we hit an uninitialized node while recursing up the tree, we can
2097          * stop since we know the counts are not valid on this node and we
2098          * know we shouldn't touch this node's counts. An uninitialized count
2099          * on the node indicates that either the feature has not yet been
2100          * activated or there are no limits on this part of the tree.
2101          */
2102         if (dd->dd_phys->dd_filesystem_count == 0)
2103                 return;
2104 
2105         /* if renaming a dataset with no snapshots, count adjustment is 0 */
2106         if (delta == 0)
2107                 return;
2108 
2109         /*
2110          * On initial entry we need to check if this feature is active, but
2111          * we don't want to re-check this on each recursive call. Note: the
2112          * feature cannot be active if it's not enabled. If the feature is not
2113          * active, don't touch the on-disk count fields.
2114          */
2115         if (first) {
2116                 zfeature_info_t *quota_feat =
2117                     &spa_feature_table[SPA_FEATURE_FS_SS_LIMIT];
2118 
2119                 if (!spa_feature_is_active(dd->dd_pool->dp_spa, quota_feat))
2120                         return;
2121         }
2122 
2123         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2124 
2125         mutex_enter(&dd->dd_lock);
2126 
2127         dd->dd_phys->dd_snapshot_count += delta;
2128         VERIFY(dd->dd_phys->dd_snapshot_count >= 0);
2129 
2130         /* Roll up this additional count into our ancestors */
2131         if (dd->dd_parent != NULL)
2132                 dsl_snapcount_adjust(dd->dd_parent, tx, delta, B_FALSE);
2133 
2134         mutex_exit(&dd->dd_lock);
2135 }
2136 
2137 int
2138 dsl_dataset_snapshot_check(dsl_dataset_t *ds, const char *snapname,
2139     uint64_t cnt, dmu_tx_t *tx, cred_t *cr)
2140 {
2141         int err;
2142         uint64_t value;
2143 
2144         /*
2145          * We don't allow multiple snapshots of the same txg.  If there
2146          * is already one, try again.
2147          */
2148         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2149                 return (EAGAIN);
2150 
2151         /*
2152          * Check for conflicting snapshot name.
2153          */
2154         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2155         if (err == 0)
2156                 return (EEXIST);
2157         if (err != ENOENT)
2158                 return (err);
2159 
2160         /*
2161          * Check that the dataset's name is not too long.  Name consists
2162          * of the dataset's length + 1 for the @-sign + snapshot name's length
2163          */
2164         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2165                 return (ENAMETOOLONG);
2166 
2167         err = dsl_snapcount_check(ds->ds_dir, cnt, NULL, cr);
2168         if (err)
2169                 return (err);
2170 
2171         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2172         if (err)
2173                 return (err);
2174 
2175         ds->ds_trysnap_txg = tx->tx_txg;
2176         return (0);
2177 }
2178 
2179 void
2180 dsl_dataset_snapshot_sync(dsl_dataset_t *ds, const char *snapname,
2181     dmu_tx_t *tx)
2182 {
2183         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2184         dmu_buf_t *dbuf;
2185         dsl_dataset_phys_t *dsphys;
2186         uint64_t dsobj, crtxg;
2187         objset_t *mos = dp->dp_meta_objset;
2188         int err;
2189 
2190         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2191 
2192         dsl_snapcount_adjust(ds->ds_dir, tx, 1, B_TRUE);
2193 
2194         /*
2195          * The origin's ds_creation_txg has to be < TXG_INITIAL
2196          */
2197         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2198                 crtxg = 1;
2199         else
2200                 crtxg = tx->tx_txg;
2201 
2202         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2203             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2204         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2205         dmu_buf_will_dirty(dbuf, tx);
2206         dsphys = dbuf->db_data;
2207         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2208         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2209         dsphys->ds_fsid_guid = unique_create();
2210         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2211             sizeof (dsphys->ds_guid));
2212         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2213         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2214         dsphys->ds_next_snap_obj = ds->ds_object;
2215         dsphys->ds_num_children = 1;
2216         dsphys->ds_creation_time = gethrestime_sec();
2217         dsphys->ds_creation_txg = crtxg;
2218         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2219         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2220         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2221         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2222         dsphys->ds_flags = ds->ds_phys->ds_flags;
2223         dsphys->ds_bp = ds->ds_phys->ds_bp;
2224         dmu_buf_rele(dbuf, FTAG);
2225 
2226         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2227         if (ds->ds_prev) {
2228                 uint64_t next_clones_obj =
2229                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2230                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2231                     ds->ds_object ||
2232                     ds->ds_prev->ds_phys->ds_num_children > 1);
2233                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2234                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2235                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2236                             ds->ds_prev->ds_phys->ds_creation_txg);
2237                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2238                 } else if (next_clones_obj != 0) {
2239                         remove_from_next_clones(ds->ds_prev,
2240                             dsphys->ds_next_snap_obj, tx);
2241                         VERIFY3U(0, ==, zap_add_int(mos,
2242                             next_clones_obj, dsobj, tx));
2243                 }
2244         }
2245 
2246         /*
2247          * If we have a reference-reservation on this dataset, we will
2248          * need to increase the amount of refreservation being charged
2249          * since our unique space is going to zero.
2250          */
2251         if (ds->ds_reserved) {
2252                 int64_t delta;
2253                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2254                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2255                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2256                     delta, 0, 0, tx);
2257         }
2258 
2259         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2260         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2261             ds->ds_dir->dd_myname, snapname, dsobj,
2262             ds->ds_phys->ds_prev_snap_txg);
2263         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2264             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2265         dsl_deadlist_close(&ds->ds_deadlist);
2266         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2267         dsl_deadlist_add_key(&ds->ds_deadlist,
2268             ds->ds_phys->ds_prev_snap_txg, tx);
2269 
2270         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2271         ds->ds_phys->ds_prev_snap_obj = dsobj;
2272         ds->ds_phys->ds_prev_snap_txg = crtxg;
2273         ds->ds_phys->ds_unique_bytes = 0;
2274         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2275                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2276 
2277         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2278             snapname, 8, 1, &dsobj, tx);
2279         ASSERT(err == 0);
2280 
2281         if (ds->ds_prev)
2282                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2283         VERIFY(0 == dsl_dataset_get_ref(dp,
2284             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2285 
2286         dsl_scan_ds_snapshotted(ds, tx);
2287 
2288         dsl_dir_snap_cmtime_update(ds->ds_dir);
2289 
2290         spa_history_log_internal_ds(ds->ds_prev, "snapshot", tx, "");
2291 }
2292 
2293 void
2294 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2295 {
2296         ASSERT(dmu_tx_is_syncing(tx));
2297         ASSERT(ds->ds_objset != NULL);
2298         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2299 
2300         /*
2301          * in case we had to change ds_fsid_guid when we opened it,
2302          * sync it out now.
2303          */
2304         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2305         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2306 
2307         dmu_objset_sync(ds->ds_objset, zio, tx);
2308 }
2309 
2310 static void
2311 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2312 {
2313         uint64_t count = 0;
2314         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2315         zap_cursor_t zc;
2316         zap_attribute_t za;
2317         nvlist_t *propval;
2318         nvlist_t *val;
2319 
2320         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2321         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2322         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2323 
2324         /*
2325          * There may me missing entries in ds_next_clones_obj
2326          * due to a bug in a previous version of the code.
2327          * Only trust it if it has the right number of entries.
2328          */
2329         if (ds->ds_phys->ds_next_clones_obj != 0) {
2330                 ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2331                     &count));
2332         }
2333         if (count != ds->ds_phys->ds_num_children - 1) {
2334                 goto fail;
2335         }
2336         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2337             zap_cursor_retrieve(&zc, &za) == 0;
2338             zap_cursor_advance(&zc)) {
2339                 dsl_dataset_t *clone;
2340                 char buf[ZFS_MAXNAMELEN];
2341                 /*
2342                  * Even though we hold the dp_config_rwlock, the dataset
2343                  * may fail to open, returning ENOENT.  If there is a
2344                  * thread concurrently attempting to destroy this
2345                  * dataset, it will have the ds_rwlock held for
2346                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2347                  * dsl_dataset_hold_ref() will fail its
2348                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2349                  * dp_config_rwlock, and wait for the destroy progress
2350                  * and signal ds_exclusive_cv.  If the destroy was
2351                  * successful, we will see that
2352                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2353                  */
2354                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2355                     za.za_first_integer, FTAG, &clone) != 0)
2356                         continue;
2357                 dsl_dir_name(clone->ds_dir, buf);
2358                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2359                 dsl_dataset_rele(clone, FTAG);
2360         }
2361         zap_cursor_fini(&zc);
2362         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2363         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2364             propval) == 0);
2365 fail:
2366         nvlist_free(val);
2367         nvlist_free(propval);
2368         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2369 }
2370 
2371 void
2372 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2373 {
2374         uint64_t refd, avail, uobjs, aobjs, ratio;
2375 
2376         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2377             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2378             ds->ds_phys->ds_compressed_bytes);
2379 
2380         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2381 
2382         if (dsl_dataset_is_snapshot(ds)) {
2383                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2384                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2385                     ds->ds_phys->ds_unique_bytes);
2386                 get_clones_stat(ds, nv);
2387         } else {
2388                 dsl_dir_stats(ds->ds_dir, nv);
2389         }
2390 
2391         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2392         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2393         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2394 
2395         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2396             ds->ds_phys->ds_creation_time);
2397         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2398             ds->ds_phys->ds_creation_txg);
2399         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2400             ds->ds_quota);
2401         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2402             ds->ds_reserved);
2403         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2404             ds->ds_phys->ds_guid);
2405         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2406             ds->ds_phys->ds_unique_bytes);
2407         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2408             ds->ds_object);
2409         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2410             ds->ds_userrefs);
2411         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2412             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2413 
2414         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2415                 uint64_t written, comp, uncomp;
2416                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2417                 dsl_dataset_t *prev;
2418 
2419                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2420                 int err = dsl_dataset_hold_obj(dp,
2421                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2422                 rw_exit(&dp->dp_config_rwlock);
2423                 if (err == 0) {
2424                         err = dsl_dataset_space_written(prev, ds, &written,
2425                             &comp, &uncomp);
2426                         dsl_dataset_rele(prev, FTAG);
2427                         if (err == 0) {
2428                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2429                                     written);
2430                         }
2431                 }
2432         }
2433 }
2434 
2435 void
2436 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2437 {
2438         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2439         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2440         stat->dds_guid = ds->ds_phys->ds_guid;
2441         stat->dds_origin[0] = '\0';
2442         if (dsl_dataset_is_snapshot(ds)) {
2443                 stat->dds_is_snapshot = B_TRUE;
2444                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2445         } else {
2446                 stat->dds_is_snapshot = B_FALSE;
2447                 stat->dds_num_clones = 0;
2448 
2449                 rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2450                 if (dsl_dir_is_clone(ds->ds_dir)) {
2451                         dsl_dataset_t *ods;
2452 
2453                         VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2454                             ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2455                         dsl_dataset_name(ods, stat->dds_origin);
2456                         dsl_dataset_drop_ref(ods, FTAG);
2457                 }
2458                 rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2459         }
2460 }
2461 
2462 uint64_t
2463 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2464 {
2465         return (ds->ds_fsid_guid);
2466 }
2467 
2468 void
2469 dsl_dataset_space(dsl_dataset_t *ds,
2470     uint64_t *refdbytesp, uint64_t *availbytesp,
2471     uint64_t *usedobjsp, uint64_t *availobjsp)
2472 {
2473         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2474         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2475         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2476                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2477         if (ds->ds_quota != 0) {
2478                 /*
2479                  * Adjust available bytes according to refquota
2480                  */
2481                 if (*refdbytesp < ds->ds_quota)
2482                         *availbytesp = MIN(*availbytesp,
2483                             ds->ds_quota - *refdbytesp);
2484                 else
2485                         *availbytesp = 0;
2486         }
2487         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2488         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2489 }
2490 
2491 boolean_t
2492 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2493 {
2494         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2495 
2496         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2497             dsl_pool_sync_context(dp));
2498         if (ds->ds_prev == NULL)
2499                 return (B_FALSE);
2500         if (ds->ds_phys->ds_bp.blk_birth >
2501             ds->ds_prev->ds_phys->ds_creation_txg) {
2502                 objset_t *os, *os_prev;
2503                 /*
2504                  * It may be that only the ZIL differs, because it was
2505                  * reset in the head.  Don't count that as being
2506                  * modified.
2507                  */
2508                 if (dmu_objset_from_ds(ds, &os) != 0)
2509                         return (B_TRUE);
2510                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2511                         return (B_TRUE);
2512                 return (bcmp(&os->os_phys->os_meta_dnode,
2513                     &os_prev->os_phys->os_meta_dnode,
2514                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2515         }
2516         return (B_FALSE);
2517 }
2518 
2519 /* ARGSUSED */
2520 static int
2521 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2522 {
2523         dsl_dataset_t *ds = arg1;
2524         char *newsnapname = arg2;
2525         dsl_dir_t *dd = ds->ds_dir;
2526         dsl_dataset_t *hds;
2527         uint64_t val;
2528         int err;
2529 
2530         err = dsl_dataset_hold_obj(dd->dd_pool,
2531             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2532         if (err)
2533                 return (err);
2534 
2535         /* new name better not be in use */
2536         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2537         dsl_dataset_rele(hds, FTAG);
2538 
2539         if (err == 0)
2540                 err = EEXIST;
2541         else if (err == ENOENT)
2542                 err = 0;
2543 
2544         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2545         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2546                 err = ENAMETOOLONG;
2547 
2548         return (err);
2549 }
2550 
2551 static void
2552 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2553 {
2554         dsl_dataset_t *ds = arg1;
2555         const char *newsnapname = arg2;
2556         dsl_dir_t *dd = ds->ds_dir;
2557         objset_t *mos = dd->dd_pool->dp_meta_objset;
2558         dsl_dataset_t *hds;
2559         int err;
2560 
2561         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2562 
2563         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2564             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2565 
2566         VERIFY(0 == dsl_dataset_get_snapname(ds));
2567         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx, B_FALSE);
2568         ASSERT0(err);
2569         mutex_enter(&ds->ds_lock);
2570         (void) strcpy(ds->ds_snapname, newsnapname);
2571         mutex_exit(&ds->ds_lock);
2572         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2573             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2574         ASSERT0(err);
2575 
2576         spa_history_log_internal_ds(ds, "rename", tx,
2577             "-> @%s", newsnapname);
2578         dsl_dataset_rele(hds, FTAG);
2579 }
2580 
2581 struct renamesnaparg {
2582         dsl_sync_task_group_t *dstg;
2583         char failed[MAXPATHLEN];
2584         char *oldsnap;
2585         char *newsnap;
2586 };
2587 
2588 static int
2589 dsl_snapshot_rename_one(const char *name, void *arg)
2590 {
2591         struct renamesnaparg *ra = arg;
2592         dsl_dataset_t *ds = NULL;
2593         char *snapname;
2594         int err;
2595 
2596         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2597         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2598 
2599         /*
2600          * For recursive snapshot renames the parent won't be changing
2601          * so we just pass name for both the to/from argument.
2602          */
2603         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2604         if (err != 0) {
2605                 strfree(snapname);
2606                 return (err == ENOENT ? 0 : err);
2607         }
2608 
2609 #ifdef _KERNEL
2610         /*
2611          * For all filesystems undergoing rename, we'll need to unmount it.
2612          */
2613         (void) zfs_unmount_snap(snapname, NULL);
2614 #endif
2615         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2616         strfree(snapname);
2617         if (err != 0)
2618                 return (err == ENOENT ? 0 : err);
2619 
2620         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2621             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2622 
2623         return (0);
2624 }
2625 
2626 static int
2627 dsl_recursive_rename(char *oldname, const char *newname)
2628 {
2629         int err;
2630         struct renamesnaparg *ra;
2631         dsl_sync_task_t *dst;
2632         spa_t *spa;
2633         char *cp, *fsname = spa_strdup(oldname);
2634         int len = strlen(oldname) + 1;
2635 
2636         /* truncate the snapshot name to get the fsname */
2637         cp = strchr(fsname, '@');
2638         *cp = '\0';
2639 
2640         err = spa_open(fsname, &spa, FTAG);
2641         if (err) {
2642                 kmem_free(fsname, len);
2643                 return (err);
2644         }
2645         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2646         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2647 
2648         ra->oldsnap = strchr(oldname, '@') + 1;
2649         ra->newsnap = strchr(newname, '@') + 1;
2650         *ra->failed = '\0';
2651 
2652         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2653             DS_FIND_CHILDREN);
2654         kmem_free(fsname, len);
2655 
2656         if (err == 0) {
2657                 err = dsl_sync_task_group_wait(ra->dstg);
2658         }
2659 
2660         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2661             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2662                 dsl_dataset_t *ds = dst->dst_arg1;
2663                 if (dst->dst_err) {
2664                         dsl_dir_name(ds->ds_dir, ra->failed);
2665                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2666                         (void) strlcat(ra->failed, ra->newsnap,
2667                             sizeof (ra->failed));
2668                 }
2669                 dsl_dataset_rele(ds, ra->dstg);
2670         }
2671 
2672         if (err)
2673                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2674 
2675         dsl_sync_task_group_destroy(ra->dstg);
2676         kmem_free(ra, sizeof (struct renamesnaparg));
2677         spa_close(spa, FTAG);
2678         return (err);
2679 }
2680 
2681 static int
2682 dsl_valid_rename(const char *oldname, void *arg)
2683 {
2684         int delta = *(int *)arg;
2685 
2686         if (strlen(oldname) + delta >= MAXNAMELEN)
2687                 return (ENAMETOOLONG);
2688 
2689         return (0);
2690 }
2691 
2692 #pragma weak dmu_objset_rename = dsl_dataset_rename
2693 int
2694 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2695 {
2696         dsl_dir_t *dd;
2697         dsl_dataset_t *ds;
2698         const char *tail;
2699         int err;
2700 
2701         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2702         if (err)
2703                 return (err);
2704 
2705         if (tail == NULL) {
2706                 int delta = strlen(newname) - strlen(oldname);
2707 
2708                 /* if we're growing, validate child name lengths */
2709                 if (delta > 0)
2710                         err = dmu_objset_find(oldname, dsl_valid_rename,
2711                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2712 
2713                 if (err == 0)
2714                         err = dsl_dir_rename(dd, newname);
2715                 dsl_dir_close(dd, FTAG);
2716                 return (err);
2717         }
2718 
2719         if (tail[0] != '@') {
2720                 /* the name ended in a nonexistent component */
2721                 dsl_dir_close(dd, FTAG);
2722                 return (ENOENT);
2723         }
2724 
2725         dsl_dir_close(dd, FTAG);
2726 
2727         /* new name must be snapshot in same filesystem */
2728         tail = strchr(newname, '@');
2729         if (tail == NULL)
2730                 return (EINVAL);
2731         tail++;
2732         if (strncmp(oldname, newname, tail - newname) != 0)
2733                 return (EXDEV);
2734 
2735         if (recursive) {
2736                 err = dsl_recursive_rename(oldname, newname);
2737         } else {
2738                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2739                 if (err)
2740                         return (err);
2741 
2742                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2743                     dsl_dataset_snapshot_rename_check,
2744                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2745 
2746                 dsl_dataset_rele(ds, FTAG);
2747         }
2748 
2749         return (err);
2750 }
2751 
2752 struct promotenode {
2753         list_node_t link;
2754         dsl_dataset_t *ds;
2755 };
2756 
2757 struct promotearg {
2758         list_t shared_snaps, origin_snaps, clone_snaps;
2759         dsl_dataset_t *origin_origin;
2760         uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2761         char *err_ds;
2762         cred_t *cr;
2763 };
2764 
2765 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2766 static boolean_t snaplist_unstable(list_t *l);
2767 
2768 static int
2769 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2770 {
2771         dsl_dataset_t *hds = arg1;
2772         struct promotearg *pa = arg2;
2773         struct promotenode *snap = list_head(&pa->shared_snaps);
2774         dsl_dataset_t *origin_ds = snap->ds;
2775         int err;
2776         uint64_t unused;
2777 
2778         /* Check that it is a real clone */
2779         if (!dsl_dir_is_clone(hds->ds_dir))
2780                 return (EINVAL);
2781 
2782         /* Since this is so expensive, don't do the preliminary check */
2783         if (!dmu_tx_is_syncing(tx))
2784                 return (0);
2785 
2786         if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2787                 return (EXDEV);
2788 
2789         /* compute origin's new unique space */
2790         snap = list_tail(&pa->clone_snaps);
2791         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2792         dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2793             origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2794             &pa->unique, &unused, &unused);
2795 
2796         /*
2797          * Walk the snapshots that we are moving
2798          *
2799          * Compute space to transfer.  Consider the incremental changes
2800          * to used for each snapshot:
2801          * (my used) = (prev's used) + (blocks born) - (blocks killed)
2802          * So each snapshot gave birth to:
2803          * (blocks born) = (my used) - (prev's used) + (blocks killed)
2804          * So a sequence would look like:
2805          * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2806          * Which simplifies to:
2807          * uN + kN + kN-1 + ... + k1 + k0
2808          * Note however, if we stop before we reach the ORIGIN we get:
2809          * uN + kN + kN-1 + ... + kM - uM-1
2810          */
2811         pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2812         pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2813         pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2814         for (snap = list_head(&pa->shared_snaps); snap;
2815             snap = list_next(&pa->shared_snaps, snap)) {
2816                 uint64_t val, dlused, dlcomp, dluncomp;
2817                 dsl_dataset_t *ds = snap->ds;
2818 
2819                 /* Check that the snapshot name does not conflict */
2820                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2821                 err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2822                 if (err == 0) {
2823                         err = EEXIST;
2824                         goto out;
2825                 }
2826                 if (err != ENOENT)
2827                         goto out;
2828 
2829                 /* The very first snapshot does not have a deadlist */
2830                 if (ds->ds_phys->ds_prev_snap_obj == 0)
2831                         continue;
2832 
2833                 dsl_deadlist_space(&ds->ds_deadlist,
2834                     &dlused, &dlcomp, &dluncomp);
2835                 pa->used += dlused;
2836                 pa->comp += dlcomp;
2837                 pa->uncomp += dluncomp;
2838         }
2839 
2840         /*
2841          * If we are a clone of a clone then we never reached ORIGIN,
2842          * so we need to subtract out the clone origin's used space.
2843          */
2844         if (pa->origin_origin) {
2845                 pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2846                 pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2847                 pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2848         }
2849 
2850         /* Check that there is enough space and limit headroom here */
2851         err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2852             origin_ds->ds_dir, pa->used, pa->cr);
2853         if (err)
2854                 return (err);
2855 
2856         /*
2857          * Compute the amounts of space that will be used by snapshots
2858          * after the promotion (for both origin and clone).  For each,
2859          * it is the amount of space that will be on all of their
2860          * deadlists (that was not born before their new origin).
2861          */
2862         if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2863                 uint64_t space;
2864 
2865                 /*
2866                  * Note, typically this will not be a clone of a clone,
2867                  * so dd_origin_txg will be < TXG_INITIAL, so
2868                  * these snaplist_space() -> dsl_deadlist_space_range()
2869                  * calls will be fast because they do not have to
2870                  * iterate over all bps.
2871                  */
2872                 snap = list_head(&pa->origin_snaps);
2873                 err = snaplist_space(&pa->shared_snaps,
2874                     snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2875                 if (err)
2876                         return (err);
2877 
2878                 err = snaplist_space(&pa->clone_snaps,
2879                     snap->ds->ds_dir->dd_origin_txg, &space);
2880                 if (err)
2881                         return (err);
2882                 pa->cloneusedsnap += space;
2883         }
2884         if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2885                 err = snaplist_space(&pa->origin_snaps,
2886                     origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2887                 if (err)
2888                         return (err);
2889         }
2890 
2891         return (0);
2892 out:
2893         pa->err_ds =  snap->ds->ds_snapname;
2894         return (err);
2895 }
2896 
2897 static void
2898 dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2899 {
2900         dsl_dataset_t *hds = arg1;
2901         struct promotearg *pa = arg2;
2902         struct promotenode *snap = list_head(&pa->shared_snaps);
2903         dsl_dataset_t *origin_ds = snap->ds;
2904         dsl_dataset_t *origin_head;
2905         dsl_dir_t *dd = hds->ds_dir;
2906         dsl_pool_t *dp = hds->ds_dir->dd_pool;
2907         dsl_dir_t *odd = NULL;
2908         uint64_t oldnext_obj;
2909         int64_t delta;
2910 
2911         ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2912 
2913         snap = list_head(&pa->origin_snaps);
2914         origin_head = snap->ds;
2915 
2916         /*
2917          * We need to explicitly open odd, since origin_ds's dd will be
2918          * changing.
2919          */
2920         VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2921             NULL, FTAG, &odd));
2922 
2923         /* change origin's next snap */
2924         dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2925         oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2926         snap = list_tail(&pa->clone_snaps);
2927         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2928         origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2929 
2930         /* change the origin's next clone */
2931         if (origin_ds->ds_phys->ds_next_clones_obj) {
2932                 remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2933                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2934                     origin_ds->ds_phys->ds_next_clones_obj,
2935                     oldnext_obj, tx));
2936         }
2937 
2938         /* change origin */
2939         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2940         ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2941         dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2942         dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2943         dmu_buf_will_dirty(odd->dd_dbuf, tx);
2944         odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2945         origin_head->ds_dir->dd_origin_txg =
2946             origin_ds->ds_phys->ds_creation_txg;
2947 
2948         /* change dd_clone entries */
2949         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2950                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2951                     odd->dd_phys->dd_clones, hds->ds_object, tx));
2952                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2953                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2954                     hds->ds_object, tx));
2955 
2956                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2957                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2958                     origin_head->ds_object, tx));
2959                 if (dd->dd_phys->dd_clones == 0) {
2960                         dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2961                             DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2962                 }
2963                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2964                     dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2965 
2966         }
2967 
2968         /* move snapshots to this dir */
2969         for (snap = list_head(&pa->shared_snaps); snap;
2970             snap = list_next(&pa->shared_snaps, snap)) {
2971                 dsl_dataset_t *ds = snap->ds;
2972 
2973                 /* unregister props as dsl_dir is changing */
2974                 if (ds->ds_objset) {
2975                         dmu_objset_evict(ds->ds_objset);
2976                         ds->ds_objset = NULL;
2977                 }
2978                 /* move snap name entry */
2979                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2980                 VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2981                     ds->ds_snapname, tx, B_TRUE));
2982                 VERIFY(0 == zap_add(dp->dp_meta_objset,
2983                     hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2984                     8, 1, &ds->ds_object, tx));
2985                 dsl_snapcount_adjust(hds->ds_dir, tx, 1, B_TRUE);
2986 
2987                 /* change containing dsl_dir */
2988                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
2989                 ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2990                 ds->ds_phys->ds_dir_obj = dd->dd_object;
2991                 ASSERT3P(ds->ds_dir, ==, odd);
2992                 dsl_dir_close(ds->ds_dir, ds);
2993                 VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2994                     NULL, ds, &ds->ds_dir));
2995 
2996                 /* move any clone references */
2997                 if (ds->ds_phys->ds_next_clones_obj &&
2998                     spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2999                         zap_cursor_t zc;
3000                         zap_attribute_t za;
3001 
3002                         for (zap_cursor_init(&zc, dp->dp_meta_objset,
3003                             ds->ds_phys->ds_next_clones_obj);
3004                             zap_cursor_retrieve(&zc, &za) == 0;
3005                             zap_cursor_advance(&zc)) {
3006                                 dsl_dataset_t *cnds;
3007                                 uint64_t o;
3008 
3009                                 if (za.za_first_integer == oldnext_obj) {
3010                                         /*
3011                                          * We've already moved the
3012                                          * origin's reference.
3013                                          */
3014                                         continue;
3015                                 }
3016 
3017                                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
3018                                     za.za_first_integer, FTAG, &cnds));
3019                                 o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
3020 
3021                                 VERIFY3U(zap_remove_int(dp->dp_meta_objset,
3022                                     odd->dd_phys->dd_clones, o, tx), ==, 0);
3023                                 VERIFY3U(zap_add_int(dp->dp_meta_objset,
3024                                     dd->dd_phys->dd_clones, o, tx), ==, 0);
3025                                 dsl_dataset_rele(cnds, FTAG);
3026                         }
3027                         zap_cursor_fini(&zc);
3028                 }
3029 
3030                 ASSERT0(dsl_prop_numcb(ds));
3031         }
3032 
3033         /*
3034          * Change space accounting.
3035          * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
3036          * both be valid, or both be 0 (resulting in delta == 0).  This
3037          * is true for each of {clone,origin} independently.
3038          */
3039 
3040         delta = pa->cloneusedsnap -
3041             dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
3042         ASSERT3S(delta, >=, 0);
3043         ASSERT3U(pa->used, >=, delta);
3044         dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
3045         dsl_dir_diduse_space(dd, DD_USED_HEAD,
3046             pa->used - delta, pa->comp, pa->uncomp, tx);
3047 
3048         delta = pa->originusedsnap -
3049             odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
3050         ASSERT3S(delta, <=, 0);
3051         ASSERT3U(pa->used, >=, -delta);
3052         dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
3053         dsl_dir_diduse_space(odd, DD_USED_HEAD,
3054             -pa->used - delta, -pa->comp, -pa->uncomp, tx);
3055 
3056         origin_ds->ds_phys->ds_unique_bytes = pa->unique;
3057 
3058         /* log history record */
3059         spa_history_log_internal_ds(hds, "promote", tx, "");
3060 
3061         dsl_dir_close(odd, FTAG);
3062 }
3063 
3064 static char *snaplist_tag = "snaplist";
3065 /*
3066  * Make a list of dsl_dataset_t's for the snapshots between first_obj
3067  * (exclusive) and last_obj (inclusive).  The list will be in reverse
3068  * order (last_obj will be the list_head()).  If first_obj == 0, do all
3069  * snapshots back to this dataset's origin.
3070  */
3071 static int
3072 snaplist_make(dsl_pool_t *dp, boolean_t own,
3073     uint64_t first_obj, uint64_t last_obj, list_t *l)
3074 {
3075         uint64_t obj = last_obj;
3076 
3077         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
3078 
3079         list_create(l, sizeof (struct promotenode),
3080             offsetof(struct promotenode, link));
3081 
3082         while (obj != first_obj) {
3083                 dsl_dataset_t *ds;
3084                 struct promotenode *snap;
3085                 int err;
3086 
3087                 if (own) {
3088                         err = dsl_dataset_own_obj(dp, obj,
3089                             0, snaplist_tag, &ds);
3090                         if (err == 0)
3091                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
3092                 } else {
3093                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
3094                 }
3095                 if (err == ENOENT) {
3096                         /* lost race with snapshot destroy */
3097                         struct promotenode *last = list_tail(l);
3098                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3099                         obj = last->ds->ds_phys->ds_prev_snap_obj;
3100                         continue;
3101                 } else if (err) {
3102                         return (err);
3103                 }
3104 
3105                 if (first_obj == 0)
3106                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3107 
3108                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3109                 snap->ds = ds;
3110                 list_insert_tail(l, snap);
3111                 obj = ds->ds_phys->ds_prev_snap_obj;
3112         }
3113 
3114         return (0);
3115 }
3116 
3117 static int
3118 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3119 {
3120         struct promotenode *snap;
3121 
3122         *spacep = 0;
3123         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3124                 uint64_t used, comp, uncomp;
3125                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3126                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3127                 *spacep += used;
3128         }
3129         return (0);
3130 }
3131 
3132 static void
3133 snaplist_destroy(list_t *l, boolean_t own)
3134 {
3135         struct promotenode *snap;
3136 
3137         if (!l || !list_link_active(&l->list_head))
3138                 return;
3139 
3140         while ((snap = list_tail(l)) != NULL) {
3141                 list_remove(l, snap);
3142                 if (own)
3143                         dsl_dataset_disown(snap->ds, snaplist_tag);
3144                 else
3145                         dsl_dataset_rele(snap->ds, snaplist_tag);
3146                 kmem_free(snap, sizeof (struct promotenode));
3147         }
3148         list_destroy(l);
3149 }
3150 
3151 /*
3152  * Promote a clone.  Nomenclature note:
3153  * "clone" or "cds": the original clone which is being promoted
3154  * "origin" or "ods": the snapshot which is originally clone's origin
3155  * "origin head" or "ohds": the dataset which is the head
3156  * (filesystem/volume) for the origin
3157  * "origin origin": the origin of the origin's filesystem (typically
3158  * NULL, indicating that the clone is not a clone of a clone).
3159  */
3160 int
3161 dsl_dataset_promote(const char *name, char *conflsnap)
3162 {
3163         dsl_dataset_t *ds;
3164         dsl_dir_t *dd;
3165         dsl_pool_t *dp;
3166         dmu_object_info_t doi;
3167         struct promotearg pa = { 0 };
3168         struct promotenode *snap;
3169         int err;
3170 
3171         err = dsl_dataset_hold(name, FTAG, &ds);
3172         if (err)
3173                 return (err);
3174         dd = ds->ds_dir;
3175         dp = dd->dd_pool;
3176 
3177         err = dmu_object_info(dp->dp_meta_objset,
3178             ds->ds_phys->ds_snapnames_zapobj, &doi);
3179         if (err) {
3180                 dsl_dataset_rele(ds, FTAG);
3181                 return (err);
3182         }
3183 
3184         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3185                 dsl_dataset_rele(ds, FTAG);
3186                 return (EINVAL);
3187         }
3188 
3189         /*
3190          * We are going to inherit all the snapshots taken before our
3191          * origin (i.e., our new origin will be our parent's origin).
3192          * Take ownership of them so that we can rename them into our
3193          * namespace.
3194          */
3195         rw_enter(&dp->dp_config_rwlock, RW_READER);
3196 
3197         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3198             &pa.shared_snaps);
3199         if (err != 0)
3200                 goto out;
3201 
3202         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3203         if (err != 0)
3204                 goto out;
3205 
3206         snap = list_head(&pa.shared_snaps);
3207         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3208         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3209             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3210         if (err != 0)
3211                 goto out;
3212 
3213         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3214                 err = dsl_dataset_hold_obj(dp,
3215                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3216                     FTAG, &pa.origin_origin);
3217                 if (err != 0)
3218                         goto out;
3219         }
3220 
3221 out:
3222         rw_exit(&dp->dp_config_rwlock);
3223         pa.cr = CRED();
3224 
3225         /*
3226          * Add in 128x the snapnames zapobj size, since we will be moving
3227          * a bunch of snapnames to the promoted ds, and dirtying their
3228          * bonus buffers.
3229          */
3230         if (err == 0) {
3231                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3232                     dsl_dataset_promote_sync, ds, &pa,
3233                     2 + 2 * doi.doi_physical_blocks_512);
3234                 if (err && pa.err_ds && conflsnap)
3235                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3236         }
3237 
3238         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3239         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3240         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3241         if (pa.origin_origin)
3242                 dsl_dataset_rele(pa.origin_origin, FTAG);
3243         dsl_dataset_rele(ds, FTAG);
3244         return (err);
3245 }
3246 
3247 struct cloneswaparg {
3248         dsl_dataset_t *cds; /* clone dataset */
3249         dsl_dataset_t *ohds; /* origin's head dataset */
3250         boolean_t force;
3251         int64_t unused_refres_delta; /* change in unconsumed refreservation */
3252 };
3253 
3254 /* ARGSUSED */
3255 static int
3256 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3257 {
3258         struct cloneswaparg *csa = arg1;
3259 
3260         /* they should both be heads */
3261         if (dsl_dataset_is_snapshot(csa->cds) ||
3262             dsl_dataset_is_snapshot(csa->ohds))
3263                 return (EINVAL);
3264 
3265         /* the branch point should be just before them */
3266         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3267                 return (EINVAL);
3268 
3269         /* cds should be the clone (unless they are unrelated) */
3270         if (csa->cds->ds_prev != NULL &&
3271             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3272             csa->ohds->ds_object !=
3273             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3274                 return (EINVAL);
3275 
3276         /* the clone should be a child of the origin */
3277         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3278                 return (EINVAL);
3279 
3280         /* ohds shouldn't be modified unless 'force' */
3281         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3282                 return (ETXTBSY);
3283 
3284         /* adjust amount of any unconsumed refreservation */
3285         csa->unused_refres_delta =
3286             (int64_t)MIN(csa->ohds->ds_reserved,
3287             csa->ohds->ds_phys->ds_unique_bytes) -
3288             (int64_t)MIN(csa->ohds->ds_reserved,
3289             csa->cds->ds_phys->ds_unique_bytes);
3290 
3291         if (csa->unused_refres_delta > 0 &&
3292             csa->unused_refres_delta >
3293             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3294                 return (ENOSPC);
3295 
3296         if (csa->ohds->ds_quota != 0 &&
3297             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3298                 return (EDQUOT);
3299 
3300         return (0);
3301 }
3302 
3303 /* ARGSUSED */
3304 static void
3305 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3306 {
3307         struct cloneswaparg *csa = arg1;
3308         dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3309 
3310         ASSERT(csa->cds->ds_reserved == 0);
3311         ASSERT(csa->ohds->ds_quota == 0 ||
3312             csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3313 
3314         dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3315         dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3316 
3317         if (csa->cds->ds_objset != NULL) {
3318                 dmu_objset_evict(csa->cds->ds_objset);
3319                 csa->cds->ds_objset = NULL;
3320         }
3321 
3322         if (csa->ohds->ds_objset != NULL) {
3323                 dmu_objset_evict(csa->ohds->ds_objset);
3324                 csa->ohds->ds_objset = NULL;
3325         }
3326 
3327         /*
3328          * Reset origin's unique bytes, if it exists.
3329          */
3330         if (csa->cds->ds_prev) {
3331                 dsl_dataset_t *origin = csa->cds->ds_prev;
3332                 uint64_t comp, uncomp;
3333 
3334                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
3335                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3336                     origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3337                     &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3338         }
3339 
3340         /* swap blkptrs */
3341         {
3342                 blkptr_t tmp;
3343                 tmp = csa->ohds->ds_phys->ds_bp;
3344                 csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3345                 csa->cds->ds_phys->ds_bp = tmp;
3346         }
3347 
3348         /* set dd_*_bytes */
3349         {
3350                 int64_t dused, dcomp, duncomp;
3351                 uint64_t cdl_used, cdl_comp, cdl_uncomp;
3352                 uint64_t odl_used, odl_comp, odl_uncomp;
3353 
3354                 ASSERT3U(csa->cds->ds_dir->dd_phys->
3355                     dd_used_breakdown[DD_USED_SNAP], ==, 0);
3356 
3357                 dsl_deadlist_space(&csa->cds->ds_deadlist,
3358                     &cdl_used, &cdl_comp, &cdl_uncomp);
3359                 dsl_deadlist_space(&csa->ohds->ds_deadlist,
3360                     &odl_used, &odl_comp, &odl_uncomp);
3361 
3362                 dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3363                     (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3364                 dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3365                     (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3366                 duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3367                     cdl_uncomp -
3368                     (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3369 
3370                 dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3371                     dused, dcomp, duncomp, tx);
3372                 dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3373                     -dused, -dcomp, -duncomp, tx);
3374 
3375                 /*
3376                  * The difference in the space used by snapshots is the
3377                  * difference in snapshot space due to the head's
3378                  * deadlist (since that's the only thing that's
3379                  * changing that affects the snapused).
3380                  */
3381                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3382                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3383                     &cdl_used, &cdl_comp, &cdl_uncomp);
3384                 dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3385                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3386                     &odl_used, &odl_comp, &odl_uncomp);
3387                 dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3388                     DD_USED_HEAD, DD_USED_SNAP, tx);
3389         }
3390 
3391         /* swap ds_*_bytes */
3392         SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3393             csa->cds->ds_phys->ds_referenced_bytes);
3394         SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3395             csa->cds->ds_phys->ds_compressed_bytes);
3396         SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3397             csa->cds->ds_phys->ds_uncompressed_bytes);
3398         SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3399             csa->cds->ds_phys->ds_unique_bytes);
3400 
3401         /* apply any parent delta for change in unconsumed refreservation */
3402         dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3403             csa->unused_refres_delta, 0, 0, tx);
3404 
3405         /*
3406          * Swap deadlists.
3407          */
3408         dsl_deadlist_close(&csa->cds->ds_deadlist);
3409         dsl_deadlist_close(&csa->ohds->ds_deadlist);
3410         SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3411             csa->cds->ds_phys->ds_deadlist_obj);
3412         dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3413             csa->cds->ds_phys->ds_deadlist_obj);
3414         dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3415             csa->ohds->ds_phys->ds_deadlist_obj);
3416 
3417         dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3418 
3419         spa_history_log_internal_ds(csa->cds, "clone swap", tx,
3420             "parent=%s", csa->ohds->ds_dir->dd_myname);
3421 }
3422 
3423 /*
3424  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3425  * recv" into an existing fs to swizzle the file system to the new
3426  * version, and by "zfs rollback".  Can also be used to swap two
3427  * independent head datasets if neither has any snapshots.
3428  */
3429 int
3430 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3431     boolean_t force)
3432 {
3433         struct cloneswaparg csa;
3434         int error;
3435 
3436         ASSERT(clone->ds_owner);
3437         ASSERT(origin_head->ds_owner);
3438 retry:
3439         /*
3440          * Need exclusive access for the swap. If we're swapping these
3441          * datasets back after an error, we already hold the locks.
3442          */
3443         if (!RW_WRITE_HELD(&clone->ds_rwlock))
3444                 rw_enter(&clone->ds_rwlock, RW_WRITER);
3445         if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3446             !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3447                 rw_exit(&clone->ds_rwlock);
3448                 rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3449                 if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3450                         rw_exit(&origin_head->ds_rwlock);
3451                         goto retry;
3452                 }
3453         }
3454         csa.cds = clone;
3455         csa.ohds = origin_head;
3456         csa.force = force;
3457         error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3458             dsl_dataset_clone_swap_check,
3459             dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3460         return (error);
3461 }
3462 
3463 /*
3464  * Given a pool name and a dataset object number in that pool,
3465  * return the name of that dataset.
3466  */
3467 int
3468 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3469 {
3470         spa_t *spa;
3471         dsl_pool_t *dp;
3472         dsl_dataset_t *ds;
3473         int error;
3474 
3475         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3476                 return (error);
3477         dp = spa_get_dsl(spa);
3478         rw_enter(&dp->dp_config_rwlock, RW_READER);
3479         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3480                 dsl_dataset_name(ds, buf);
3481                 dsl_dataset_rele(ds, FTAG);
3482         }
3483         rw_exit(&dp->dp_config_rwlock);
3484         spa_close(spa, FTAG);
3485 
3486         return (error);
3487 }
3488 
3489 int
3490 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3491     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3492 {
3493         int error = 0;
3494 
3495         ASSERT3S(asize, >, 0);
3496 
3497         /*
3498          * *ref_rsrv is the portion of asize that will come from any
3499          * unconsumed refreservation space.
3500          */
3501         *ref_rsrv = 0;
3502 
3503         mutex_enter(&ds->ds_lock);
3504         /*
3505          * Make a space adjustment for reserved bytes.
3506          */
3507         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3508                 ASSERT3U(*used, >=,
3509                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3510                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3511                 *ref_rsrv =
3512                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3513         }
3514 
3515         if (!check_quota || ds->ds_quota == 0) {
3516                 mutex_exit(&ds->ds_lock);
3517                 return (0);
3518         }
3519         /*
3520          * If they are requesting more space, and our current estimate
3521          * is over quota, they get to try again unless the actual
3522          * on-disk is over quota and there are no pending changes (which
3523          * may free up space for us).
3524          */
3525         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3526                 if (inflight > 0 ||
3527                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3528                         error = ERESTART;
3529                 else
3530                         error = EDQUOT;
3531         }
3532         mutex_exit(&ds->ds_lock);
3533 
3534         return (error);
3535 }
3536 
3537 /* ARGSUSED */
3538 static int
3539 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3540 {
3541         dsl_dataset_t *ds = arg1;
3542         dsl_prop_setarg_t *psa = arg2;
3543         int err;
3544 
3545         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3546                 return (ENOTSUP);
3547 
3548         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3549                 return (err);
3550 
3551         if (psa->psa_effective_value == 0)
3552                 return (0);
3553 
3554         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3555             psa->psa_effective_value < ds->ds_reserved)
3556                 return (ENOSPC);
3557 
3558         return (0);
3559 }
3560 
3561 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3562 
3563 void
3564 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3565 {
3566         dsl_dataset_t *ds = arg1;
3567         dsl_prop_setarg_t *psa = arg2;
3568         uint64_t effective_value = psa->psa_effective_value;
3569 
3570         dsl_prop_set_sync(ds, psa, tx);
3571         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3572 
3573         if (ds->ds_quota != effective_value) {
3574                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3575                 ds->ds_quota = effective_value;
3576         }
3577 }
3578 
3579 int
3580 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3581 {
3582         dsl_dataset_t *ds;
3583         dsl_prop_setarg_t psa;
3584         int err;
3585 
3586         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3587 
3588         err = dsl_dataset_hold(dsname, FTAG, &ds);
3589         if (err)
3590                 return (err);
3591 
3592         /*
3593          * If someone removes a file, then tries to set the quota, we
3594          * want to make sure the file freeing takes effect.
3595          */
3596         txg_wait_open(ds->ds_dir->dd_pool, 0);
3597 
3598         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3599             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3600             ds, &psa, 0);
3601 
3602         dsl_dataset_rele(ds, FTAG);
3603         return (err);
3604 }
3605 
3606 static int
3607 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3608 {
3609         dsl_dataset_t *ds = arg1;
3610         dsl_prop_setarg_t *psa = arg2;
3611         uint64_t effective_value;
3612         uint64_t unique;
3613         int err;
3614 
3615         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3616             SPA_VERSION_REFRESERVATION)
3617                 return (ENOTSUP);
3618 
3619         if (dsl_dataset_is_snapshot(ds))
3620                 return (EINVAL);
3621 
3622         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3623                 return (err);
3624 
3625         effective_value = psa->psa_effective_value;
3626 
3627         /*
3628          * If we are doing the preliminary check in open context, the
3629          * space estimates may be inaccurate.
3630          */
3631         if (!dmu_tx_is_syncing(tx))
3632                 return (0);
3633 
3634         mutex_enter(&ds->ds_lock);
3635         if (!DS_UNIQUE_IS_ACCURATE(ds))
3636                 dsl_dataset_recalc_head_uniq(ds);
3637         unique = ds->ds_phys->ds_unique_bytes;
3638         mutex_exit(&ds->ds_lock);
3639 
3640         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3641                 uint64_t delta = MAX(unique, effective_value) -
3642                     MAX(unique, ds->ds_reserved);
3643 
3644                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3645                         return (ENOSPC);
3646                 if (ds->ds_quota > 0 &&
3647                     effective_value > ds->ds_quota)
3648                         return (ENOSPC);
3649         }
3650 
3651         return (0);
3652 }
3653 
3654 static void
3655 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3656 {
3657         dsl_dataset_t *ds = arg1;
3658         dsl_prop_setarg_t *psa = arg2;
3659         uint64_t effective_value = psa->psa_effective_value;
3660         uint64_t unique;
3661         int64_t delta;
3662 
3663         dsl_prop_set_sync(ds, psa, tx);
3664         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3665 
3666         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3667 
3668         mutex_enter(&ds->ds_dir->dd_lock);
3669         mutex_enter(&ds->ds_lock);
3670         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3671         unique = ds->ds_phys->ds_unique_bytes;
3672         delta = MAX(0, (int64_t)(effective_value - unique)) -
3673             MAX(0, (int64_t)(ds->ds_reserved - unique));
3674         ds->ds_reserved = effective_value;
3675         mutex_exit(&ds->ds_lock);
3676 
3677         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3678         mutex_exit(&ds->ds_dir->dd_lock);
3679 }
3680 
3681 int
3682 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3683     uint64_t reservation)
3684 {
3685         dsl_dataset_t *ds;
3686         dsl_prop_setarg_t psa;
3687         int err;
3688 
3689         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3690             &reservation);
3691 
3692         err = dsl_dataset_hold(dsname, FTAG, &ds);
3693         if (err)
3694                 return (err);
3695 
3696         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3697             dsl_dataset_set_reservation_check,
3698             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3699 
3700         dsl_dataset_rele(ds, FTAG);
3701         return (err);
3702 }
3703 
3704 typedef struct zfs_hold_cleanup_arg {
3705         dsl_pool_t *dp;
3706         uint64_t dsobj;
3707         char htag[MAXNAMELEN];
3708 } zfs_hold_cleanup_arg_t;
3709 
3710 static void
3711 dsl_dataset_user_release_onexit(void *arg)
3712 {
3713         zfs_hold_cleanup_arg_t *ca = arg;
3714 
3715         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3716             B_TRUE);
3717         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3718 }
3719 
3720 void
3721 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3722     minor_t minor)
3723 {
3724         zfs_hold_cleanup_arg_t *ca;
3725 
3726         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3727         ca->dp = ds->ds_dir->dd_pool;
3728         ca->dsobj = ds->ds_object;
3729         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3730         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3731             dsl_dataset_user_release_onexit, ca, NULL));
3732 }
3733 
3734 /*
3735  * If you add new checks here, you may need to add
3736  * additional checks to the "temporary" case in
3737  * snapshot_check() in dmu_objset.c.
3738  */
3739 static int
3740 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3741 {
3742         dsl_dataset_t *ds = arg1;
3743         struct dsl_ds_holdarg *ha = arg2;
3744         const char *htag = ha->htag;
3745         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3746         int error = 0;
3747 
3748         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3749                 return (ENOTSUP);
3750 
3751         if (!dsl_dataset_is_snapshot(ds))
3752                 return (EINVAL);
3753 
3754         /* tags must be unique */
3755         mutex_enter(&ds->ds_lock);
3756         if (ds->ds_phys->ds_userrefs_obj) {
3757                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3758                     8, 1, tx);
3759                 if (error == 0)
3760                         error = EEXIST;
3761                 else if (error == ENOENT)
3762                         error = 0;
3763         }
3764         mutex_exit(&ds->ds_lock);
3765 
3766         if (error == 0 && ha->temphold &&
3767             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3768                 error = E2BIG;
3769 
3770         return (error);
3771 }
3772 
3773 void
3774 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3775 {
3776         dsl_dataset_t *ds = arg1;
3777         struct dsl_ds_holdarg *ha = arg2;
3778         const char *htag = ha->htag;
3779         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3780         objset_t *mos = dp->dp_meta_objset;
3781         uint64_t now = gethrestime_sec();
3782         uint64_t zapobj;
3783 
3784         mutex_enter(&ds->ds_lock);
3785         if (ds->ds_phys->ds_userrefs_obj == 0) {
3786                 /*
3787                  * This is the first user hold for this dataset.  Create
3788                  * the userrefs zap object.
3789                  */
3790                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3791                 zapobj = ds->ds_phys->ds_userrefs_obj =
3792                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3793         } else {
3794                 zapobj = ds->ds_phys->ds_userrefs_obj;
3795         }
3796         ds->ds_userrefs++;
3797         mutex_exit(&ds->ds_lock);
3798 
3799         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3800 
3801         if (ha->temphold) {
3802                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3803                     htag, &now, tx));
3804         }
3805 
3806         spa_history_log_internal_ds(ds, "hold", tx,
3807             "tag = %s temp = %d holds now = %llu",
3808             htag, (int)ha->temphold, ds->ds_userrefs);
3809 }
3810 
3811 static int
3812 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3813 {
3814         struct dsl_ds_holdarg *ha = arg;
3815         dsl_dataset_t *ds;
3816         int error;
3817         char *name;
3818 
3819         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3820         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3821         error = dsl_dataset_hold(name, ha->dstg, &ds);
3822         strfree(name);
3823         if (error == 0) {
3824                 ha->gotone = B_TRUE;
3825                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3826                     dsl_dataset_user_hold_sync, ds, ha, 0);
3827         } else if (error == ENOENT && ha->recursive) {
3828                 error = 0;
3829         } else {
3830                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3831         }
3832         return (error);
3833 }
3834 
3835 int
3836 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3837     boolean_t temphold)
3838 {
3839         struct dsl_ds_holdarg *ha;
3840         int error;
3841 
3842         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3843         ha->htag = htag;
3844         ha->temphold = temphold;
3845         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3846             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3847             ds, ha, 0);
3848         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3849 
3850         return (error);
3851 }
3852 
3853 int
3854 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3855     boolean_t recursive, boolean_t temphold, int cleanup_fd)
3856 {
3857         struct dsl_ds_holdarg *ha;
3858         dsl_sync_task_t *dst;
3859         spa_t *spa;
3860         int error;
3861         minor_t minor = 0;
3862 
3863         if (cleanup_fd != -1) {
3864                 /* Currently we only support cleanup-on-exit of tempholds. */
3865                 if (!temphold)
3866                         return (EINVAL);
3867                 error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3868                 if (error)
3869                         return (error);
3870         }
3871 
3872         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3873 
3874         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3875 
3876         error = spa_open(dsname, &spa, FTAG);
3877         if (error) {
3878                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3879                 if (cleanup_fd != -1)
3880                         zfs_onexit_fd_rele(cleanup_fd);
3881                 return (error);
3882         }
3883 
3884         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3885         ha->htag = htag;
3886         ha->snapname = snapname;
3887         ha->recursive = recursive;
3888         ha->temphold = temphold;
3889 
3890         if (recursive) {
3891                 error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3892                     ha, DS_FIND_CHILDREN);
3893         } else {
3894                 error = dsl_dataset_user_hold_one(dsname, ha);
3895         }
3896         if (error == 0)
3897                 error = dsl_sync_task_group_wait(ha->dstg);
3898 
3899         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3900             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3901                 dsl_dataset_t *ds = dst->dst_arg1;
3902 
3903                 if (dst->dst_err) {
3904                         dsl_dataset_name(ds, ha->failed);
3905                         *strchr(ha->failed, '@') = '\0';
3906                 } else if (error == 0 && minor != 0 && temphold) {
3907                         /*
3908                          * If this hold is to be released upon process exit,
3909                          * register that action now.
3910                          */
3911                         dsl_register_onexit_hold_cleanup(ds, htag, minor);
3912                 }
3913                 dsl_dataset_rele(ds, ha->dstg);
3914         }
3915 
3916         if (error == 0 && recursive && !ha->gotone)
3917                 error = ENOENT;
3918 
3919         if (error)
3920                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3921 
3922         dsl_sync_task_group_destroy(ha->dstg);
3923 
3924         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3925         spa_close(spa, FTAG);
3926         if (cleanup_fd != -1)
3927                 zfs_onexit_fd_rele(cleanup_fd);
3928         return (error);
3929 }
3930 
3931 struct dsl_ds_releasearg {
3932         dsl_dataset_t *ds;
3933         const char *htag;
3934         boolean_t own;          /* do we own or just hold ds? */
3935 };
3936 
3937 static int
3938 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3939     boolean_t *might_destroy)
3940 {
3941         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3942         uint64_t zapobj;
3943         uint64_t tmp;
3944         int error;
3945 
3946         *might_destroy = B_FALSE;
3947 
3948         mutex_enter(&ds->ds_lock);
3949         zapobj = ds->ds_phys->ds_userrefs_obj;
3950         if (zapobj == 0) {
3951                 /* The tag can't possibly exist */
3952                 mutex_exit(&ds->ds_lock);
3953                 return (ESRCH);
3954         }
3955 
3956         /* Make sure the tag exists */
3957         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3958         if (error) {
3959                 mutex_exit(&ds->ds_lock);
3960                 if (error == ENOENT)
3961                         error = ESRCH;
3962                 return (error);
3963         }
3964 
3965         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3966             DS_IS_DEFER_DESTROY(ds))
3967                 *might_destroy = B_TRUE;
3968 
3969         mutex_exit(&ds->ds_lock);
3970         return (0);
3971 }
3972 
3973 static int
3974 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3975 {
3976         struct dsl_ds_releasearg *ra = arg1;
3977         dsl_dataset_t *ds = ra->ds;
3978         boolean_t might_destroy;
3979         int error;
3980 
3981         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3982                 return (ENOTSUP);
3983 
3984         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3985         if (error)
3986                 return (error);
3987 
3988         if (might_destroy) {
3989                 struct dsl_ds_destroyarg dsda = {0};
3990 
3991                 if (dmu_tx_is_syncing(tx)) {
3992                         /*
3993                          * If we're not prepared to remove the snapshot,
3994                          * we can't allow the release to happen right now.
3995                          */
3996                         if (!ra->own)
3997                                 return (EBUSY);
3998                 }
3999                 dsda.ds = ds;
4000                 dsda.releasing = B_TRUE;
4001                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
4002         }
4003 
4004         return (0);
4005 }
4006 
4007 static void
4008 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
4009 {
4010         struct dsl_ds_releasearg *ra = arg1;
4011         dsl_dataset_t *ds = ra->ds;
4012         dsl_pool_t *dp = ds->ds_dir->dd_pool;
4013         objset_t *mos = dp->dp_meta_objset;
4014         uint64_t zapobj;
4015         uint64_t refs;
4016         int error;
4017 
4018         mutex_enter(&ds->ds_lock);
4019         ds->ds_userrefs--;
4020         refs = ds->ds_userrefs;
4021         mutex_exit(&ds->ds_lock);
4022         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
4023         VERIFY(error == 0 || error == ENOENT);
4024         zapobj = ds->ds_phys->ds_userrefs_obj;
4025         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
4026 
4027         spa_history_log_internal_ds(ds, "release", tx,
4028             "tag = %s refs now = %lld", ra->htag, (longlong_t)refs);
4029 
4030         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
4031             DS_IS_DEFER_DESTROY(ds)) {
4032                 struct dsl_ds_destroyarg dsda = {0};
4033 
4034                 ASSERT(ra->own);
4035                 dsda.ds = ds;
4036                 dsda.releasing = B_TRUE;
4037                 /* We already did the destroy_check */
4038                 dsl_dataset_destroy_sync(&dsda, tag, tx);
4039         }
4040 }
4041 
4042 static int
4043 dsl_dataset_user_release_one(const char *dsname, void *arg)
4044 {
4045         struct dsl_ds_holdarg *ha = arg;
4046         struct dsl_ds_releasearg *ra;
4047         dsl_dataset_t *ds;
4048         int error;
4049         void *dtag = ha->dstg;
4050         char *name;
4051         boolean_t own = B_FALSE;
4052         boolean_t might_destroy;
4053 
4054         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
4055         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4056         error = dsl_dataset_hold(name, dtag, &ds);
4057         strfree(name);
4058         if (error == ENOENT && ha->recursive)
4059                 return (0);
4060         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4061         if (error)
4062                 return (error);
4063 
4064         ha->gotone = B_TRUE;
4065 
4066         ASSERT(dsl_dataset_is_snapshot(ds));
4067 
4068         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
4069         if (error) {
4070                 dsl_dataset_rele(ds, dtag);
4071                 return (error);
4072         }
4073 
4074         if (might_destroy) {
4075 #ifdef _KERNEL
4076                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
4077                 error = zfs_unmount_snap(name, NULL);
4078                 strfree(name);
4079                 if (error) {
4080                         dsl_dataset_rele(ds, dtag);
4081                         return (error);
4082                 }
4083 #endif
4084                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
4085                         dsl_dataset_rele(ds, dtag);
4086                         return (EBUSY);
4087                 } else {
4088                         own = B_TRUE;
4089                         dsl_dataset_make_exclusive(ds, dtag);
4090                 }
4091         }
4092 
4093         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
4094         ra->ds = ds;
4095         ra->htag = ha->htag;
4096         ra->own = own;
4097         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
4098             dsl_dataset_user_release_sync, ra, dtag, 0);
4099 
4100         return (0);
4101 }
4102 
4103 int
4104 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
4105     boolean_t recursive)
4106 {
4107         struct dsl_ds_holdarg *ha;
4108         dsl_sync_task_t *dst;
4109         spa_t *spa;
4110         int error;
4111 
4112 top:
4113         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4114 
4115         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4116 
4117         error = spa_open(dsname, &spa, FTAG);
4118         if (error) {
4119                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4120                 return (error);
4121         }
4122 
4123         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4124         ha->htag = htag;
4125         ha->snapname = snapname;
4126         ha->recursive = recursive;
4127         if (recursive) {
4128                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4129                     ha, DS_FIND_CHILDREN);
4130         } else {
4131                 error = dsl_dataset_user_release_one(dsname, ha);
4132         }
4133         if (error == 0)
4134                 error = dsl_sync_task_group_wait(ha->dstg);
4135 
4136         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4137             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4138                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4139                 dsl_dataset_t *ds = ra->ds;
4140 
4141                 if (dst->dst_err)
4142                         dsl_dataset_name(ds, ha->failed);
4143 
4144                 if (ra->own)
4145                         dsl_dataset_disown(ds, ha->dstg);
4146                 else
4147                         dsl_dataset_rele(ds, ha->dstg);
4148 
4149                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4150         }
4151 
4152         if (error == 0 && recursive && !ha->gotone)
4153                 error = ENOENT;
4154 
4155         if (error && error != EBUSY)
4156                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4157 
4158         dsl_sync_task_group_destroy(ha->dstg);
4159         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4160         spa_close(spa, FTAG);
4161 
4162         /*
4163          * We can get EBUSY if we were racing with deferred destroy and
4164          * dsl_dataset_user_release_check() hadn't done the necessary
4165          * open context setup.  We can also get EBUSY if we're racing
4166          * with destroy and that thread is the ds_owner.  Either way
4167          * the busy condition should be transient, and we should retry
4168          * the release operation.
4169          */
4170         if (error == EBUSY)
4171                 goto top;
4172 
4173         return (error);
4174 }
4175 
4176 /*
4177  * Called at spa_load time (with retry == B_FALSE) to release a stale
4178  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4179  */
4180 int
4181 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4182     boolean_t retry)
4183 {
4184         dsl_dataset_t *ds;
4185         char *snap;
4186         char *name;
4187         int namelen;
4188         int error;
4189 
4190         do {
4191                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4192                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4193                 rw_exit(&dp->dp_config_rwlock);
4194                 if (error)
4195                         return (error);
4196                 namelen = dsl_dataset_namelen(ds)+1;
4197                 name = kmem_alloc(namelen, KM_SLEEP);
4198                 dsl_dataset_name(ds, name);
4199                 dsl_dataset_rele(ds, FTAG);
4200 
4201                 snap = strchr(name, '@');
4202                 *snap = '\0';
4203                 ++snap;
4204                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4205                 kmem_free(name, namelen);
4206 
4207                 /*
4208                  * The object can't have been destroyed because we have a hold,
4209                  * but it might have been renamed, resulting in ENOENT.  Retry
4210                  * if we've been requested to do so.
4211                  *
4212                  * It would be nice if we could use the dsobj all the way
4213                  * through and avoid ENOENT entirely.  But we might need to
4214                  * unmount the snapshot, and there's currently no way to lookup
4215                  * a vfsp using a ZFS object id.
4216                  */
4217         } while ((error == ENOENT) && retry);
4218 
4219         return (error);
4220 }
4221 
4222 int
4223 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4224 {
4225         dsl_dataset_t *ds;
4226         int err;
4227 
4228         err = dsl_dataset_hold(dsname, FTAG, &ds);
4229         if (err)
4230                 return (err);
4231 
4232         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4233         if (ds->ds_phys->ds_userrefs_obj != 0) {
4234                 zap_attribute_t *za;
4235                 zap_cursor_t zc;
4236 
4237                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4238                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4239                     ds->ds_phys->ds_userrefs_obj);
4240                     zap_cursor_retrieve(&zc, za) == 0;
4241                     zap_cursor_advance(&zc)) {
4242                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4243                             za->za_first_integer));
4244                 }
4245                 zap_cursor_fini(&zc);
4246                 kmem_free(za, sizeof (zap_attribute_t));
4247         }
4248         dsl_dataset_rele(ds, FTAG);
4249         return (0);
4250 }
4251 
4252 /*
4253  * Note, this function is used as the callback for dmu_objset_find().  We
4254  * always return 0 so that we will continue to find and process
4255  * inconsistent datasets, even if we encounter an error trying to
4256  * process one of them.
4257  */
4258 /* ARGSUSED */
4259 int
4260 dsl_destroy_inconsistent(const char *dsname, void *arg)
4261 {
4262         dsl_dataset_t *ds;
4263 
4264         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4265                 if (DS_IS_INCONSISTENT(ds))
4266                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4267                 else
4268                         dsl_dataset_disown(ds, FTAG);
4269         }
4270         return (0);
4271 }
4272 
4273 /*
4274  * Return (in *usedp) the amount of space written in new that is not
4275  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4276  * a snapshot before new, in new's filesystem (or its origin).  If not then
4277  * fail and return EINVAL.
4278  *
4279  * The written space is calculated by considering two components:  First, we
4280  * ignore any freed space, and calculate the written as new's used space
4281  * minus old's used space.  Next, we add in the amount of space that was freed
4282  * between the two snapshots, thus reducing new's used space relative to old's.
4283  * Specifically, this is the space that was born before old->ds_creation_txg,
4284  * and freed before new (ie. on new's deadlist or a previous deadlist).
4285  *
4286  * space freed                         [---------------------]
4287  * snapshots                       ---O-------O--------O-------O------
4288  *                                         oldsnap            new
4289  */
4290 int
4291 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4292     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4293 {
4294         int err = 0;
4295         uint64_t snapobj;
4296         dsl_pool_t *dp = new->ds_dir->dd_pool;
4297 
4298         *usedp = 0;
4299         *usedp += new->ds_phys->ds_referenced_bytes;
4300         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4301 
4302         *compp = 0;
4303         *compp += new->ds_phys->ds_compressed_bytes;
4304         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4305 
4306         *uncompp = 0;
4307         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4308         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4309 
4310         rw_enter(&dp->dp_config_rwlock, RW_READER);
4311         snapobj = new->ds_object;
4312         while (snapobj != oldsnap->ds_object) {
4313                 dsl_dataset_t *snap;
4314                 uint64_t used, comp, uncomp;
4315 
4316                 if (snapobj == new->ds_object) {
4317                         snap = new;
4318                 } else {
4319                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4320                         if (err != 0)
4321                                 break;
4322                 }
4323 
4324                 if (snap->ds_phys->ds_prev_snap_txg ==
4325                     oldsnap->ds_phys->ds_creation_txg) {
4326                         /*
4327                          * The blocks in the deadlist can not be born after
4328                          * ds_prev_snap_txg, so get the whole deadlist space,
4329                          * which is more efficient (especially for old-format
4330                          * deadlists).  Unfortunately the deadlist code
4331                          * doesn't have enough information to make this
4332                          * optimization itself.
4333                          */
4334                         dsl_deadlist_space(&snap->ds_deadlist,
4335                             &used, &comp, &uncomp);
4336                 } else {
4337                         dsl_deadlist_space_range(&snap->ds_deadlist,
4338                             0, oldsnap->ds_phys->ds_creation_txg,
4339                             &used, &comp, &uncomp);
4340                 }
4341                 *usedp += used;
4342                 *compp += comp;
4343                 *uncompp += uncomp;
4344 
4345                 /*
4346                  * If we get to the beginning of the chain of snapshots
4347                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4348                  * was not a snapshot of/before new.
4349                  */
4350                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4351                 if (snap != new)
4352                         dsl_dataset_rele(snap, FTAG);
4353                 if (snapobj == 0) {
4354                         err = EINVAL;
4355                         break;
4356                 }
4357 
4358         }
4359         rw_exit(&dp->dp_config_rwlock);
4360         return (err);
4361 }
4362 
4363 /*
4364  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4365  * lastsnap, and all snapshots in between are deleted.
4366  *
4367  * blocks that would be freed            [---------------------------]
4368  * snapshots                       ---O-------O--------O-------O--------O
4369  *                                        firstsnap        lastsnap
4370  *
4371  * This is the set of blocks that were born after the snap before firstsnap,
4372  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4373  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4374  * We calculate this by iterating over the relevant deadlists (from the snap
4375  * after lastsnap, backward to the snap after firstsnap), summing up the
4376  * space on the deadlist that was born after the snap before firstsnap.
4377  */
4378 int
4379 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4380     dsl_dataset_t *lastsnap,
4381     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4382 {
4383         int err = 0;
4384         uint64_t snapobj;
4385         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4386 
4387         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4388         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4389 
4390         /*
4391          * Check that the snapshots are in the same dsl_dir, and firstsnap
4392          * is before lastsnap.
4393          */
4394         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4395             firstsnap->ds_phys->ds_creation_txg >
4396             lastsnap->ds_phys->ds_creation_txg)
4397                 return (EINVAL);
4398 
4399         *usedp = *compp = *uncompp = 0;
4400 
4401         rw_enter(&dp->dp_config_rwlock, RW_READER);
4402         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4403         while (snapobj != firstsnap->ds_object) {
4404                 dsl_dataset_t *ds;
4405                 uint64_t used, comp, uncomp;
4406 
4407                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4408                 if (err != 0)
4409                         break;
4410 
4411                 dsl_deadlist_space_range(&ds->ds_deadlist,
4412                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4413                     &used, &comp, &uncomp);
4414                 *usedp += used;
4415                 *compp += comp;
4416                 *uncompp += uncomp;
4417 
4418                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4419                 ASSERT3U(snapobj, !=, 0);
4420                 dsl_dataset_rele(ds, FTAG);
4421         }
4422         rw_exit(&dp->dp_config_rwlock);
4423         return (err);
4424 }