1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  25  */
  26 
  27 #include <sys/dmu_objset.h>
  28 #include <sys/dsl_dataset.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_prop.h>
  31 #include <sys/dsl_synctask.h>
  32 #include <sys/dmu_traverse.h>
  33 #include <sys/dmu_impl.h>
  34 #include <sys/dmu_tx.h>
  35 #include <sys/arc.h>
  36 #include <sys/zio.h>
  37 #include <sys/zap.h>
  38 #include <sys/zfeature.h>
  39 #include <sys/unique.h>
  40 #include <sys/zfs_context.h>
  41 #include <sys/zfs_ioctl.h>
  42 #include <sys/spa.h>
  43 #include <sys/zfs_znode.h>
  44 #include <sys/zfs_onexit.h>
  45 #include <sys/zvol.h>
  46 #include <sys/dsl_scan.h>
  47 #include <sys/dsl_deadlist.h>
  48 
/*
 * Sentinel owner value.  DSL_DATASET_IS_DESTROYED() compares a dataset's
 * ds_owner against this pointer; only the address matters, the string
 * contents are never inspected in this part of the file.
 */
static char *dsl_reaper = "the grim reaper";

/* Forward declarations of file-local sync-task check/sync callbacks. */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

/*
 * Exchange the values of x and y through a uint64_t temporary.  Each
 * argument is expanded twice, so both must be side-effect-free lvalues.
 * The expansion is a bare brace block rather than do { } while (0).
 */
#define SWITCH64(x, y) \
        { \
                uint64_t __tmp = (x); \
                (x) = (y); \
                (y) = __tmp; \
        }

/*
 * The constant 2^62.
 * NOTE(review): presumably an upper bound on a dataset reference count;
 * it is not used in the visible portion of the file -- confirm.
 */
#define DS_REF_MAX      (1ULL << 62)

/*
 * Alias for SPA_MAXBLOCKSIZE.
 * NOTE(review): presumably the block size used for deadlist objects;
 * not used in the visible portion of the file -- confirm.
 */
#define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE

/* True when the dataset's owner is the dsl_reaper sentinel. */
#define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
  67 
  68 
  69 /*
  70  * Figure out how much of this delta should be propogated to the dsl_dir
  71  * layer.  If there's a refreservation, that space has already been
  72  * partially accounted for in our ancestors.
  73  */
  74 static int64_t
  75 parent_delta(dsl_dataset_t *ds, int64_t delta)
  76 {
  77         uint64_t old_bytes, new_bytes;
  78 
  79         if (ds->ds_reserved == 0)
  80                 return (delta);
  81 
  82         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
  83         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
  84 
  85         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
  86         return (new_bytes - old_bytes);
  87 }
  88 
  89 void
  90 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
  91 {
  92         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
  93         int compressed = BP_GET_PSIZE(bp);
  94         int uncompressed = BP_GET_UCSIZE(bp);
  95         int64_t delta;
  96 
  97         dprintf_bp(bp, "ds=%p", ds);
  98 
  99         ASSERT(dmu_tx_is_syncing(tx));
 100         /* It could have been compressed away to nothing */
 101         if (BP_IS_HOLE(bp))
 102                 return;
 103         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
 104         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
 105         if (ds == NULL) {
 106                 /*
 107                  * Account for the meta-objset space in its placeholder
 108                  * dsl_dir.
 109                  */
 110                 ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
 111                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
 112                     used, compressed, uncompressed, tx);
 113                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
 114                 return;
 115         }
 116         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 117 
 118         mutex_enter(&ds->ds_dir->dd_lock);
 119         mutex_enter(&ds->ds_lock);
 120         delta = parent_delta(ds, used);
 121         ds->ds_phys->ds_referenced_bytes += used;
 122         ds->ds_phys->ds_compressed_bytes += compressed;
 123         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
 124         ds->ds_phys->ds_unique_bytes += used;
 125         mutex_exit(&ds->ds_lock);
 126         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
 127             compressed, uncompressed, tx);
 128         dsl_dir_transfer_space(ds->ds_dir, used - delta,
 129             DD_USED_REFRSRV, DD_USED_HEAD, tx);
 130         mutex_exit(&ds->ds_dir->dd_lock);
 131 }
 132 
 133 int
 134 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
 135     boolean_t async)
 136 {
 137         if (BP_IS_HOLE(bp))
 138                 return (0);
 139 
 140         ASSERT(dmu_tx_is_syncing(tx));
 141         ASSERT(bp->blk_birth <= tx->tx_txg);
 142 
 143         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
 144         int compressed = BP_GET_PSIZE(bp);
 145         int uncompressed = BP_GET_UCSIZE(bp);
 146 
 147         ASSERT(used > 0);
 148         if (ds == NULL) {
 149                 /*
 150                  * Account for the meta-objset space in its placeholder
 151                  * dataset.
 152                  */
 153                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 154 
 155                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
 156                     -used, -compressed, -uncompressed, tx);
 157                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
 158                 return (used);
 159         }
 160         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
 161 
 162         ASSERT(!dsl_dataset_is_snapshot(ds));
 163         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 164 
 165         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
 166                 int64_t delta;
 167 
 168                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
 169                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 170 
 171                 mutex_enter(&ds->ds_dir->dd_lock);
 172                 mutex_enter(&ds->ds_lock);
 173                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
 174                     !DS_UNIQUE_IS_ACCURATE(ds));
 175                 delta = parent_delta(ds, -used);
 176                 ds->ds_phys->ds_unique_bytes -= used;
 177                 mutex_exit(&ds->ds_lock);
 178                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
 179                     delta, -compressed, -uncompressed, tx);
 180                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
 181                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
 182                 mutex_exit(&ds->ds_dir->dd_lock);
 183         } else {
 184                 dprintf_bp(bp, "putting on dead list: %s", "");
 185                 if (async) {
 186                         /*
 187                          * We are here as part of zio's write done callback,
 188                          * which means we're a zio interrupt thread.  We can't
 189                          * call dsl_deadlist_insert() now because it may block
 190                          * waiting for I/O.  Instead, put bp on the deferred
 191                          * queue and let dsl_pool_sync() finish the job.
 192                          */
 193                         bplist_append(&ds->ds_pending_deadlist, bp);
 194                 } else {
 195                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
 196                 }
 197                 ASSERT3U(ds->ds_prev->ds_object, ==,
 198                     ds->ds_phys->ds_prev_snap_obj);
 199                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
 200                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
 201                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
 202                     ds->ds_object && bp->blk_birth >
 203                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
 204                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 205                         mutex_enter(&ds->ds_prev->ds_lock);
 206                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
 207                         mutex_exit(&ds->ds_prev->ds_lock);
 208                 }
 209                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
 210                         dsl_dir_transfer_space(ds->ds_dir, used,
 211                             DD_USED_HEAD, DD_USED_SNAP, tx);
 212                 }
 213         }
 214         mutex_enter(&ds->ds_lock);
 215         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
 216         ds->ds_phys->ds_referenced_bytes -= used;
 217         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
 218         ds->ds_phys->ds_compressed_bytes -= compressed;
 219         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
 220         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
 221         mutex_exit(&ds->ds_lock);
 222 
 223         return (used);
 224 }
 225 
 226 uint64_t
 227 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
 228 {
 229         uint64_t trysnap = 0;
 230 
 231         if (ds == NULL)
 232                 return (0);
 233         /*
 234          * The snapshot creation could fail, but that would cause an
 235          * incorrect FALSE return, which would only result in an
 236          * overestimation of the amount of space that an operation would
 237          * consume, which is OK.
 238          *
 239          * There's also a small window where we could miss a pending
 240          * snapshot, because we could set the sync task in the quiescing
 241          * phase.  So this should only be used as a guess.
 242          */
 243         if (ds->ds_trysnap_txg >
 244             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
 245                 trysnap = ds->ds_trysnap_txg;
 246         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 247 }
 248 
 249 boolean_t
 250 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
 251     uint64_t blk_birth)
 252 {
 253         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
 254                 return (B_FALSE);
 255 
 256         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
 257 
 258         return (B_TRUE);
 259 }
 260 
 261 /* ARGSUSED */
 262 static void
 263 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
 264 {
 265         dsl_dataset_t *ds = dsv;
 266 
 267         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 268 
 269         unique_remove(ds->ds_fsid_guid);
 270 
 271         if (ds->ds_objset != NULL)
 272                 dmu_objset_evict(ds->ds_objset);
 273 
 274         if (ds->ds_prev) {
 275                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 276                 ds->ds_prev = NULL;
 277         }
 278 
 279         bplist_destroy(&ds->ds_pending_deadlist);
 280         if (db != NULL) {
 281                 dsl_deadlist_close(&ds->ds_deadlist);
 282         } else {
 283                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
 284                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
 285         }
 286         if (ds->ds_dir)
 287                 dsl_dir_close(ds->ds_dir, ds);
 288 
 289         ASSERT(!list_link_active(&ds->ds_synced_link));
 290 
 291         mutex_destroy(&ds->ds_lock);
 292         mutex_destroy(&ds->ds_opening_lock);
 293         rw_destroy(&ds->ds_rwlock);
 294         cv_destroy(&ds->ds_exclusive_cv);
 295 
 296         kmem_free(ds, sizeof (dsl_dataset_t));
 297 }
 298 
 299 static int
 300 dsl_dataset_get_snapname(dsl_dataset_t *ds)
 301 {
 302         dsl_dataset_phys_t *headphys;
 303         int err;
 304         dmu_buf_t *headdbuf;
 305         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 306         objset_t *mos = dp->dp_meta_objset;
 307 
 308         if (ds->ds_snapname[0])
 309                 return (0);
 310         if (ds->ds_phys->ds_next_snap_obj == 0)
 311                 return (0);
 312 
 313         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
 314             FTAG, &headdbuf);
 315         if (err)
 316                 return (err);
 317         headphys = headdbuf->db_data;
 318         err = zap_value_search(dp->dp_meta_objset,
 319             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
 320         dmu_buf_rele(headdbuf, FTAG);
 321         return (err);
 322 }
 323 
 324 static int
 325 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 326 {
 327         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 328         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 329         matchtype_t mt;
 330         int err;
 331 
 332         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 333                 mt = MT_FIRST;
 334         else
 335                 mt = MT_EXACT;
 336 
 337         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 338             value, mt, NULL, 0, NULL);
 339         if (err == ENOTSUP && mt == MT_FIRST)
 340                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
 341         return (err);
 342 }
 343 
 344 static int
 345 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 346 {
 347         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 348         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 349         matchtype_t mt;
 350         int err;
 351 
 352         dsl_dir_snap_cmtime_update(ds->ds_dir);
 353 
 354         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 355                 mt = MT_FIRST;
 356         else
 357                 mt = MT_EXACT;
 358 
 359         err = zap_remove_norm(mos, snapobj, name, mt, tx);
 360         if (err == ENOTSUP && mt == MT_FIRST)
 361                 err = zap_remove(mos, snapobj, name, tx);
 362         return (err);
 363 }
 364 
 365 static int
 366 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 367     dsl_dataset_t **dsp)
 368 {
 369         objset_t *mos = dp->dp_meta_objset;
 370         dmu_buf_t *dbuf;
 371         dsl_dataset_t *ds;
 372         int err;
 373         dmu_object_info_t doi;
 374 
 375         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
 376             dsl_pool_sync_context(dp));
 377 
 378         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
 379         if (err)
 380                 return (err);
 381 
 382         /* Make sure dsobj has the correct object type. */
 383         dmu_object_info_from_db(dbuf, &doi);
 384         if (doi.doi_type != DMU_OT_DSL_DATASET)
 385                 return (EINVAL);
 386 
 387         ds = dmu_buf_get_user(dbuf);
 388         if (ds == NULL) {
 389                 dsl_dataset_t *winner;
 390 
 391                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
 392                 ds->ds_dbuf = dbuf;
 393                 ds->ds_object = dsobj;
 394                 ds->ds_phys = dbuf->db_data;
 395 
 396                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
 397                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 398                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
 399 
 400                 rw_init(&ds->ds_rwlock, 0, 0, 0);
 401                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 402 
 403                 bplist_create(&ds->ds_pending_deadlist);
 404                 dsl_deadlist_open(&ds->ds_deadlist,
 405                     mos, ds->ds_phys->ds_deadlist_obj);
 406 
 407                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
 408                     offsetof(dmu_sendarg_t, dsa_link));
 409 
 410                 if (err == 0) {
 411                         err = dsl_dir_open_obj(dp,
 412                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
 413                 }
 414                 if (err) {
 415                         mutex_destroy(&ds->ds_lock);
 416                         mutex_destroy(&ds->ds_opening_lock);
 417                         rw_destroy(&ds->ds_rwlock);
 418                         cv_destroy(&ds->ds_exclusive_cv);
 419                         bplist_destroy(&ds->ds_pending_deadlist);
 420                         dsl_deadlist_close(&ds->ds_deadlist);
 421                         kmem_free(ds, sizeof (dsl_dataset_t));
 422                         dmu_buf_rele(dbuf, tag);
 423                         return (err);
 424                 }
 425 
 426                 if (!dsl_dataset_is_snapshot(ds)) {
 427                         ds->ds_snapname[0] = '\0';
 428                         if (ds->ds_phys->ds_prev_snap_obj) {
 429                                 err = dsl_dataset_get_ref(dp,
 430                                     ds->ds_phys->ds_prev_snap_obj,
 431                                     ds, &ds->ds_prev);
 432                         }
 433                 } else {
 434                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
 435                                 err = dsl_dataset_get_snapname(ds);
 436                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
 437                                 err = zap_count(
 438                                     ds->ds_dir->dd_pool->dp_meta_objset,
 439                                     ds->ds_phys->ds_userrefs_obj,
 440                                     &ds->ds_userrefs);
 441                         }
 442                 }
 443 
 444                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
 445                         /*
 446                          * In sync context, we're called with either no lock
 447                          * or with the write lock.  If we're not syncing,
 448                          * we're always called with the read lock held.
 449                          */
 450                         boolean_t need_lock =
 451                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
 452                             dsl_pool_sync_context(dp);
 453 
 454                         if (need_lock)
 455                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 456 
 457                         err = dsl_prop_get_ds(ds,
 458                             "refreservation", sizeof (uint64_t), 1,
 459                             &ds->ds_reserved, NULL);
 460                         if (err == 0) {
 461                                 err = dsl_prop_get_ds(ds,
 462                                     "refquota", sizeof (uint64_t), 1,
 463                                     &ds->ds_quota, NULL);
 464                         }
 465 
 466                         if (need_lock)
 467                                 rw_exit(&dp->dp_config_rwlock);
 468                 } else {
 469                         ds->ds_reserved = ds->ds_quota = 0;
 470                 }
 471 
 472                 if (err == 0) {
 473                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
 474                             dsl_dataset_evict);
 475                 }
 476                 if (err || winner) {
 477                         bplist_destroy(&ds->ds_pending_deadlist);
 478                         dsl_deadlist_close(&ds->ds_deadlist);
 479                         if (ds->ds_prev)
 480                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 481                         dsl_dir_close(ds->ds_dir, ds);
 482                         mutex_destroy(&ds->ds_lock);
 483                         mutex_destroy(&ds->ds_opening_lock);
 484                         rw_destroy(&ds->ds_rwlock);
 485                         cv_destroy(&ds->ds_exclusive_cv);
 486                         kmem_free(ds, sizeof (dsl_dataset_t));
 487                         if (err) {
 488                                 dmu_buf_rele(dbuf, tag);
 489                                 return (err);
 490                         }
 491                         ds = winner;
 492                 } else {
 493                         ds->ds_fsid_guid =
 494                             unique_insert(ds->ds_phys->ds_fsid_guid);
 495                 }
 496         }
 497         ASSERT3P(ds->ds_dbuf, ==, dbuf);
 498         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
 499         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
 500             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
 501             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
 502         mutex_enter(&ds->ds_lock);
 503         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 504                 mutex_exit(&ds->ds_lock);
 505                 dmu_buf_rele(ds->ds_dbuf, tag);
 506                 return (ENOENT);
 507         }
 508         mutex_exit(&ds->ds_lock);
 509         *dsp = ds;
 510         return (0);
 511 }
 512 
/*
 * Turn a bare reference on ds (as obtained from dsl_dataset_get_ref())
 * into a hold by acquiring ds_rwlock as READER.
 *
 * In syncing context no lock is taken and 0 is returned immediately.
 * Otherwise, while the rwlock cannot be entered, dp_config_rwlock is
 * dropped and the thread waits on ds_exclusive_cv.  If cv_wait_sig()
 * returns 0 the result is EINTR; if the dataset turned out to be
 * destroyed the result is ENOENT.  In both failure cases the reference
 * under "tag" is dropped and dp_config_rwlock is re-entered as READER
 * before returning.
 *
 * NOTE(review): the rw_exit() in the wait loop implies the caller enters
 * with dp_config_rwlock held as READER when not in syncing context --
 * confirm against callers.
 */
static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
        dsl_pool_t *dp = ds->ds_dir->dd_pool;

        /*
         * In syncing context we don't want the rwlock lock: there
         * may be an existing writer waiting for sync phase to
         * finish.  We don't need to worry about such writers, since
         * sync phase is single-threaded, so the writer can't be
         * doing anything while we are active.
         */
        if (dsl_pool_sync_context(dp)) {
                ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
                return (0);
        }

        /*
         * Normal users will hold the ds_rwlock as a READER until they
         * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
         * drop their READER lock after they set the ds_owner field.
         *
         * If the dataset is being destroyed, the destroy thread will
         * obtain a WRITER lock for exclusive access after it's done its
         * open-context work and then change the ds_owner to
         * dsl_reaper once destruction is assured.  So threads
         * may block here temporarily, until the "destructability" of
         * the dataset is determined.
         */
        ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
        mutex_enter(&ds->ds_lock);
        while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
                int rc;

                /* Don't sleep while holding the pool config lock. */
                rw_exit(&dp->dp_config_rwlock);
                rc = cv_wait_sig(&ds->ds_exclusive_cv, &ds->ds_lock);
                if (!rc || DSL_DATASET_IS_DESTROYED(ds)) {
                        mutex_exit(&ds->ds_lock);
                        dsl_dataset_drop_ref(ds, tag);
                        rw_enter(&dp->dp_config_rwlock, RW_READER);
                        /* rc == 0: interrupted; otherwise destroyed. */
                        return (rc ? ENOENT : EINTR);
                }
                /*
                 * The dp_config_rwlock lives above the ds_lock. And
                 * we need to check DSL_DATASET_IS_DESTROYED() while
                 * holding the ds_lock, so we have to drop and reacquire
                 * the ds_lock here.
                 */
                mutex_exit(&ds->ds_lock);
                rw_enter(&dp->dp_config_rwlock, RW_READER);
                mutex_enter(&ds->ds_lock);
        }
        mutex_exit(&ds->ds_lock);
        return (0);
}
 568 
 569 int
 570 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 571     dsl_dataset_t **dsp)
 572 {
 573         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
 574 
 575         if (err)
 576                 return (err);
 577         return (dsl_dataset_hold_ref(*dsp, tag));
 578 }
 579 
 580 int
 581 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
 582     void *tag, dsl_dataset_t **dsp)
 583 {
 584         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
 585         if (err)
 586                 return (err);
 587         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 588                 dsl_dataset_rele(*dsp, tag);
 589                 *dsp = NULL;
 590                 return (EBUSY);
 591         }
 592         return (0);
 593 }
 594 
 595 int
 596 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 597 {
 598         dsl_dir_t *dd;
 599         dsl_pool_t *dp;
 600         const char *snapname;
 601         uint64_t obj;
 602         int err = 0;
 603 
 604         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 605         if (err)
 606                 return (err);
 607 
 608         dp = dd->dd_pool;
 609         obj = dd->dd_phys->dd_head_dataset_obj;
 610         rw_enter(&dp->dp_config_rwlock, RW_READER);
 611         if (obj)
 612                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
 613         else
 614                 err = ENOENT;
 615         if (err)
 616                 goto out;
 617 
 618         err = dsl_dataset_hold_ref(*dsp, tag);
 619 
 620         /* we may be looking for a snapshot */
 621         if (err == 0 && snapname != NULL) {
 622                 dsl_dataset_t *ds = NULL;
 623 
 624                 if (*snapname++ != '@') {
 625                         dsl_dataset_rele(*dsp, tag);
 626                         err = ENOENT;
 627                         goto out;
 628                 }
 629 
 630                 dprintf("looking for snapshot '%s'\n", snapname);
 631                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
 632                 if (err == 0)
 633                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
 634                 dsl_dataset_rele(*dsp, tag);
 635 
 636                 ASSERT3U((err == 0), ==, (ds != NULL));
 637 
 638                 if (ds) {
 639                         mutex_enter(&ds->ds_lock);
 640                         if (ds->ds_snapname[0] == 0)
 641                                 (void) strlcpy(ds->ds_snapname, snapname,
 642                                     sizeof (ds->ds_snapname));
 643                         mutex_exit(&ds->ds_lock);
 644                         err = dsl_dataset_hold_ref(ds, tag);
 645                         *dsp = err ? NULL : ds;
 646                 }
 647         }
 648 out:
 649         rw_exit(&dp->dp_config_rwlock);
 650         dsl_dir_close(dd, FTAG);
 651         return (err);
 652 }
 653 
 654 int
 655 dsl_dataset_own(const char *name, boolean_t inconsistentok,
 656     void *tag, dsl_dataset_t **dsp)
 657 {
 658         int err = dsl_dataset_hold(name, tag, dsp);
 659         if (err)
 660                 return (err);
 661         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 662                 dsl_dataset_rele(*dsp, tag);
 663                 return (EBUSY);
 664         }
 665         return (0);
 666 }
 667 
 668 void
 669 dsl_dataset_name(dsl_dataset_t *ds, char *name)
 670 {
 671         if (ds == NULL) {
 672                 (void) strcpy(name, "mos");
 673         } else {
 674                 dsl_dir_name(ds->ds_dir, name);
 675                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 676                 if (ds->ds_snapname[0]) {
 677                         (void) strcat(name, "@");
 678                         /*
 679                          * We use a "recursive" mutex so that we
 680                          * can call dprintf_ds() with ds_lock held.
 681                          */
 682                         if (!MUTEX_HELD(&ds->ds_lock)) {
 683                                 mutex_enter(&ds->ds_lock);
 684                                 (void) strcat(name, ds->ds_snapname);
 685                                 mutex_exit(&ds->ds_lock);
 686                         } else {
 687                                 (void) strcat(name, ds->ds_snapname);
 688                         }
 689                 }
 690         }
 691 }
 692 
 693 static int
 694 dsl_dataset_namelen(dsl_dataset_t *ds)
 695 {
 696         int result;
 697 
 698         if (ds == NULL) {
 699                 result = 3;     /* "mos" */
 700         } else {
 701                 result = dsl_dir_namelen(ds->ds_dir);
 702                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 703                 if (ds->ds_snapname[0]) {
 704                         ++result;       /* adding one for the @-sign */
 705                         if (!MUTEX_HELD(&ds->ds_lock)) {
 706                                 mutex_enter(&ds->ds_lock);
 707                                 result += strlen(ds->ds_snapname);
 708                                 mutex_exit(&ds->ds_lock);
 709                         } else {
 710                                 result += strlen(ds->ds_snapname);
 711                         }
 712                 }
 713         }
 714 
 715         return (result);
 716 }
 717 
 718 void
 719 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 720 {
 721         dmu_buf_rele(ds->ds_dbuf, tag);
 722 }
 723 
 724 void
 725 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 726 {
 727         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
 728                 rw_exit(&ds->ds_rwlock);
 729         }
 730         dsl_dataset_drop_ref(ds, tag);
 731 }
 732 
 733 void
 734 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
 735 {
 736         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
 737             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
 738 
 739         mutex_enter(&ds->ds_lock);
 740         ds->ds_owner = NULL;
 741         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
 742                 rw_exit(&ds->ds_rwlock);
 743                 cv_broadcast(&ds->ds_exclusive_cv);
 744         }
 745         mutex_exit(&ds->ds_lock);
 746         if (ds->ds_dbuf)
 747                 dsl_dataset_drop_ref(ds, tag);
 748         else
 749                 dsl_dataset_evict(NULL, ds);
 750 }
 751 
 752 boolean_t
 753 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
 754 {
 755         boolean_t gotit = FALSE;
 756 
 757         mutex_enter(&ds->ds_lock);
 758         if (ds->ds_owner == NULL &&
 759             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
 760                 ds->ds_owner = tag;
 761                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
 762                         rw_exit(&ds->ds_rwlock);
 763                 gotit = TRUE;
 764         }
 765         mutex_exit(&ds->ds_lock);
 766         return (gotit);
 767 }
 768 
 769 void
 770 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
 771 {
 772         ASSERT3P(owner, ==, ds->ds_owner);
 773         if (!RW_WRITE_HELD(&ds->ds_rwlock))
 774                 rw_enter(&ds->ds_rwlock, RW_WRITER);
 775 }
 776 
 777 uint64_t
 778 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
 779     uint64_t flags, dmu_tx_t *tx)
 780 {
 781         dsl_pool_t *dp = dd->dd_pool;
 782         dmu_buf_t *dbuf;
 783         dsl_dataset_phys_t *dsphys;
 784         uint64_t dsobj;
 785         objset_t *mos = dp->dp_meta_objset;
 786 
 787         if (origin == NULL)
 788                 origin = dp->dp_origin_snap;
 789 
 790         ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
 791         ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
 792         ASSERT(dmu_tx_is_syncing(tx));
 793         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 794 
 795         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
 796             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
 797         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 798         dmu_buf_will_dirty(dbuf, tx);
 799         dsphys = dbuf->db_data;
 800         bzero(dsphys, sizeof (dsl_dataset_phys_t));
 801         dsphys->ds_dir_obj = dd->dd_object;
 802         dsphys->ds_flags = flags;
 803         dsphys->ds_fsid_guid = unique_create();
 804         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
 805             sizeof (dsphys->ds_guid));
 806         dsphys->ds_snapnames_zapobj =
 807             zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
 808             DMU_OT_NONE, 0, tx);
 809         dsphys->ds_creation_time = gethrestime_sec();
 810         dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
 811 
 812         if (origin == NULL) {
 813                 dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
 814         } else {
 815                 dsl_dataset_t *ohds;
 816 
 817                 dsphys->ds_prev_snap_obj = origin->ds_object;
 818                 dsphys->ds_prev_snap_txg =
 819                     origin->ds_phys->ds_creation_txg;
 820                 dsphys->ds_referenced_bytes =
 821                     origin->ds_phys->ds_referenced_bytes;
 822                 dsphys->ds_compressed_bytes =
 823                     origin->ds_phys->ds_compressed_bytes;
 824                 dsphys->ds_uncompressed_bytes =
 825                     origin->ds_phys->ds_uncompressed_bytes;
 826                 dsphys->ds_bp = origin->ds_phys->ds_bp;
 827                 dsphys->ds_flags |= origin->ds_phys->ds_flags;
 828 
 829                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
 830                 origin->ds_phys->ds_num_children++;
 831 
 832                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
 833                     origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
 834                 dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
 835                     dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
 836                 dsl_dataset_rele(ohds, FTAG);
 837 
 838                 if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
 839                         if (origin->ds_phys->ds_next_clones_obj == 0) {
 840                                 origin->ds_phys->ds_next_clones_obj =
 841                                     zap_create(mos,
 842                                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
 843                         }
 844                         VERIFY(0 == zap_add_int(mos,
 845                             origin->ds_phys->ds_next_clones_obj,
 846                             dsobj, tx));
 847                 }
 848 
 849                 dmu_buf_will_dirty(dd->dd_dbuf, tx);
 850                 dd->dd_phys->dd_origin_obj = origin->ds_object;
 851                 if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
 852                         if (origin->ds_dir->dd_phys->dd_clones == 0) {
 853                                 dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
 854                                 origin->ds_dir->dd_phys->dd_clones =
 855                                     zap_create(mos,
 856                                     DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
 857                         }
 858                         VERIFY3U(0, ==, zap_add_int(mos,
 859                             origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
 860                 }
 861         }
 862 
 863         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
 864                 dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
 865 
 866         dmu_buf_rele(dbuf, FTAG);
 867 
 868         dmu_buf_will_dirty(dd->dd_dbuf, tx);
 869         dd->dd_phys->dd_head_dataset_obj = dsobj;
 870 
 871         return (dsobj);
 872 }
 873 
 874 uint64_t
 875 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
 876     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
 877 {
 878         dsl_pool_t *dp = pdd->dd_pool;
 879         uint64_t dsobj, ddobj;
 880         dsl_dir_t *dd;
 881 
 882         ASSERT(lastname[0] != '@');
 883 
 884         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
 885         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
 886 
 887         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
 888 
 889         dsl_deleg_set_create_perms(dd, tx, cr);
 890 
 891         dsl_dir_close(dd, FTAG);
 892 
 893         /*
 894          * If we are creating a clone, make sure we zero out any stale
 895          * data from the origin snapshots zil header.
 896          */
 897         if (origin != NULL) {
 898                 dsl_dataset_t *ds;
 899                 objset_t *os;
 900 
 901                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 902                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
 903                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
 904                 dsl_dataset_dirty(ds, tx);
 905                 dsl_dataset_rele(ds, FTAG);
 906         }
 907 
 908         return (dsobj);
 909 }
 910 
 911 /*
 912  * The snapshots must all be in the same pool.
 913  */
 914 int
 915 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
 916 {
 917         int err;
 918         dsl_sync_task_t *dst;
 919         spa_t *spa;
 920         nvpair_t *pair;
 921         dsl_sync_task_group_t *dstg;
 922 
 923         pair = nvlist_next_nvpair(snaps, NULL);
 924         if (pair == NULL)
 925                 return (0);
 926 
 927         err = spa_open(nvpair_name(pair), &spa, FTAG);
 928         if (err)
 929                 return (err);
 930         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 931 
 932         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 933             pair = nvlist_next_nvpair(snaps, pair)) {
 934                 dsl_dataset_t *ds;
 935 
 936                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
 937                 if (err == 0) {
 938                         struct dsl_ds_destroyarg *dsda;
 939 
 940                         dsl_dataset_make_exclusive(ds, dstg);
 941                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
 942                             KM_SLEEP);
 943                         dsda->ds = ds;
 944                         dsda->defer = defer;
 945                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 946                             dsl_dataset_destroy_sync, dsda, dstg, 0);
 947                 } else if (err == ENOENT) {
 948                         err = 0;
 949                 } else {
 950                         (void) strcpy(failed, nvpair_name(pair));
 951                         break;
 952                 }
 953         }
 954 
 955         if (err == 0)
 956                 err = dsl_sync_task_group_wait(dstg);
 957 
 958         for (dst = list_head(&dstg->dstg_tasks); dst;
 959             dst = list_next(&dstg->dstg_tasks, dst)) {
 960                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
 961                 dsl_dataset_t *ds = dsda->ds;
 962 
 963                 /*
 964                  * Return the file system name that triggered the error
 965                  */
 966                 if (dst->dst_err) {
 967                         dsl_dataset_name(ds, failed);
 968                 }
 969                 ASSERT3P(dsda->rm_origin, ==, NULL);
 970                 dsl_dataset_disown(ds, dstg);
 971                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 972         }
 973 
 974         dsl_sync_task_group_destroy(dstg);
 975         spa_close(spa, FTAG);
 976         return (err);
 977 
 978 }
 979 
 980 static boolean_t
 981 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
 982 {
 983         boolean_t might_destroy = B_FALSE;
 984 
 985         mutex_enter(&ds->ds_lock);
 986         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
 987             DS_IS_DEFER_DESTROY(ds))
 988                 might_destroy = B_TRUE;
 989         mutex_exit(&ds->ds_lock);
 990 
 991         return (might_destroy);
 992 }
 993 
 994 /*
 995  * If we're removing a clone, and these three conditions are true:
 996  *      1) the clone's origin has no other children
 997  *      2) the clone's origin has no user references
 998  *      3) the clone's origin has been marked for deferred destruction
 999  * Then, prepare to remove the origin as part of this sync task group.
1000  */
1001 static int
1002 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1003 {
1004         dsl_dataset_t *ds = dsda->ds;
1005         dsl_dataset_t *origin = ds->ds_prev;
1006 
1007         if (dsl_dataset_might_destroy_origin(origin)) {
1008                 char *name;
1009                 int namelen;
1010                 int error;
1011 
1012                 namelen = dsl_dataset_namelen(origin) + 1;
1013                 name = kmem_alloc(namelen, KM_SLEEP);
1014                 dsl_dataset_name(origin, name);
1015 #ifdef _KERNEL
1016                 error = zfs_unmount_snap(name, NULL);
1017                 if (error) {
1018                         kmem_free(name, namelen);
1019                         return (error);
1020                 }
1021 #endif
1022                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1023                 kmem_free(name, namelen);
1024                 if (error)
1025                         return (error);
1026                 dsda->rm_origin = origin;
1027                 dsl_dataset_make_exclusive(origin, tag);
1028         }
1029 
1030         return (0);
1031 }
1032 
1033 /*
1034  * ds must be opened as OWNER.  On return (whether successful or not),
1035  * ds will be closed and caller can no longer dereference it.
1036  */
1037 int
1038 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
1039 {
1040         int err;
1041         dsl_sync_task_group_t *dstg;
1042         objset_t *os;
1043         dsl_dir_t *dd;
1044         uint64_t obj;
1045         struct dsl_ds_destroyarg dsda = { 0 };
1046         dsl_dataset_t dummy_ds = { 0 };
1047 
1048         dsda.ds = ds;
1049 
1050         if (dsl_dataset_is_snapshot(ds)) {
1051                 /* Destroying a snapshot is simpler */
1052                 dsl_dataset_make_exclusive(ds, tag);
1053 
1054                 dsda.defer = defer;
1055                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1056                     dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
1057                     &dsda, tag, 0);
1058                 ASSERT3P(dsda.rm_origin, ==, NULL);
1059                 goto out;
1060         } else if (defer) {
1061                 err = EINVAL;
1062                 goto out;
1063         }
1064 
1065         dd = ds->ds_dir;
1066         dummy_ds.ds_dir = dd;
1067         dummy_ds.ds_object = ds->ds_object;
1068 
1069         /*
1070          * Check for errors and mark this ds as inconsistent, in
1071          * case we crash while freeing the objects.
1072          */
1073         err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
1074             dsl_dataset_destroy_begin_sync, ds, NULL, 0);
1075         if (err)
1076                 goto out;
1077 
1078         err = dmu_objset_from_ds(ds, &os);
1079         if (err)
1080                 goto out;
1081 
1082         /*
1083          * If async destruction is not enabled try to remove all objects
1084          * while in the open context so that there is less work to do in
1085          * the syncing context.
1086          */
1087         if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
1088             &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
1089                 for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
1090                     ds->ds_phys->ds_prev_snap_txg)) {
1091                         /*
1092                          * Ignore errors, if there is not enough disk space
1093                          * we will deal with it in dsl_dataset_destroy_sync().
1094                          */
1095                         (void) dmu_free_object(os, obj);
1096                 }
1097                 if (err != ESRCH)
1098                         goto out;
1099         }
1100 
1101         /*
1102          * Only the ZIL knows how to free log blocks.
1103          */
1104         zil_destroy(dmu_objset_zil(os), B_FALSE);
1105 
1106         /*
1107          * Sync out all in-flight IO.
1108          */
1109         txg_wait_synced(dd->dd_pool, 0);
1110 
1111         /*
1112          * If we managed to free all the objects in open
1113          * context, the user space accounting should be zero.
1114          */
1115         if (ds->ds_phys->ds_bp.blk_fill == 0 &&
1116             dmu_objset_userused_enabled(os)) {
1117                 uint64_t count;
1118 
1119                 ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
1120                     count == 0);
1121                 ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
1122                     count == 0);
1123         }
1124 
1125         rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
1126         err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
1127         rw_exit(&dd->dd_pool->dp_config_rwlock);
1128 
1129         if (err)
1130                 goto out;
1131 
1132         /*
1133          * Blow away the dsl_dir + head dataset.
1134          */
1135         dsl_dataset_make_exclusive(ds, tag);
1136         /*
1137          * If we're removing a clone, we might also need to remove its
1138          * origin.
1139          */
1140         do {
1141                 dsda.need_prep = B_FALSE;
1142                 if (dsl_dir_is_clone(dd)) {
1143                         err = dsl_dataset_origin_rm_prep(&dsda, tag);
1144                         if (err) {
1145                                 dsl_dir_close(dd, FTAG);
1146                                 goto out;
1147                         }
1148                 }
1149 
1150                 dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
1151                 dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
1152                     dsl_dataset_destroy_sync, &dsda, tag, 0);
1153                 dsl_sync_task_create(dstg, dsl_dir_destroy_check,
1154                     dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
1155                 err = dsl_sync_task_group_wait(dstg);
1156                 dsl_sync_task_group_destroy(dstg);
1157 
1158                 /*
1159                  * We could be racing against 'zfs release' or 'zfs destroy -d'
1160                  * on the origin snap, in which case we can get EBUSY if we
1161                  * needed to destroy the origin snap but were not ready to
1162                  * do so.
1163                  */
1164                 if (dsda.need_prep) {
1165                         ASSERT(err == EBUSY);
1166                         ASSERT(dsl_dir_is_clone(dd));
1167                         ASSERT(dsda.rm_origin == NULL);
1168                 }
1169         } while (dsda.need_prep);
1170 
1171         if (dsda.rm_origin != NULL)
1172                 dsl_dataset_disown(dsda.rm_origin, tag);
1173 
1174         /* if it is successful, dsl_dir_destroy_sync will close the dd */
1175         if (err)
1176                 dsl_dir_close(dd, FTAG);
1177 out:
1178         dsl_dataset_disown(ds, tag);
1179         return (err);
1180 }
1181 
1182 blkptr_t *
1183 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1184 {
1185         return (&ds->ds_phys->ds_bp);
1186 }
1187 
1188 void
1189 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1190 {
1191         ASSERT(dmu_tx_is_syncing(tx));
1192         /* If it's the meta-objset, set dp_meta_rootbp */
1193         if (ds == NULL) {
1194                 tx->tx_pool->dp_meta_rootbp = *bp;
1195         } else {
1196                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1197                 ds->ds_phys->ds_bp = *bp;
1198         }
1199 }
1200 
1201 spa_t *
1202 dsl_dataset_get_spa(dsl_dataset_t *ds)
1203 {
1204         return (ds->ds_dir->dd_pool->dp_spa);
1205 }
1206 
1207 void
1208 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1209 {
1210         dsl_pool_t *dp;
1211 
1212         if (ds == NULL) /* this is the meta-objset */
1213                 return;
1214 
1215         ASSERT(ds->ds_objset != NULL);
1216 
1217         if (ds->ds_phys->ds_next_snap_obj != 0)
1218                 panic("dirtying snapshot!");
1219 
1220         dp = ds->ds_dir->dd_pool;
1221 
1222         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1223                 /* up the hold count until we can be written out */
1224                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1225         }
1226 }
1227 
1228 /*
1229  * The unique space in the head dataset can be calculated by subtracting
1230  * the space used in the most recent snapshot, that is still being used
1231  * in this file system, from the space currently in use.  To figure out
1232  * the space in the most recent snapshot still in use, we need to take
1233  * the total space used in the snapshot and subtract out the space that
1234  * has been freed up since the snapshot was taken.
1235  */
1236 static void
1237 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1238 {
1239         uint64_t mrs_used;
1240         uint64_t dlused, dlcomp, dluncomp;
1241 
1242         ASSERT(!dsl_dataset_is_snapshot(ds));
1243 
1244         if (ds->ds_phys->ds_prev_snap_obj != 0)
1245                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1246         else
1247                 mrs_used = 0;
1248 
1249         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1250 
1251         ASSERT3U(dlused, <=, mrs_used);
1252         ds->ds_phys->ds_unique_bytes =
1253             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1254 
1255         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1256             SPA_VERSION_UNIQUE_ACCURATE)
1257                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1258 }
1259 
1260 struct killarg {
1261         dsl_dataset_t *ds;
1262         dmu_tx_t *tx;
1263 };
1264 
1265 /* ARGSUSED */
1266 static int
1267 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1268     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1269 {
1270         struct killarg *ka = arg;
1271         dmu_tx_t *tx = ka->tx;
1272 
1273         if (bp == NULL)
1274                 return (0);
1275 
1276         if (zb->zb_level == ZB_ZIL_LEVEL) {
1277                 ASSERT(zilog != NULL);
1278                 /*
1279                  * It's a block in the intent log.  It has no
1280                  * accounting, so just free it.
1281                  */
1282                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1283         } else {
1284                 ASSERT(zilog == NULL);
1285                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1286                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1287         }
1288 
1289         return (0);
1290 }
1291 
1292 /* ARGSUSED */
1293 static int
1294 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1295 {
1296         dsl_dataset_t *ds = arg1;
1297         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1298         uint64_t count;
1299         int err;
1300 
1301         /*
1302          * Can't delete a head dataset if there are snapshots of it.
1303          * (Except if the only snapshots are from the branch we cloned
1304          * from.)
1305          */
1306         if (ds->ds_prev != NULL &&
1307             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1308                 return (EBUSY);
1309 
1310         /*
1311          * This is really a dsl_dir thing, but check it here so that
1312          * we'll be less likely to leave this dataset inconsistent &
1313          * nearly destroyed.
1314          */
1315         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1316         if (err)
1317                 return (err);
1318         if (count != 0)
1319                 return (EEXIST);
1320 
1321         return (0);
1322 }
1323 
1324 /* ARGSUSED */
1325 static void
1326 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1327 {
1328         dsl_dataset_t *ds = arg1;
1329         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1330 
1331         /* Mark it as inconsistent on-disk, in case we crash */
1332         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1333         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1334 
1335         spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
1336             "dataset = %llu", ds->ds_object);
1337 }
1338 
1339 static int
1340 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1341     dmu_tx_t *tx)
1342 {
1343         dsl_dataset_t *ds = dsda->ds;
1344         dsl_dataset_t *ds_prev = ds->ds_prev;
1345 
1346         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1347                 struct dsl_ds_destroyarg ndsda = {0};
1348 
1349                 /*
1350                  * If we're not prepared to remove the origin, don't remove
1351                  * the clone either.
1352                  */
1353                 if (dsda->rm_origin == NULL) {
1354                         dsda->need_prep = B_TRUE;
1355                         return (EBUSY);
1356                 }
1357 
1358                 ndsda.ds = ds_prev;
1359                 ndsda.is_origin_rm = B_TRUE;
1360                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1361         }
1362 
1363         /*
1364          * If we're not going to remove the origin after all,
1365          * undo the open context setup.
1366          */
1367         if (dsda->rm_origin != NULL) {
1368                 dsl_dataset_disown(dsda->rm_origin, tag);
1369                 dsda->rm_origin = NULL;
1370         }
1371 
1372         return (0);
1373 }
1374 
1375 /*
1376  * If you add new checks here, you may need to add
1377  * additional checks to the "temporary" case in
1378  * snapshot_check() in dmu_objset.c.
1379  */
1380 /* ARGSUSED */
1381 int
1382 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1383 {
1384         struct dsl_ds_destroyarg *dsda = arg1;
1385         dsl_dataset_t *ds = dsda->ds;
1386 
1387         /* we have an owner hold, so noone else can destroy us */
1388         ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1389 
1390         /*
1391          * Only allow deferred destroy on pools that support it.
1392          * NOTE: deferred destroy is only supported on snapshots.
1393          */
1394         if (dsda->defer) {
1395                 if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1396                     SPA_VERSION_USERREFS)
1397                         return (ENOTSUP);
1398                 ASSERT(dsl_dataset_is_snapshot(ds));
1399                 return (0);
1400         }
1401 
1402         /*
1403          * Can't delete a head dataset if there are snapshots of it.
1404          * (Except if the only snapshots are from the branch we cloned
1405          * from.)
1406          */
1407         if (ds->ds_prev != NULL &&
1408             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1409                 return (EBUSY);
1410 
1411         /*
1412          * If we made changes this txg, traverse_dsl_dataset won't find
1413          * them.  Try again.
1414          */
1415         if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1416                 return (EAGAIN);
1417 
1418         if (dsl_dataset_is_snapshot(ds)) {
1419                 /*
1420                  * If this snapshot has an elevated user reference count,
1421                  * we can't destroy it yet.
1422                  */
1423                 if (ds->ds_userrefs > 0 && !dsda->releasing)
1424                         return (EBUSY);
1425 
1426                 mutex_enter(&ds->ds_lock);
1427                 /*
1428                  * Can't delete a branch point. However, if we're destroying
1429                  * a clone and removing its origin due to it having a user
1430                  * hold count of 0 and having been marked for deferred destroy,
1431                  * it's OK for the origin to have a single clone.
1432                  */
1433                 if (ds->ds_phys->ds_num_children >
1434                     (dsda->is_origin_rm ? 2 : 1)) {
1435                         mutex_exit(&ds->ds_lock);
1436                         return (EEXIST);
1437                 }
1438                 mutex_exit(&ds->ds_lock);
1439         } else if (dsl_dir_is_clone(ds->ds_dir)) {
1440                 return (dsl_dataset_origin_check(dsda, arg2, tx));
1441         }
1442 
1443         /* XXX we should do some i/o error checking... */
1444         return (0);
1445 }
1446 
1447 struct refsarg {
1448         kmutex_t lock;
1449         boolean_t gone;
1450         kcondvar_t cv;
1451 };
1452 
1453 /* ARGSUSED */
1454 static void
1455 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1456 {
1457         struct refsarg *arg = argv;
1458 
1459         mutex_enter(&arg->lock);
1460         arg->gone = TRUE;
1461         cv_signal(&arg->cv);
1462         mutex_exit(&arg->lock);
1463 }
1464 
1465 static void
1466 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1467 {
1468         struct refsarg arg;
1469 
1470         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1471         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1472         arg.gone = FALSE;
1473         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1474             dsl_dataset_refs_gone);
1475         dmu_buf_rele(ds->ds_dbuf, tag);
1476         mutex_enter(&arg.lock);
1477         while (!arg.gone)
1478                 cv_wait(&arg.cv, &arg.lock);
1479         ASSERT(arg.gone);
1480         mutex_exit(&arg.lock);
1481         ds->ds_dbuf = NULL;
1482         ds->ds_phys = NULL;
1483         mutex_destroy(&arg.lock);
1484         cv_destroy(&arg.cv);
1485 }
1486 
1487 static void
1488 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1489 {
1490         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1491         uint64_t count;
1492         int err;
1493 
1494         ASSERT(ds->ds_phys->ds_num_children >= 2);
1495         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1496         /*
1497          * The err should not be ENOENT, but a bug in a previous version
1498          * of the code could cause upgrade_clones_cb() to not set
1499          * ds_next_snap_obj when it should, leading to a missing entry.
1500          * If we knew that the pool was created after
1501          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1502          * ENOENT.  However, at least we can check that we don't have
1503          * too many entries in the next_clones_obj even after failing to
1504          * remove this one.
1505          */
1506         if (err != ENOENT) {
1507                 VERIFY3U(err, ==, 0);
1508         }
1509         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1510             &count));
1511         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1512 }
1513 
1514 static void
1515 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1516 {
1517         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1518         zap_cursor_t zc;
1519         zap_attribute_t za;
1520 
1521         /*
1522          * If it is the old version, dd_clones doesn't exist so we can't
1523          * find the clones, but deadlist_remove_key() is a no-op so it
1524          * doesn't matter.
1525          */
1526         if (ds->ds_dir->dd_phys->dd_clones == 0)
1527                 return;
1528 
1529         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1530             zap_cursor_retrieve(&zc, &za) == 0;
1531             zap_cursor_advance(&zc)) {
1532                 dsl_dataset_t *clone;
1533 
1534                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1535                     za.za_first_integer, FTAG, &clone));
1536                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1537                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1538                             mintxg, tx);
1539                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1540                 }
1541                 dsl_dataset_rele(clone, FTAG);
1542         }
1543         zap_cursor_fini(&zc);
1544 }
1545 
1546 struct process_old_arg {
1547         dsl_dataset_t *ds;
1548         dsl_dataset_t *ds_prev;
1549         boolean_t after_branch_point;
1550         zio_t *pio;
1551         uint64_t used, comp, uncomp;
1552 };
1553 
1554 static int
1555 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1556 {
1557         struct process_old_arg *poa = arg;
1558         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1559 
1560         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1561                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1562                 if (poa->ds_prev && !poa->after_branch_point &&
1563                     bp->blk_birth >
1564                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1565                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1566                             bp_get_dsize_sync(dp->dp_spa, bp);
1567                 }
1568         } else {
1569                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1570                 poa->comp += BP_GET_PSIZE(bp);
1571                 poa->uncomp += BP_GET_UCSIZE(bp);
1572                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1573         }
1574         return (0);
1575 }
1576 
1577 static void
1578 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1579     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1580 {
1581         struct process_old_arg poa = { 0 };
1582         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1583         objset_t *mos = dp->dp_meta_objset;
1584 
1585         ASSERT(ds->ds_deadlist.dl_oldfmt);
1586         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1587 
1588         poa.ds = ds;
1589         poa.ds_prev = ds_prev;
1590         poa.after_branch_point = after_branch_point;
1591         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1592         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1593             process_old_cb, &poa, tx));
1594         VERIFY3U(zio_wait(poa.pio), ==, 0);
1595         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1596 
1597         /* change snapused */
1598         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1599             -poa.used, -poa.comp, -poa.uncomp, tx);
1600 
1601         /* swap next's deadlist to our deadlist */
1602         dsl_deadlist_close(&ds->ds_deadlist);
1603         dsl_deadlist_close(&ds_next->ds_deadlist);
1604         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1605             ds->ds_phys->ds_deadlist_obj);
1606         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1607         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1608             ds_next->ds_phys->ds_deadlist_obj);
1609 }
1610 
1611 static int
1612 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1613 {
1614         int err;
1615         struct killarg ka;
1616 
1617         /*
1618          * Free everything that we point to (that's born after
1619          * the previous snapshot, if we are a clone)
1620          *
1621          * NB: this should be very quick, because we already
1622          * freed all the objects in open context.
1623          */
1624         ka.ds = ds;
1625         ka.tx = tx;
1626         err = traverse_dataset(ds,
1627             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1628             kill_blkptr, &ka);
1629         ASSERT3U(err, ==, 0);
1630         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1631 
1632         return (err);
1633 }
1634 
1635 void
1636 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1637 {
1638         struct dsl_ds_destroyarg *dsda = arg1;
1639         dsl_dataset_t *ds = dsda->ds;
1640         int err;
1641         int after_branch_point = FALSE;
1642         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1643         objset_t *mos = dp->dp_meta_objset;
1644         dsl_dataset_t *ds_prev = NULL;
1645         boolean_t wont_destroy;
1646         uint64_t obj;
1647 
1648         wont_destroy = (dsda->defer &&
1649             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1650 
1651         ASSERT(ds->ds_owner || wont_destroy);
1652         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1653         ASSERT(ds->ds_prev == NULL ||
1654             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1655         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1656 
1657         if (wont_destroy) {
1658                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1659                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1660                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1661                 return;
1662         }
1663 
1664         /* signal any waiters that this dataset is going away */
1665         mutex_enter(&ds->ds_lock);
1666         ds->ds_owner = dsl_reaper;
1667         cv_broadcast(&ds->ds_exclusive_cv);
1668         mutex_exit(&ds->ds_lock);
1669 
1670         /* Remove our reservation */
1671         if (ds->ds_reserved != 0) {
1672                 dsl_prop_setarg_t psa;
1673                 uint64_t value = 0;
1674 
1675                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1676                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1677                     &value);
1678                 psa.psa_effective_value = 0;    /* predict default value */
1679 
1680                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1681                 ASSERT3U(ds->ds_reserved, ==, 0);
1682         }
1683 
1684         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1685 
1686         dsl_scan_ds_destroyed(ds, tx);
1687 
1688         obj = ds->ds_object;
1689 
1690         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1691                 if (ds->ds_prev) {
1692                         ds_prev = ds->ds_prev;
1693                 } else {
1694                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1695                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1696                 }
1697                 after_branch_point =
1698                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1699 
1700                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1701                 if (after_branch_point &&
1702                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
1703                         remove_from_next_clones(ds_prev, obj, tx);
1704                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1705                                 VERIFY(0 == zap_add_int(mos,
1706                                     ds_prev->ds_phys->ds_next_clones_obj,
1707                                     ds->ds_phys->ds_next_snap_obj, tx));
1708                         }
1709                 }
1710                 if (after_branch_point &&
1711                     ds->ds_phys->ds_next_snap_obj == 0) {
1712                         /* This clone is toast. */
1713                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1714                         ds_prev->ds_phys->ds_num_children--;
1715 
1716                         /*
1717                          * If the clone's origin has no other clones, no
1718                          * user holds, and has been marked for deferred
1719                          * deletion, then we should have done the necessary
1720                          * destroy setup for it.
1721                          */
1722                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1723                             ds_prev->ds_userrefs == 0 &&
1724                             DS_IS_DEFER_DESTROY(ds_prev)) {
1725                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1726                         } else {
1727                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1728                         }
1729                 } else if (!after_branch_point) {
1730                         ds_prev->ds_phys->ds_next_snap_obj =
1731                             ds->ds_phys->ds_next_snap_obj;
1732                 }
1733         }
1734 
1735         if (dsl_dataset_is_snapshot(ds)) {
1736                 dsl_dataset_t *ds_next;
1737                 uint64_t old_unique;
1738                 uint64_t used = 0, comp = 0, uncomp = 0;
1739 
1740                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1741                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1742                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1743 
1744                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1745 
1746                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1747                 ds_next->ds_phys->ds_prev_snap_obj =
1748                     ds->ds_phys->ds_prev_snap_obj;
1749                 ds_next->ds_phys->ds_prev_snap_txg =
1750                     ds->ds_phys->ds_prev_snap_txg;
1751                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1752                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1753 
1754 
1755                 if (ds_next->ds_deadlist.dl_oldfmt) {
1756                         process_old_deadlist(ds, ds_prev, ds_next,
1757                             after_branch_point, tx);
1758                 } else {
1759                         /* Adjust prev's unique space. */
1760                         if (ds_prev && !after_branch_point) {
1761                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1762                                     ds_prev->ds_phys->ds_prev_snap_txg,
1763                                     ds->ds_phys->ds_prev_snap_txg,
1764                                     &used, &comp, &uncomp);
1765                                 ds_prev->ds_phys->ds_unique_bytes += used;
1766                         }
1767 
1768                         /* Adjust snapused. */
1769                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1770                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1771                             &used, &comp, &uncomp);
1772                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1773                             -used, -comp, -uncomp, tx);
1774 
1775                         /* Move blocks to be freed to pool's free list. */
1776                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1777                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1778                             tx);
1779                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1780                             DD_USED_HEAD, used, comp, uncomp, tx);
1781 
1782                         /* Merge our deadlist into next's and free it. */
1783                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1784                             ds->ds_phys->ds_deadlist_obj, tx);
1785                 }
1786                 dsl_deadlist_close(&ds->ds_deadlist);
1787                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1788 
1789                 /* Collapse range in clone heads */
1790                 dsl_dataset_remove_clones_key(ds,
1791                     ds->ds_phys->ds_creation_txg, tx);
1792 
1793                 if (dsl_dataset_is_snapshot(ds_next)) {
1794                         dsl_dataset_t *ds_nextnext;
1795 
1796                         /*
1797                          * Update next's unique to include blocks which
1798                          * were previously shared by only this snapshot
1799                          * and it.  Those blocks will be born after the
1800                          * prev snap and before this snap, and will have
1801                          * died after the next snap and before the one
1802                          * after that (ie. be on the snap after next's
1803                          * deadlist).
1804                          */
1805                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1806                             ds_next->ds_phys->ds_next_snap_obj,
1807                             FTAG, &ds_nextnext));
1808                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1809                             ds->ds_phys->ds_prev_snap_txg,
1810                             ds->ds_phys->ds_creation_txg,
1811                             &used, &comp, &uncomp);
1812                         ds_next->ds_phys->ds_unique_bytes += used;
1813                         dsl_dataset_rele(ds_nextnext, FTAG);
1814                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1815 
1816                         /* Collapse range in this head. */
1817                         dsl_dataset_t *hds;
1818                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1819                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1820                             FTAG, &hds));
1821                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1822                             ds->ds_phys->ds_creation_txg, tx);
1823                         dsl_dataset_rele(hds, FTAG);
1824 
1825                 } else {
1826                         ASSERT3P(ds_next->ds_prev, ==, ds);
1827                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1828                         ds_next->ds_prev = NULL;
1829                         if (ds_prev) {
1830                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1831                                     ds->ds_phys->ds_prev_snap_obj,
1832                                     ds_next, &ds_next->ds_prev));
1833                         }
1834 
1835                         dsl_dataset_recalc_head_uniq(ds_next);
1836 
1837                         /*
1838                          * Reduce the amount of our unconsmed refreservation
1839                          * being charged to our parent by the amount of
1840                          * new unique data we have gained.
1841                          */
1842                         if (old_unique < ds_next->ds_reserved) {
1843                                 int64_t mrsdelta;
1844                                 uint64_t new_unique =
1845                                     ds_next->ds_phys->ds_unique_bytes;
1846 
1847                                 ASSERT(old_unique <= new_unique);
1848                                 mrsdelta = MIN(new_unique - old_unique,
1849                                     ds_next->ds_reserved - old_unique);
1850                                 dsl_dir_diduse_space(ds->ds_dir,
1851                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1852                         }
1853                 }
1854                 dsl_dataset_rele(ds_next, FTAG);
1855         } else {
1856                 zfeature_info_t *async_destroy =
1857                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1858 
1859                 /*
1860                  * There's no next snapshot, so this is a head dataset.
1861                  * Destroy the deadlist.  Unless it's a clone, the
1862                  * deadlist should be empty.  (If it's a clone, it's
1863                  * safe to ignore the deadlist contents.)
1864                  */
1865                 dsl_deadlist_close(&ds->ds_deadlist);
1866                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1867                 ds->ds_phys->ds_deadlist_obj = 0;
1868 
1869                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1870                         err = old_synchronous_dataset_destroy(ds, tx);
1871                 } else {
1872                         /*
1873                          * Move the bptree into the pool's list of trees to
1874                          * clean up and update space accounting information.
1875                          */
1876                         uint64_t used, comp, uncomp;
1877 
1878                         ASSERT(err == 0 || err == EBUSY);
1879                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1880                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1881                                 dp->dp_bptree_obj = bptree_alloc(
1882                                     dp->dp_meta_objset, tx);
1883                                 VERIFY(zap_add(dp->dp_meta_objset,
1884                                     DMU_POOL_DIRECTORY_OBJECT,
1885                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1886                                     &dp->dp_bptree_obj, tx) == 0);
1887                         }
1888 
1889                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1890                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1891                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1892 
1893                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1894                             ds->ds_phys->ds_unique_bytes == used);
1895 
1896                         bptree_add(dp->dp_meta_objset, dp->dp_bptree_obj,
1897                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1898                             used, comp, uncomp, tx);
1899                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1900                             -used, -comp, -uncomp, tx);
1901                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1902                             used, comp, uncomp, tx);
1903                 }
1904 
1905                 if (ds->ds_prev != NULL) {
1906                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1907                                 VERIFY3U(0, ==, zap_remove_int(mos,
1908                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1909                                     ds->ds_object, tx));
1910                         }
1911                         dsl_dataset_rele(ds->ds_prev, ds);
1912                         ds->ds_prev = ds_prev = NULL;
1913                 }
1914         }
1915 
1916         /*
1917          * This must be done after the dsl_traverse(), because it will
1918          * re-open the objset.
1919          */
1920         if (ds->ds_objset) {
1921                 dmu_objset_evict(ds->ds_objset);
1922                 ds->ds_objset = NULL;
1923         }
1924 
1925         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1926                 /* Erase the link in the dir */
1927                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1928                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1929                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1930                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1931                 ASSERT(err == 0);
1932         } else {
1933                 /* remove from snapshot namespace */
1934                 dsl_dataset_t *ds_head;
1935                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1936                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1937                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1938                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1939 #ifdef ZFS_DEBUG
1940                 {
1941                         uint64_t val;
1942 
1943                         err = dsl_dataset_snap_lookup(ds_head,
1944                             ds->ds_snapname, &val);
1945                         ASSERT3U(err, ==, 0);
1946                         ASSERT3U(val, ==, obj);
1947                 }
1948 #endif
1949                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1950                 ASSERT(err == 0);
1951                 dsl_dataset_rele(ds_head, FTAG);
1952         }
1953 
1954         if (ds_prev && ds->ds_prev != ds_prev)
1955                 dsl_dataset_rele(ds_prev, FTAG);
1956 
1957         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1958         spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
1959             "dataset = %llu", ds->ds_object);
1960 
1961         if (ds->ds_phys->ds_next_clones_obj != 0) {
1962                 uint64_t count;
1963                 ASSERT(0 == zap_count(mos,
1964                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1965                 VERIFY(0 == dmu_object_free(mos,
1966                     ds->ds_phys->ds_next_clones_obj, tx));
1967         }
1968         if (ds->ds_phys->ds_props_obj != 0)
1969                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1970         if (ds->ds_phys->ds_userrefs_obj != 0)
1971                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1972         dsl_dir_close(ds->ds_dir, ds);
1973         ds->ds_dir = NULL;
1974         dsl_dataset_drain_refs(ds, tag);
1975         VERIFY(0 == dmu_object_free(mos, obj, tx));
1976 
1977         if (dsda->rm_origin) {
1978                 /*
1979                  * Remove the origin of the clone we just destroyed.
1980                  */
1981                 struct dsl_ds_destroyarg ndsda = {0};
1982 
1983                 ndsda.ds = dsda->rm_origin;
1984                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
1985         }
1986 }
1987 
1988 static int
1989 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1990 {
1991         uint64_t asize;
1992 
1993         if (!dmu_tx_is_syncing(tx))
1994                 return (0);
1995 
1996         /*
1997          * If there's an fs-only reservation, any blocks that might become
1998          * owned by the snapshot dataset must be accommodated by space
1999          * outside of the reservation.
2000          */
2001         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2002         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2003         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2004                 return (ENOSPC);
2005 
2006         /*
2007          * Propogate any reserved space for this snapshot to other
2008          * snapshot checks in this sync group.
2009          */
2010         if (asize > 0)
2011                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2012 
2013         return (0);
2014 }
2015 
2016 int
2017 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
2018 {
2019         dsl_dataset_t *ds = arg1;
2020         const char *snapname = arg2;
2021         int err;
2022         uint64_t value;
2023 
2024         /*
2025          * We don't allow multiple snapshots of the same txg.  If there
2026          * is already one, try again.
2027          */
2028         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2029                 return (EAGAIN);
2030 
2031         /*
2032          * Check for conflicting name snapshot name.
2033          */
2034         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2035         if (err == 0)
2036                 return (EEXIST);
2037         if (err != ENOENT)
2038                 return (err);
2039 
2040         /*
2041          * Check that the dataset's name is not too long.  Name consists
2042          * of the dataset's length + 1 for the @-sign + snapshot name's length
2043          */
2044         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2045                 return (ENAMETOOLONG);
2046 
2047         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2048         if (err)
2049                 return (err);
2050 
2051         ds->ds_trysnap_txg = tx->tx_txg;
2052         return (0);
2053 }
2054 
2055 void
2056 dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2057 {
2058         dsl_dataset_t *ds = arg1;
2059         const char *snapname = arg2;
2060         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2061         dmu_buf_t *dbuf;
2062         dsl_dataset_phys_t *dsphys;
2063         uint64_t dsobj, crtxg;
2064         objset_t *mos = dp->dp_meta_objset;
2065         int err;
2066 
2067         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2068 
2069         /*
2070          * The origin's ds_creation_txg has to be < TXG_INITIAL
2071          */
2072         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2073                 crtxg = 1;
2074         else
2075                 crtxg = tx->tx_txg;
2076 
2077         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2078             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2079         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2080         dmu_buf_will_dirty(dbuf, tx);
2081         dsphys = dbuf->db_data;
2082         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2083         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2084         dsphys->ds_fsid_guid = unique_create();
2085         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2086             sizeof (dsphys->ds_guid));
2087         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2088         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2089         dsphys->ds_next_snap_obj = ds->ds_object;
2090         dsphys->ds_num_children = 1;
2091         dsphys->ds_creation_time = gethrestime_sec();
2092         dsphys->ds_creation_txg = crtxg;
2093         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2094         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2095         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2096         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2097         dsphys->ds_flags = ds->ds_phys->ds_flags;
2098         dsphys->ds_bp = ds->ds_phys->ds_bp;
2099         dmu_buf_rele(dbuf, FTAG);
2100 
2101         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2102         if (ds->ds_prev) {
2103                 uint64_t next_clones_obj =
2104                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2105                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2106                     ds->ds_object ||
2107                     ds->ds_prev->ds_phys->ds_num_children > 1);
2108                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2109                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2110                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2111                             ds->ds_prev->ds_phys->ds_creation_txg);
2112                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2113                 } else if (next_clones_obj != 0) {
2114                         remove_from_next_clones(ds->ds_prev,
2115                             dsphys->ds_next_snap_obj, tx);
2116                         VERIFY3U(0, ==, zap_add_int(mos,
2117                             next_clones_obj, dsobj, tx));
2118                 }
2119         }
2120 
2121         /*
2122          * If we have a reference-reservation on this dataset, we will
2123          * need to increase the amount of refreservation being charged
2124          * since our unique space is going to zero.
2125          */
2126         if (ds->ds_reserved) {
2127                 int64_t delta;
2128                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2129                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2130                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2131                     delta, 0, 0, tx);
2132         }
2133 
2134         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2135         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2136             ds->ds_dir->dd_myname, snapname, dsobj,
2137             ds->ds_phys->ds_prev_snap_txg);
2138         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2139             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2140         dsl_deadlist_close(&ds->ds_deadlist);
2141         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2142         dsl_deadlist_add_key(&ds->ds_deadlist,
2143             ds->ds_phys->ds_prev_snap_txg, tx);
2144 
2145         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2146         ds->ds_phys->ds_prev_snap_obj = dsobj;
2147         ds->ds_phys->ds_prev_snap_txg = crtxg;
2148         ds->ds_phys->ds_unique_bytes = 0;
2149         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2150                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2151 
2152         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2153             snapname, 8, 1, &dsobj, tx);
2154         ASSERT(err == 0);
2155 
2156         if (ds->ds_prev)
2157                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2158         VERIFY(0 == dsl_dataset_get_ref(dp,
2159             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2160 
2161         dsl_scan_ds_snapshotted(ds, tx);
2162 
2163         dsl_dir_snap_cmtime_update(ds->ds_dir);
2164 
2165         spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2166             "dataset = %llu", dsobj);
2167 }
2168 
2169 void
2170 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2171 {
2172         ASSERT(dmu_tx_is_syncing(tx));
2173         ASSERT(ds->ds_objset != NULL);
2174         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2175 
2176         /*
2177          * in case we had to change ds_fsid_guid when we opened it,
2178          * sync it out now.
2179          */
2180         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2181         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2182 
2183         dsl_dir_dirty(ds->ds_dir, tx);
2184         dmu_objset_sync(ds->ds_objset, zio, tx);
2185 }
2186 
2187 static void
2188 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2189 {
2190         uint64_t count = 0;
2191         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2192         zap_cursor_t zc;
2193         zap_attribute_t za;
2194         nvlist_t *propval;
2195         nvlist_t *val;
2196 
2197         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2198         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2199         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2200 
2201         /*
2202          * There may me missing entries in ds_next_clones_obj
2203          * due to a bug in a previous version of the code.
2204          * Only trust it if it has the right number of entries.
2205          */
2206         if (ds->ds_phys->ds_next_clones_obj != 0) {
2207                 ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2208                     &count));
2209         }
2210         if (count != ds->ds_phys->ds_num_children - 1) {
2211                 goto fail;
2212         }
2213         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2214             zap_cursor_retrieve(&zc, &za) == 0;
2215             zap_cursor_advance(&zc)) {
2216                 dsl_dataset_t *clone;
2217                 char buf[ZFS_MAXNAMELEN];
2218                 /*
2219                  * Even though we hold the dp_config_rwlock, the dataset
2220                  * may fail to open, returning ENOENT.  If there is a
2221                  * thread concurrently attempting to destroy this
2222                  * dataset, it will have the ds_rwlock held for
2223                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2224                  * dsl_dataset_hold_ref() will fail its
2225                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2226                  * dp_config_rwlock, and wait for the destroy progress
2227                  * and signal ds_exclusive_cv.  If the destroy was
2228                  * successful, we will see that
2229                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2230                  */
2231                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2232                     za.za_first_integer, FTAG, &clone) != 0)
2233                         continue;
2234                 dsl_dir_name(clone->ds_dir, buf);
2235                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2236                 dsl_dataset_rele(clone, FTAG);
2237         }
2238         zap_cursor_fini(&zc);
2239         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2240         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2241             propval) == 0);
2242 fail:
2243         nvlist_free(val);
2244         nvlist_free(propval);
2245         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2246 }
2247 
2248 void
2249 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2250 {
2251         uint64_t refd, avail, uobjs, aobjs, ratio;
2252 
2253         dsl_dir_stats(ds->ds_dir, nv);
2254 
2255         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2256         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2257         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2258 
2259         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2260             ds->ds_phys->ds_creation_time);
2261         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2262             ds->ds_phys->ds_creation_txg);
2263         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2264             ds->ds_quota);
2265         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2266             ds->ds_reserved);
2267         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2268             ds->ds_phys->ds_guid);
2269         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2270             ds->ds_phys->ds_unique_bytes);
2271         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2272             ds->ds_object);
2273         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2274             ds->ds_userrefs);
2275         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2276             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2277 
2278         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2279                 uint64_t written, comp, uncomp;
2280                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2281                 dsl_dataset_t *prev;
2282 
2283                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2284                 int err = dsl_dataset_hold_obj(dp,
2285                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2286                 rw_exit(&dp->dp_config_rwlock);
2287                 if (err == 0) {
2288                         err = dsl_dataset_space_written(prev, ds, &written,
2289                             &comp, &uncomp);
2290                         dsl_dataset_rele(prev, FTAG);
2291                         if (err == 0) {
2292                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2293                                     written);
2294                         }
2295                 }
2296         }
2297 
2298         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2299             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2300             ds->ds_phys->ds_compressed_bytes);
2301         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2302 
2303         if (ds->ds_phys->ds_next_snap_obj) {
2304                 /*
2305                  * This is a snapshot; override the dd's space used with
2306                  * our unique space and compression ratio.
2307                  */
2308                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2309                     ds->ds_phys->ds_unique_bytes);
2310                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2311 
2312                 get_clones_stat(ds, nv);
2313         }
2314 }
2315 
2316 void
2317 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2318 {
2319         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2320         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2321         stat->dds_guid = ds->ds_phys->ds_guid;
2322         if (ds->ds_phys->ds_next_snap_obj) {
2323                 stat->dds_is_snapshot = B_TRUE;
2324                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2325         } else {
2326                 stat->dds_is_snapshot = B_FALSE;
2327                 stat->dds_num_clones = 0;
2328         }
2329 
2330         /* clone origin is really a dsl_dir thing... */
2331         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2332         if (dsl_dir_is_clone(ds->ds_dir)) {
2333                 dsl_dataset_t *ods;
2334 
2335                 VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2336                     ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2337                 dsl_dataset_name(ods, stat->dds_origin);
2338                 dsl_dataset_drop_ref(ods, FTAG);
2339         } else {
2340                 stat->dds_origin[0] = '\0';
2341         }
2342         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2343 }
2344 
2345 uint64_t
2346 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2347 {
2348         return (ds->ds_fsid_guid);
2349 }
2350 
2351 void
2352 dsl_dataset_space(dsl_dataset_t *ds,
2353     uint64_t *refdbytesp, uint64_t *availbytesp,
2354     uint64_t *usedobjsp, uint64_t *availobjsp)
2355 {
2356         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2357         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2358         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2359                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2360         if (ds->ds_quota != 0) {
2361                 /*
2362                  * Adjust available bytes according to refquota
2363                  */
2364                 if (*refdbytesp < ds->ds_quota)
2365                         *availbytesp = MIN(*availbytesp,
2366                             ds->ds_quota - *refdbytesp);
2367                 else
2368                         *availbytesp = 0;
2369         }
2370         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2371         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2372 }
2373 
2374 boolean_t
2375 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2376 {
2377         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2378 
2379         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2380             dsl_pool_sync_context(dp));
2381         if (ds->ds_prev == NULL)
2382                 return (B_FALSE);
2383         if (ds->ds_phys->ds_bp.blk_birth >
2384             ds->ds_prev->ds_phys->ds_creation_txg) {
2385                 objset_t *os, *os_prev;
2386                 /*
2387                  * It may be that only the ZIL differs, because it was
2388                  * reset in the head.  Don't count that as being
2389                  * modified.
2390                  */
2391                 if (dmu_objset_from_ds(ds, &os) != 0)
2392                         return (B_TRUE);
2393                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2394                         return (B_TRUE);
2395                 return (bcmp(&os->os_phys->os_meta_dnode,
2396                     &os_prev->os_phys->os_meta_dnode,
2397                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2398         }
2399         return (B_FALSE);
2400 }
2401 
2402 /* ARGSUSED */
2403 static int
2404 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2405 {
2406         dsl_dataset_t *ds = arg1;
2407         char *newsnapname = arg2;
2408         dsl_dir_t *dd = ds->ds_dir;
2409         dsl_dataset_t *hds;
2410         uint64_t val;
2411         int err;
2412 
2413         err = dsl_dataset_hold_obj(dd->dd_pool,
2414             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2415         if (err)
2416                 return (err);
2417 
2418         /* new name better not be in use */
2419         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2420         dsl_dataset_rele(hds, FTAG);
2421 
2422         if (err == 0)
2423                 err = EEXIST;
2424         else if (err == ENOENT)
2425                 err = 0;
2426 
2427         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2428         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2429                 err = ENAMETOOLONG;
2430 
2431         return (err);
2432 }
2433 
2434 static void
2435 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2436 {
2437         dsl_dataset_t *ds = arg1;
2438         const char *newsnapname = arg2;
2439         dsl_dir_t *dd = ds->ds_dir;
2440         objset_t *mos = dd->dd_pool->dp_meta_objset;
2441         dsl_dataset_t *hds;
2442         int err;
2443 
2444         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2445 
2446         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2447             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2448 
2449         VERIFY(0 == dsl_dataset_get_snapname(ds));
2450         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2451         ASSERT3U(err, ==, 0);
2452         mutex_enter(&ds->ds_lock);
2453         (void) strcpy(ds->ds_snapname, newsnapname);
2454         mutex_exit(&ds->ds_lock);
2455         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2456             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2457         ASSERT3U(err, ==, 0);
2458 
2459         spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2460             "dataset = %llu", ds->ds_object);
2461         dsl_dataset_rele(hds, FTAG);
2462 }
2463 
/*
 * State shared between dsl_recursive_rename() and its per-dataset
 * callback dsl_snapshot_rename_one() while queueing snapshot renames.
 */
struct renamesnaparg {
        /* sync task group the rename tasks are added to; also the hold tag */
        dsl_sync_task_group_t *dstg;
        /* name of the snapshot being processed / that failed */
        char failed[MAXPATHLEN];
        /* old snapshot name (text after the '@'); points into caller's string */
        char *oldsnap;
        /* new snapshot name (text after the '@'); points into caller's string */
        char *newsnap;
};
2470 
2471 static int
2472 dsl_snapshot_rename_one(const char *name, void *arg)
2473 {
2474         struct renamesnaparg *ra = arg;
2475         dsl_dataset_t *ds = NULL;
2476         char *snapname;
2477         int err;
2478 
2479         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2480         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2481 
2482         /*
2483          * For recursive snapshot renames the parent won't be changing
2484          * so we just pass name for both the to/from argument.
2485          */
2486         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2487         if (err != 0) {
2488                 strfree(snapname);
2489                 return (err == ENOENT ? 0 : err);
2490         }
2491 
2492 #ifdef _KERNEL
2493         /*
2494          * For all filesystems undergoing rename, we'll need to unmount it.
2495          */
2496         (void) zfs_unmount_snap(snapname, NULL);
2497 #endif
2498         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2499         strfree(snapname);
2500         if (err != 0)
2501                 return (err == ENOENT ? 0 : err);
2502 
2503         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2504             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2505 
2506         return (0);
2507 }
2508 
2509 static int
2510 dsl_recursive_rename(char *oldname, const char *newname)
2511 {
2512         int err;
2513         struct renamesnaparg *ra;
2514         dsl_sync_task_t *dst;
2515         spa_t *spa;
2516         char *cp, *fsname = spa_strdup(oldname);
2517         int len = strlen(oldname) + 1;
2518 
2519         /* truncate the snapshot name to get the fsname */
2520         cp = strchr(fsname, '@');
2521         *cp = '\0';
2522 
2523         err = spa_open(fsname, &spa, FTAG);
2524         if (err) {
2525                 kmem_free(fsname, len);
2526                 return (err);
2527         }
2528         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2529         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2530 
2531         ra->oldsnap = strchr(oldname, '@') + 1;
2532         ra->newsnap = strchr(newname, '@') + 1;
2533         *ra->failed = '\0';
2534 
2535         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2536             DS_FIND_CHILDREN);
2537         kmem_free(fsname, len);
2538 
2539         if (err == 0) {
2540                 err = dsl_sync_task_group_wait(ra->dstg);
2541         }
2542 
2543         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2544             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2545                 dsl_dataset_t *ds = dst->dst_arg1;
2546                 if (dst->dst_err) {
2547                         dsl_dir_name(ds->ds_dir, ra->failed);
2548                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2549                         (void) strlcat(ra->failed, ra->newsnap,
2550                             sizeof (ra->failed));
2551                 }
2552                 dsl_dataset_rele(ds, ra->dstg);
2553         }
2554 
2555         if (err)
2556                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2557 
2558         dsl_sync_task_group_destroy(ra->dstg);
2559         kmem_free(ra, sizeof (struct renamesnaparg));
2560         spa_close(spa, FTAG);
2561         return (err);
2562 }
2563 
2564 static int
2565 dsl_valid_rename(const char *oldname, void *arg)
2566 {
2567         int delta = *(int *)arg;
2568 
2569         if (strlen(oldname) + delta >= MAXNAMELEN)
2570                 return (ENAMETOOLONG);
2571 
2572         return (0);
2573 }
2574 
2575 #pragma weak dmu_objset_rename = dsl_dataset_rename
2576 int
2577 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2578 {
2579         dsl_dir_t *dd;
2580         dsl_dataset_t *ds;
2581         const char *tail;
2582         int err;
2583 
2584         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2585         if (err)
2586                 return (err);
2587 
2588         if (tail == NULL) {
2589                 int delta = strlen(newname) - strlen(oldname);
2590 
2591                 /* if we're growing, validate child name lengths */
2592                 if (delta > 0)
2593                         err = dmu_objset_find(oldname, dsl_valid_rename,
2594                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2595 
2596                 if (err == 0)
2597                         err = dsl_dir_rename(dd, newname);
2598                 dsl_dir_close(dd, FTAG);
2599                 return (err);
2600         }
2601 
2602         if (tail[0] != '@') {
2603                 /* the name ended in a nonexistent component */
2604                 dsl_dir_close(dd, FTAG);
2605                 return (ENOENT);
2606         }
2607 
2608         dsl_dir_close(dd, FTAG);
2609 
2610         /* new name must be snapshot in same filesystem */
2611         tail = strchr(newname, '@');
2612         if (tail == NULL)
2613                 return (EINVAL);
2614         tail++;
2615         if (strncmp(oldname, newname, tail - newname) != 0)
2616                 return (EXDEV);
2617 
2618         if (recursive) {
2619                 err = dsl_recursive_rename(oldname, newname);
2620         } else {
2621                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2622                 if (err)
2623                         return (err);
2624 
2625                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2626                     dsl_dataset_snapshot_rename_check,
2627                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2628 
2629                 dsl_dataset_rele(ds, FTAG);
2630         }
2631 
2632         return (err);
2633 }
2634 
/*
 * Element of the snapshot lists built by snaplist_make(): one held (or
 * owned) dataset plus its list linkage.
 */
struct promotenode {
        list_node_t link;
        dsl_dataset_t *ds;
};
2639 
/*
 * Argument block passed from dsl_dataset_promote() to the promote
 * check and sync functions.
 */
struct promotearg {
        /*
         * Lists of struct promotenode built by dsl_dataset_promote():
         * shared_snaps - snapshots from the clone's origin back, which
         *     are moved to the clone
         * origin_snaps - from the origin's head back to (not including)
         *     the clone's origin
         * clone_snaps  - from the clone itself back to its origin
         */
        list_t shared_snaps, origin_snaps, clone_snaps;
        /* origin of the origin's dsl_dir, or NULL if it has none */
        dsl_dataset_t *origin_origin;
        /*
         * Filled in by dsl_dataset_promote_check(): space to transfer
         * (used/comp/uncomp), the origin's new unique bytes, and the
         * post-promotion snapshot-used space of clone and origin.
         */
        uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
        /* snapshot name that caused the check to fail, if any */
        char *err_ds;
};
2646 
2647 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2648 static boolean_t snaplist_unstable(list_t *l);
2649 
2650 static int
2651 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2652 {
2653         dsl_dataset_t *hds = arg1;
2654         struct promotearg *pa = arg2;
2655         struct promotenode *snap = list_head(&pa->shared_snaps);
2656         dsl_dataset_t *origin_ds = snap->ds;
2657         int err;
2658         uint64_t unused;
2659 
2660         /* Check that it is a real clone */
2661         if (!dsl_dir_is_clone(hds->ds_dir))
2662                 return (EINVAL);
2663 
2664         /* Since this is so expensive, don't do the preliminary check */
2665         if (!dmu_tx_is_syncing(tx))
2666                 return (0);
2667 
2668         if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2669                 return (EXDEV);
2670 
2671         /* compute origin's new unique space */
2672         snap = list_tail(&pa->clone_snaps);
2673         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2674         dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2675             origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2676             &pa->unique, &unused, &unused);
2677 
2678         /*
2679          * Walk the snapshots that we are moving
2680          *
2681          * Compute space to transfer.  Consider the incremental changes
2682          * to used for each snapshot:
2683          * (my used) = (prev's used) + (blocks born) - (blocks killed)
2684          * So each snapshot gave birth to:
2685          * (blocks born) = (my used) - (prev's used) + (blocks killed)
2686          * So a sequence would look like:
2687          * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2688          * Which simplifies to:
2689          * uN + kN + kN-1 + ... + k1 + k0
2690          * Note however, if we stop before we reach the ORIGIN we get:
2691          * uN + kN + kN-1 + ... + kM - uM-1
2692          */
2693         pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2694         pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2695         pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2696         for (snap = list_head(&pa->shared_snaps); snap;
2697             snap = list_next(&pa->shared_snaps, snap)) {
2698                 uint64_t val, dlused, dlcomp, dluncomp;
2699                 dsl_dataset_t *ds = snap->ds;
2700 
2701                 /* Check that the snapshot name does not conflict */
2702                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2703                 err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2704                 if (err == 0) {
2705                         err = EEXIST;
2706                         goto out;
2707                 }
2708                 if (err != ENOENT)
2709                         goto out;
2710 
2711                 /* The very first snapshot does not have a deadlist */
2712                 if (ds->ds_phys->ds_prev_snap_obj == 0)
2713                         continue;
2714 
2715                 dsl_deadlist_space(&ds->ds_deadlist,
2716                     &dlused, &dlcomp, &dluncomp);
2717                 pa->used += dlused;
2718                 pa->comp += dlcomp;
2719                 pa->uncomp += dluncomp;
2720         }
2721 
2722         /*
2723          * If we are a clone of a clone then we never reached ORIGIN,
2724          * so we need to subtract out the clone origin's used space.
2725          */
2726         if (pa->origin_origin) {
2727                 pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2728                 pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2729                 pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2730         }
2731 
2732         /* Check that there is enough space here */
2733         err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2734             pa->used);
2735         if (err)
2736                 return (err);
2737 
2738         /*
2739          * Compute the amounts of space that will be used by snapshots
2740          * after the promotion (for both origin and clone).  For each,
2741          * it is the amount of space that will be on all of their
2742          * deadlists (that was not born before their new origin).
2743          */
2744         if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2745                 uint64_t space;
2746 
2747                 /*
2748                  * Note, typically this will not be a clone of a clone,
2749                  * so dd_origin_txg will be < TXG_INITIAL, so
2750                  * these snaplist_space() -> dsl_deadlist_space_range()
2751                  * calls will be fast because they do not have to
2752                  * iterate over all bps.
2753                  */
2754                 snap = list_head(&pa->origin_snaps);
2755                 err = snaplist_space(&pa->shared_snaps,
2756                     snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2757                 if (err)
2758                         return (err);
2759 
2760                 err = snaplist_space(&pa->clone_snaps,
2761                     snap->ds->ds_dir->dd_origin_txg, &space);
2762                 if (err)
2763                         return (err);
2764                 pa->cloneusedsnap += space;
2765         }
2766         if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2767                 err = snaplist_space(&pa->origin_snaps,
2768                     origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2769                 if (err)
2770                         return (err);
2771         }
2772 
2773         return (0);
2774 out:
2775         pa->err_ds =  snap->ds->ds_snapname;
2776         return (err);
2777 }
2778 
/*
 * Sync function for promoting a clone.
 *
 * arg1 is the clone's head dataset (hds), arg2 the struct promotearg
 * filled in by dsl_dataset_promote() and dsl_dataset_promote_check().
 * In order, this:
 *  - points the origin snapshot's next_snap at the oldest clone snapshot
 *    and fixes up its next_clones entries,
 *  - swaps the origin relationship between the clone's dsl_dir (dd) and
 *    the origin's dsl_dir (odd),
 *  - updates the dd_clones ZAPs (pools at SPA_VERSION_DIR_CLONES or later),
 *  - moves every snapshot on pa->shared_snaps into the clone's dsl_dir,
 *  - transfers the space accounting computed by the check function, and
 *  - logs a history record.
 */
static void
dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
        dsl_dataset_t *hds = arg1;
        struct promotearg *pa = arg2;
        /* head of shared_snaps is the clone's origin snapshot */
        struct promotenode *snap = list_head(&pa->shared_snaps);
        dsl_dataset_t *origin_ds = snap->ds;
        dsl_dataset_t *origin_head;
        dsl_dir_t *dd = hds->ds_dir;
        dsl_pool_t *dp = hds->ds_dir->dd_pool;
        dsl_dir_t *odd = NULL;
        uint64_t oldnext_obj;
        int64_t delta;

        ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));

        /* head of origin_snaps is the origin filesystem's head dataset */
        snap = list_head(&pa->origin_snaps);
        origin_head = snap->ds;

        /*
         * We need to explicitly open odd, since origin_ds's dd will be
         * changing.
         */
        VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
            NULL, FTAG, &odd));

        /* change origin's next snap */
        dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
        oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
        /* tail of clone_snaps is the clone dataset directly after the origin */
        snap = list_tail(&pa->clone_snaps);
        ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
        origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;

        /* change the origin's next clone */
        if (origin_ds->ds_phys->ds_next_clones_obj) {
                /* the old next snapshot takes the promoted clone's slot */
                remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    origin_ds->ds_phys->ds_next_clones_obj,
                    oldnext_obj, tx));
        }

        /* change origin */
        dmu_buf_will_dirty(dd->dd_dbuf, tx);
        ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
        dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
        dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
        dmu_buf_will_dirty(odd->dd_dbuf, tx);
        odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
        origin_head->ds_dir->dd_origin_txg =
            origin_ds->ds_phys->ds_creation_txg;

        /* change dd_clone entries */
        /*
         * NOTE(review): pa->origin_origin is dereferenced unconditionally
         * in this branch; presumably it is never NULL on pools at
         * SPA_VERSION_DIR_CLONES or later -- confirm.
         */
        if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    odd->dd_phys->dd_clones, hds->ds_object, tx));
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    hds->ds_object, tx));

                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    origin_head->ds_object, tx));
                /* the promoted dir may not have had any clones before */
                if (dd->dd_phys->dd_clones == 0) {
                        dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
                            DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
                }
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    dd->dd_phys->dd_clones, origin_head->ds_object, tx));

        }

        /* move snapshots to this dir */
        for (snap = list_head(&pa->shared_snaps); snap;
            snap = list_next(&pa->shared_snaps, snap)) {
                dsl_dataset_t *ds = snap->ds;

                /* unregister props as dsl_dir is changing */
                if (ds->ds_objset) {
                        dmu_objset_evict(ds->ds_objset);
                        ds->ds_objset = NULL;
                }
                /* move snap name entry */
                VERIFY(0 == dsl_dataset_get_snapname(ds));
                VERIFY(0 == dsl_dataset_snap_remove(origin_head,
                    ds->ds_snapname, tx));
                VERIFY(0 == zap_add(dp->dp_meta_objset,
                    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
                    8, 1, &ds->ds_object, tx));

                /* change containing dsl_dir */
                dmu_buf_will_dirty(ds->ds_dbuf, tx);
                ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
                ds->ds_phys->ds_dir_obj = dd->dd_object;
                ASSERT3P(ds->ds_dir, ==, odd);
                /* swap the in-core dir reference from odd to dd */
                dsl_dir_close(ds->ds_dir, ds);
                VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
                    NULL, ds, &ds->ds_dir));

                /* move any clone references */
                if (ds->ds_phys->ds_next_clones_obj &&
                    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                        zap_cursor_t zc;
                        zap_attribute_t za;

                        for (zap_cursor_init(&zc, dp->dp_meta_objset,
                            ds->ds_phys->ds_next_clones_obj);
                            zap_cursor_retrieve(&zc, &za) == 0;
                            zap_cursor_advance(&zc)) {
                                dsl_dataset_t *cnds;
                                uint64_t o;

                                if (za.za_first_integer == oldnext_obj) {
                                        /*
                                         * We've already moved the
                                         * origin's reference.
                                         */
                                        continue;
                                }

                                VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
                                    za.za_first_integer, FTAG, &cnds));
                                /* dd_clones is keyed by the clone's head */
                                o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;

                                VERIFY3U(zap_remove_int(dp->dp_meta_objset,
                                    odd->dd_phys->dd_clones, o, tx), ==, 0);
                                VERIFY3U(zap_add_int(dp->dp_meta_objset,
                                    dd->dd_phys->dd_clones, o, tx), ==, 0);
                                dsl_dataset_rele(cnds, FTAG);
                        }
                        zap_cursor_fini(&zc);
                }

                ASSERT3U(dsl_prop_numcb(ds), ==, 0);
        }

        /*
         * Change space accounting.
         * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
         * both be valid, or both be 0 (resulting in delta == 0).  This
         * is true for each of {clone,origin} independently.
         */

        /* the clone's dir gains snapshot space and the transferred data */
        delta = pa->cloneusedsnap -
            dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, >=, 0);
        ASSERT3U(pa->used, >=, delta);
        dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(dd, DD_USED_HEAD,
            pa->used - delta, pa->comp, pa->uncomp, tx);

        /* the origin's dir loses the same amounts */
        delta = pa->originusedsnap -
            odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, <=, 0);
        ASSERT3U(pa->used, >=, -delta);
        dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(odd, DD_USED_HEAD,
            -pa->used - delta, -pa->comp, -pa->uncomp, tx);

        /* value computed by dsl_dataset_promote_check() */
        origin_ds->ds_phys->ds_unique_bytes = pa->unique;

        /* log history record */
        spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
            "dataset = %llu", hds->ds_object);

        dsl_dir_close(odd, FTAG);
}
2945 
2946 static char *snaplist_tag = "snaplist";
2947 /*
2948  * Make a list of dsl_dataset_t's for the snapshots between first_obj
2949  * (exclusive) and last_obj (inclusive).  The list will be in reverse
2950  * order (last_obj will be the list_head()).  If first_obj == 0, do all
2951  * snapshots back to this dataset's origin.
2952  */
2953 static int
2954 snaplist_make(dsl_pool_t *dp, boolean_t own,
2955     uint64_t first_obj, uint64_t last_obj, list_t *l)
2956 {
2957         uint64_t obj = last_obj;
2958 
2959         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2960 
2961         list_create(l, sizeof (struct promotenode),
2962             offsetof(struct promotenode, link));
2963 
2964         while (obj != first_obj) {
2965                 dsl_dataset_t *ds;
2966                 struct promotenode *snap;
2967                 int err;
2968 
2969                 if (own) {
2970                         err = dsl_dataset_own_obj(dp, obj,
2971                             0, snaplist_tag, &ds);
2972                         if (err == 0)
2973                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
2974                 } else {
2975                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2976                 }
2977                 if (err == ENOENT) {
2978                         /* lost race with snapshot destroy */
2979                         struct promotenode *last = list_tail(l);
2980                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
2981                         obj = last->ds->ds_phys->ds_prev_snap_obj;
2982                         continue;
2983                 } else if (err) {
2984                         return (err);
2985                 }
2986 
2987                 if (first_obj == 0)
2988                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
2989 
2990                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
2991                 snap->ds = ds;
2992                 list_insert_tail(l, snap);
2993                 obj = ds->ds_phys->ds_prev_snap_obj;
2994         }
2995 
2996         return (0);
2997 }
2998 
2999 static int
3000 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3001 {
3002         struct promotenode *snap;
3003 
3004         *spacep = 0;
3005         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3006                 uint64_t used, comp, uncomp;
3007                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3008                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3009                 *spacep += used;
3010         }
3011         return (0);
3012 }
3013 
3014 static void
3015 snaplist_destroy(list_t *l, boolean_t own)
3016 {
3017         struct promotenode *snap;
3018 
3019         if (!l || !list_link_active(&l->list_head))
3020                 return;
3021 
3022         while ((snap = list_tail(l)) != NULL) {
3023                 list_remove(l, snap);
3024                 if (own)
3025                         dsl_dataset_disown(snap->ds, snaplist_tag);
3026                 else
3027                         dsl_dataset_rele(snap->ds, snaplist_tag);
3028                 kmem_free(snap, sizeof (struct promotenode));
3029         }
3030         list_destroy(l);
3031 }
3032 
3033 /*
3034  * Promote a clone.  Nomenclature note:
3035  * "clone" or "cds": the original clone which is being promoted
3036  * "origin" or "ods": the snapshot which is originally clone's origin
3037  * "origin head" or "ohds": the dataset which is the head
3038  * (filesystem/volume) for the origin
3039  * "origin origin": the origin of the origin's filesystem (typically
3040  * NULL, indicating that the clone is not a clone of a clone).
3041  */
3042 int
3043 dsl_dataset_promote(const char *name, char *conflsnap)
3044 {
3045         dsl_dataset_t *ds;
3046         dsl_dir_t *dd;
3047         dsl_pool_t *dp;
3048         dmu_object_info_t doi;
3049         struct promotearg pa = { 0 };
3050         struct promotenode *snap;
3051         int err;
3052 
3053         err = dsl_dataset_hold(name, FTAG, &ds);
3054         if (err)
3055                 return (err);
3056         dd = ds->ds_dir;
3057         dp = dd->dd_pool;
3058 
3059         err = dmu_object_info(dp->dp_meta_objset,
3060             ds->ds_phys->ds_snapnames_zapobj, &doi);
3061         if (err) {
3062                 dsl_dataset_rele(ds, FTAG);
3063                 return (err);
3064         }
3065 
3066         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3067                 dsl_dataset_rele(ds, FTAG);
3068                 return (EINVAL);
3069         }
3070 
3071         /*
3072          * We are going to inherit all the snapshots taken before our
3073          * origin (i.e., our new origin will be our parent's origin).
3074          * Take ownership of them so that we can rename them into our
3075          * namespace.
3076          */
3077         rw_enter(&dp->dp_config_rwlock, RW_READER);
3078 
3079         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3080             &pa.shared_snaps);
3081         if (err != 0)
3082                 goto out;
3083 
3084         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3085         if (err != 0)
3086                 goto out;
3087 
3088         snap = list_head(&pa.shared_snaps);
3089         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3090         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3091             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3092         if (err != 0)
3093                 goto out;
3094 
3095         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3096                 err = dsl_dataset_hold_obj(dp,
3097                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3098                     FTAG, &pa.origin_origin);
3099                 if (err != 0)
3100                         goto out;
3101         }
3102 
3103 out:
3104         rw_exit(&dp->dp_config_rwlock);
3105 
3106         /*
3107          * Add in 128x the snapnames zapobj size, since we will be moving
3108          * a bunch of snapnames to the promoted ds, and dirtying their
3109          * bonus buffers.
3110          */
3111         if (err == 0) {
3112                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3113                     dsl_dataset_promote_sync, ds, &pa,
3114                     2 + 2 * doi.doi_physical_blocks_512);
3115                 if (err && pa.err_ds && conflsnap)
3116                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3117         }
3118 
3119         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3120         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3121         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3122         if (pa.origin_origin)
3123                 dsl_dataset_rele(pa.origin_origin, FTAG);
3124         dsl_dataset_rele(ds, FTAG);
3125         return (err);
3126 }
3127 
3128 struct cloneswaparg {
3129         dsl_dataset_t *cds; /* clone dataset */
3130         dsl_dataset_t *ohds; /* origin's head dataset */
3131         boolean_t force;
3132         int64_t unused_refres_delta; /* change in unconsumed refreservation */
3133 };
3134 
3135 /* ARGSUSED */
3136 static int
3137 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3138 {
3139         struct cloneswaparg *csa = arg1;
3140 
3141         /* they should both be heads */
3142         if (dsl_dataset_is_snapshot(csa->cds) ||
3143             dsl_dataset_is_snapshot(csa->ohds))
3144                 return (EINVAL);
3145 
3146         /* the branch point should be just before them */
3147         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3148                 return (EINVAL);
3149 
3150         /* cds should be the clone (unless they are unrelated) */
3151         if (csa->cds->ds_prev != NULL &&
3152             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3153             csa->ohds->ds_object !=
3154             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3155                 return (EINVAL);
3156 
3157         /* the clone should be a child of the origin */
3158         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3159                 return (EINVAL);
3160 
3161         /* ohds shouldn't be modified unless 'force' */
3162         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3163                 return (ETXTBSY);
3164 
3165         /* adjust amount of any unconsumed refreservation */
3166         csa->unused_refres_delta =
3167             (int64_t)MIN(csa->ohds->ds_reserved,
3168             csa->ohds->ds_phys->ds_unique_bytes) -
3169             (int64_t)MIN(csa->ohds->ds_reserved,
3170             csa->cds->ds_phys->ds_unique_bytes);
3171 
3172         if (csa->unused_refres_delta > 0 &&
3173             csa->unused_refres_delta >
3174             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3175                 return (ENOSPC);
3176 
3177         if (csa->ohds->ds_quota != 0 &&
3178             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3179                 return (EDQUOT);
3180 
3181         return (0);
3182 }
3183 
3184 /* ARGSUSED */
3185 static void
3186 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3187 {
3188         struct cloneswaparg *csa = arg1;
3189         dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3190 
3191         ASSERT(csa->cds->ds_reserved == 0);
3192         ASSERT(csa->ohds->ds_quota == 0 ||
3193             csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3194 
3195         dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3196         dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3197 
3198         if (csa->cds->ds_objset != NULL) {
3199                 dmu_objset_evict(csa->cds->ds_objset);
3200                 csa->cds->ds_objset = NULL;
3201         }
3202 
3203         if (csa->ohds->ds_objset != NULL) {
3204                 dmu_objset_evict(csa->ohds->ds_objset);
3205                 csa->ohds->ds_objset = NULL;
3206         }
3207 
3208         /*
3209          * Reset origin's unique bytes, if it exists.
3210          */
3211         if (csa->cds->ds_prev) {
3212                 dsl_dataset_t *origin = csa->cds->ds_prev;
3213                 uint64_t comp, uncomp;
3214 
3215                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
3216                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3217                     origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3218                     &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3219         }
3220 
3221         /* swap blkptrs */
3222         {
3223                 blkptr_t tmp;
3224                 tmp = csa->ohds->ds_phys->ds_bp;
3225                 csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3226                 csa->cds->ds_phys->ds_bp = tmp;
3227         }
3228 
3229         /* set dd_*_bytes */
3230         {
3231                 int64_t dused, dcomp, duncomp;
3232                 uint64_t cdl_used, cdl_comp, cdl_uncomp;
3233                 uint64_t odl_used, odl_comp, odl_uncomp;
3234 
3235                 ASSERT3U(csa->cds->ds_dir->dd_phys->
3236                     dd_used_breakdown[DD_USED_SNAP], ==, 0);
3237 
3238                 dsl_deadlist_space(&csa->cds->ds_deadlist,
3239                     &cdl_used, &cdl_comp, &cdl_uncomp);
3240                 dsl_deadlist_space(&csa->ohds->ds_deadlist,
3241                     &odl_used, &odl_comp, &odl_uncomp);
3242 
3243                 dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3244                     (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3245                 dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3246                     (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3247                 duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3248                     cdl_uncomp -
3249                     (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3250 
3251                 dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3252                     dused, dcomp, duncomp, tx);
3253                 dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3254                     -dused, -dcomp, -duncomp, tx);
3255 
3256                 /*
3257                  * The difference in the space used by snapshots is the
3258                  * difference in snapshot space due to the head's
3259                  * deadlist (since that's the only thing that's
3260                  * changing that affects the snapused).
3261                  */
3262                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3263                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3264                     &cdl_used, &cdl_comp, &cdl_uncomp);
3265                 dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3266                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3267                     &odl_used, &odl_comp, &odl_uncomp);
3268                 dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3269                     DD_USED_HEAD, DD_USED_SNAP, tx);
3270         }
3271 
3272         /* swap ds_*_bytes */
3273         SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3274             csa->cds->ds_phys->ds_referenced_bytes);
3275         SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3276             csa->cds->ds_phys->ds_compressed_bytes);
3277         SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3278             csa->cds->ds_phys->ds_uncompressed_bytes);
3279         SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3280             csa->cds->ds_phys->ds_unique_bytes);
3281 
3282         /* apply any parent delta for change in unconsumed refreservation */
3283         dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3284             csa->unused_refres_delta, 0, 0, tx);
3285 
3286         /*
3287          * Swap deadlists.
3288          */
3289         dsl_deadlist_close(&csa->cds->ds_deadlist);
3290         dsl_deadlist_close(&csa->ohds->ds_deadlist);
3291         SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3292             csa->cds->ds_phys->ds_deadlist_obj);
3293         dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3294             csa->cds->ds_phys->ds_deadlist_obj);
3295         dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3296             csa->ohds->ds_phys->ds_deadlist_obj);
3297 
3298         dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3299 }
3300 
3301 /*
3302  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3303  * recv" into an existing fs to swizzle the file system to the new
3304  * version, and by "zfs rollback".  Can also be used to swap two
3305  * independent head datasets if neither has any snapshots.
3306  */
3307 int
3308 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3309     boolean_t force)
3310 {
3311         struct cloneswaparg csa;
3312         int error;
3313 
3314         ASSERT(clone->ds_owner);
3315         ASSERT(origin_head->ds_owner);
3316 retry:
3317         /*
3318          * Need exclusive access for the swap. If we're swapping these
3319          * datasets back after an error, we already hold the locks.
3320          */
3321         if (!RW_WRITE_HELD(&clone->ds_rwlock))
3322                 rw_enter(&clone->ds_rwlock, RW_WRITER);
3323         if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3324             !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3325                 rw_exit(&clone->ds_rwlock);
3326                 rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3327                 if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3328                         rw_exit(&origin_head->ds_rwlock);
3329                         goto retry;
3330                 }
3331         }
3332         csa.cds = clone;
3333         csa.ohds = origin_head;
3334         csa.force = force;
3335         error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3336             dsl_dataset_clone_swap_check,
3337             dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3338         return (error);
3339 }
3340 
3341 /*
3342  * Given a pool name and a dataset object number in that pool,
3343  * return the name of that dataset.
3344  */
3345 int
3346 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3347 {
3348         spa_t *spa;
3349         dsl_pool_t *dp;
3350         dsl_dataset_t *ds;
3351         int error;
3352 
3353         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3354                 return (error);
3355         dp = spa_get_dsl(spa);
3356         rw_enter(&dp->dp_config_rwlock, RW_READER);
3357         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3358                 dsl_dataset_name(ds, buf);
3359                 dsl_dataset_rele(ds, FTAG);
3360         }
3361         rw_exit(&dp->dp_config_rwlock);
3362         spa_close(spa, FTAG);
3363 
3364         return (error);
3365 }
3366 
3367 int
3368 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3369     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3370 {
3371         int error = 0;
3372 
3373         ASSERT3S(asize, >, 0);
3374 
3375         /*
3376          * *ref_rsrv is the portion of asize that will come from any
3377          * unconsumed refreservation space.
3378          */
3379         *ref_rsrv = 0;
3380 
3381         mutex_enter(&ds->ds_lock);
3382         /*
3383          * Make a space adjustment for reserved bytes.
3384          */
3385         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3386                 ASSERT3U(*used, >=,
3387                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3388                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3389                 *ref_rsrv =
3390                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3391         }
3392 
3393         if (!check_quota || ds->ds_quota == 0) {
3394                 mutex_exit(&ds->ds_lock);
3395                 return (0);
3396         }
3397         /*
3398          * If they are requesting more space, and our current estimate
3399          * is over quota, they get to try again unless the actual
3400          * on-disk is over quota and there are no pending changes (which
3401          * may free up space for us).
3402          */
3403         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3404                 if (inflight > 0 ||
3405                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3406                         error = ERESTART;
3407                 else
3408                         error = EDQUOT;
3409         }
3410         mutex_exit(&ds->ds_lock);
3411 
3412         return (error);
3413 }
3414 
3415 /* ARGSUSED */
3416 static int
3417 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3418 {
3419         dsl_dataset_t *ds = arg1;
3420         dsl_prop_setarg_t *psa = arg2;
3421         int err;
3422 
3423         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3424                 return (ENOTSUP);
3425 
3426         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3427                 return (err);
3428 
3429         if (psa->psa_effective_value == 0)
3430                 return (0);
3431 
3432         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3433             psa->psa_effective_value < ds->ds_reserved)
3434                 return (ENOSPC);
3435 
3436         return (0);
3437 }
3438 
3439 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3440 
3441 void
3442 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3443 {
3444         dsl_dataset_t *ds = arg1;
3445         dsl_prop_setarg_t *psa = arg2;
3446         uint64_t effective_value = psa->psa_effective_value;
3447 
3448         dsl_prop_set_sync(ds, psa, tx);
3449         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3450 
3451         if (ds->ds_quota != effective_value) {
3452                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3453                 ds->ds_quota = effective_value;
3454 
3455                 spa_history_log_internal(LOG_DS_REFQUOTA,
3456                     ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu ",
3457                     (longlong_t)ds->ds_quota, ds->ds_object);
3458         }
3459 }
3460 
3461 int
3462 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3463 {
3464         dsl_dataset_t *ds;
3465         dsl_prop_setarg_t psa;
3466         int err;
3467 
3468         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3469 
3470         err = dsl_dataset_hold(dsname, FTAG, &ds);
3471         if (err)
3472                 return (err);
3473 
3474         /*
3475          * If someone removes a file, then tries to set the quota, we
3476          * want to make sure the file freeing takes effect.
3477          */
3478         txg_wait_open(ds->ds_dir->dd_pool, 0);
3479 
3480         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3481             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3482             ds, &psa, 0);
3483 
3484         dsl_dataset_rele(ds, FTAG);
3485         return (err);
3486 }
3487 
3488 static int
3489 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3490 {
3491         dsl_dataset_t *ds = arg1;
3492         dsl_prop_setarg_t *psa = arg2;
3493         uint64_t effective_value;
3494         uint64_t unique;
3495         int err;
3496 
3497         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3498             SPA_VERSION_REFRESERVATION)
3499                 return (ENOTSUP);
3500 
3501         if (dsl_dataset_is_snapshot(ds))
3502                 return (EINVAL);
3503 
3504         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3505                 return (err);
3506 
3507         effective_value = psa->psa_effective_value;
3508 
3509         /*
3510          * If we are doing the preliminary check in open context, the
3511          * space estimates may be inaccurate.
3512          */
3513         if (!dmu_tx_is_syncing(tx))
3514                 return (0);
3515 
3516         mutex_enter(&ds->ds_lock);
3517         if (!DS_UNIQUE_IS_ACCURATE(ds))
3518                 dsl_dataset_recalc_head_uniq(ds);
3519         unique = ds->ds_phys->ds_unique_bytes;
3520         mutex_exit(&ds->ds_lock);
3521 
3522         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3523                 uint64_t delta = MAX(unique, effective_value) -
3524                     MAX(unique, ds->ds_reserved);
3525 
3526                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3527                         return (ENOSPC);
3528                 if (ds->ds_quota > 0 &&
3529                     effective_value > ds->ds_quota)
3530                         return (ENOSPC);
3531         }
3532 
3533         return (0);
3534 }
3535 
3536 static void
3537 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3538 {
3539         dsl_dataset_t *ds = arg1;
3540         dsl_prop_setarg_t *psa = arg2;
3541         uint64_t effective_value = psa->psa_effective_value;
3542         uint64_t unique;
3543         int64_t delta;
3544 
3545         dsl_prop_set_sync(ds, psa, tx);
3546         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3547 
3548         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3549 
3550         mutex_enter(&ds->ds_dir->dd_lock);
3551         mutex_enter(&ds->ds_lock);
3552         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3553         unique = ds->ds_phys->ds_unique_bytes;
3554         delta = MAX(0, (int64_t)(effective_value - unique)) -
3555             MAX(0, (int64_t)(ds->ds_reserved - unique));
3556         ds->ds_reserved = effective_value;
3557         mutex_exit(&ds->ds_lock);
3558 
3559         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3560         mutex_exit(&ds->ds_dir->dd_lock);
3561 
3562         spa_history_log_internal(LOG_DS_REFRESERV,
3563             ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu",
3564             (longlong_t)effective_value, ds->ds_object);
3565 }
3566 
3567 int
3568 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3569     uint64_t reservation)
3570 {
3571         dsl_dataset_t *ds;
3572         dsl_prop_setarg_t psa;
3573         int err;
3574 
3575         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3576             &reservation);
3577 
3578         err = dsl_dataset_hold(dsname, FTAG, &ds);
3579         if (err)
3580                 return (err);
3581 
3582         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3583             dsl_dataset_set_reservation_check,
3584             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3585 
3586         dsl_dataset_rele(ds, FTAG);
3587         return (err);
3588 }
3589 
3590 typedef struct zfs_hold_cleanup_arg {
3591         dsl_pool_t *dp;
3592         uint64_t dsobj;
3593         char htag[MAXNAMELEN];
3594 } zfs_hold_cleanup_arg_t;
3595 
3596 static void
3597 dsl_dataset_user_release_onexit(void *arg)
3598 {
3599         zfs_hold_cleanup_arg_t *ca = arg;
3600 
3601         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3602             B_TRUE);
3603         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3604 }
3605 
3606 void
3607 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3608     minor_t minor)
3609 {
3610         zfs_hold_cleanup_arg_t *ca;
3611 
3612         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3613         ca->dp = ds->ds_dir->dd_pool;
3614         ca->dsobj = ds->ds_object;
3615         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3616         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3617             dsl_dataset_user_release_onexit, ca, NULL));
3618 }
3619 
3620 /*
3621  * If you add new checks here, you may need to add
3622  * additional checks to the "temporary" case in
3623  * snapshot_check() in dmu_objset.c.
3624  */
3625 static int
3626 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3627 {
3628         dsl_dataset_t *ds = arg1;
3629         struct dsl_ds_holdarg *ha = arg2;
3630         char *htag = ha->htag;
3631         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3632         int error = 0;
3633 
3634         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3635                 return (ENOTSUP);
3636 
3637         if (!dsl_dataset_is_snapshot(ds))
3638                 return (EINVAL);
3639 
3640         /* tags must be unique */
3641         mutex_enter(&ds->ds_lock);
3642         if (ds->ds_phys->ds_userrefs_obj) {
3643                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3644                     8, 1, tx);
3645                 if (error == 0)
3646                         error = EEXIST;
3647                 else if (error == ENOENT)
3648                         error = 0;
3649         }
3650         mutex_exit(&ds->ds_lock);
3651 
3652         if (error == 0 && ha->temphold &&
3653             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3654                 error = E2BIG;
3655 
3656         return (error);
3657 }
3658 
3659 void
3660 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3661 {
3662         dsl_dataset_t *ds = arg1;
3663         struct dsl_ds_holdarg *ha = arg2;
3664         char *htag = ha->htag;
3665         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3666         objset_t *mos = dp->dp_meta_objset;
3667         uint64_t now = gethrestime_sec();
3668         uint64_t zapobj;
3669 
3670         mutex_enter(&ds->ds_lock);
3671         if (ds->ds_phys->ds_userrefs_obj == 0) {
3672                 /*
3673                  * This is the first user hold for this dataset.  Create
3674                  * the userrefs zap object.
3675                  */
3676                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3677                 zapobj = ds->ds_phys->ds_userrefs_obj =
3678                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3679         } else {
3680                 zapobj = ds->ds_phys->ds_userrefs_obj;
3681         }
3682         ds->ds_userrefs++;
3683         mutex_exit(&ds->ds_lock);
3684 
3685         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3686 
3687         if (ha->temphold) {
3688                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3689                     htag, &now, tx));
3690         }
3691 
3692         spa_history_log_internal(LOG_DS_USER_HOLD,
3693             dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
3694             (int)ha->temphold, ds->ds_object);
3695 }
3696 
3697 static int
3698 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3699 {
3700         struct dsl_ds_holdarg *ha = arg;
3701         dsl_dataset_t *ds;
3702         int error;
3703         char *name;
3704 
3705         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3706         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3707         error = dsl_dataset_hold(name, ha->dstg, &ds);
3708         strfree(name);
3709         if (error == 0) {
3710                 ha->gotone = B_TRUE;
3711                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3712                     dsl_dataset_user_hold_sync, ds, ha, 0);
3713         } else if (error == ENOENT && ha->recursive) {
3714                 error = 0;
3715         } else {
3716                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3717         }
3718         return (error);
3719 }
3720 
3721 int
3722 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3723     boolean_t temphold)
3724 {
3725         struct dsl_ds_holdarg *ha;
3726         int error;
3727 
3728         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3729         ha->htag = htag;
3730         ha->temphold = temphold;
3731         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3732             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3733             ds, ha, 0);
3734         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3735 
3736         return (error);
3737 }
3738 
3739 int
3740 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3741     boolean_t recursive, boolean_t temphold, int cleanup_fd)
3742 {
3743         struct dsl_ds_holdarg *ha;
3744         dsl_sync_task_t *dst;
3745         spa_t *spa;
3746         int error;
3747         minor_t minor = 0;
3748 
3749         if (cleanup_fd != -1) {
3750                 /* Currently we only support cleanup-on-exit of tempholds. */
3751                 if (!temphold)
3752                         return (EINVAL);
3753                 error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3754                 if (error)
3755                         return (error);
3756         }
3757 
3758         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3759 
3760         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3761 
3762         error = spa_open(dsname, &spa, FTAG);
3763         if (error) {
3764                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3765                 if (cleanup_fd != -1)
3766                         zfs_onexit_fd_rele(cleanup_fd);
3767                 return (error);
3768         }
3769 
3770         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3771         ha->htag = htag;
3772         ha->snapname = snapname;
3773         ha->recursive = recursive;
3774         ha->temphold = temphold;
3775 
3776         if (recursive) {
3777                 error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3778                     ha, DS_FIND_CHILDREN);
3779         } else {
3780                 error = dsl_dataset_user_hold_one(dsname, ha);
3781         }
3782         if (error == 0)
3783                 error = dsl_sync_task_group_wait(ha->dstg);
3784 
3785         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3786             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3787                 dsl_dataset_t *ds = dst->dst_arg1;
3788 
3789                 if (dst->dst_err) {
3790                         dsl_dataset_name(ds, ha->failed);
3791                         *strchr(ha->failed, '@') = '\0';
3792                 } else if (error == 0 && minor != 0 && temphold) {
3793                         /*
3794                          * If this hold is to be released upon process exit,
3795                          * register that action now.
3796                          */
3797                         dsl_register_onexit_hold_cleanup(ds, htag, minor);
3798                 }
3799                 dsl_dataset_rele(ds, ha->dstg);
3800         }
3801 
3802         if (error == 0 && recursive && !ha->gotone)
3803                 error = ENOENT;
3804 
3805         if (error)
3806                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3807 
3808         dsl_sync_task_group_destroy(ha->dstg);
3809 
3810         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3811         spa_close(spa, FTAG);
3812         if (cleanup_fd != -1)
3813                 zfs_onexit_fd_rele(cleanup_fd);
3814         return (error);
3815 }
3816 
3817 struct dsl_ds_releasearg {
3818         dsl_dataset_t *ds;
3819         const char *htag;
3820         boolean_t own;          /* do we own or just hold ds? */
3821 };
3822 
3823 static int
3824 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3825     boolean_t *might_destroy)
3826 {
3827         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3828         uint64_t zapobj;
3829         uint64_t tmp;
3830         int error;
3831 
3832         *might_destroy = B_FALSE;
3833 
3834         mutex_enter(&ds->ds_lock);
3835         zapobj = ds->ds_phys->ds_userrefs_obj;
3836         if (zapobj == 0) {
3837                 /* The tag can't possibly exist */
3838                 mutex_exit(&ds->ds_lock);
3839                 return (ESRCH);
3840         }
3841 
3842         /* Make sure the tag exists */
3843         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3844         if (error) {
3845                 mutex_exit(&ds->ds_lock);
3846                 if (error == ENOENT)
3847                         error = ESRCH;
3848                 return (error);
3849         }
3850 
3851         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3852             DS_IS_DEFER_DESTROY(ds))
3853                 *might_destroy = B_TRUE;
3854 
3855         mutex_exit(&ds->ds_lock);
3856         return (0);
3857 }
3858 
3859 static int
3860 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3861 {
3862         struct dsl_ds_releasearg *ra = arg1;
3863         dsl_dataset_t *ds = ra->ds;
3864         boolean_t might_destroy;
3865         int error;
3866 
3867         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3868                 return (ENOTSUP);
3869 
3870         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3871         if (error)
3872                 return (error);
3873 
3874         if (might_destroy) {
3875                 struct dsl_ds_destroyarg dsda = {0};
3876 
3877                 if (dmu_tx_is_syncing(tx)) {
3878                         /*
3879                          * If we're not prepared to remove the snapshot,
3880                          * we can't allow the release to happen right now.
3881                          */
3882                         if (!ra->own)
3883                                 return (EBUSY);
3884                 }
3885                 dsda.ds = ds;
3886                 dsda.releasing = B_TRUE;
3887                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
3888         }
3889 
3890         return (0);
3891 }
3892 
3893 static void
3894 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
3895 {
3896         struct dsl_ds_releasearg *ra = arg1;
3897         dsl_dataset_t *ds = ra->ds;
3898         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3899         objset_t *mos = dp->dp_meta_objset;
3900         uint64_t zapobj;
3901         uint64_t dsobj = ds->ds_object;
3902         uint64_t refs;
3903         int error;
3904 
3905         mutex_enter(&ds->ds_lock);
3906         ds->ds_userrefs--;
3907         refs = ds->ds_userrefs;
3908         mutex_exit(&ds->ds_lock);
3909         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3910         VERIFY(error == 0 || error == ENOENT);
3911         zapobj = ds->ds_phys->ds_userrefs_obj;
3912         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3913         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3914             DS_IS_DEFER_DESTROY(ds)) {
3915                 struct dsl_ds_destroyarg dsda = {0};
3916 
3917                 ASSERT(ra->own);
3918                 dsda.ds = ds;
3919                 dsda.releasing = B_TRUE;
3920                 /* We already did the destroy_check */
3921                 dsl_dataset_destroy_sync(&dsda, tag, tx);
3922         }
3923 
3924         spa_history_log_internal(LOG_DS_USER_RELEASE,
3925             dp->dp_spa, tx, "<%s> %lld dataset = %llu",
3926             ra->htag, (longlong_t)refs, dsobj);
3927 }
3928 
3929 static int
3930 dsl_dataset_user_release_one(const char *dsname, void *arg)
3931 {
3932         struct dsl_ds_holdarg *ha = arg;
3933         struct dsl_ds_releasearg *ra;
3934         dsl_dataset_t *ds;
3935         int error;
3936         void *dtag = ha->dstg;
3937         char *name;
3938         boolean_t own = B_FALSE;
3939         boolean_t might_destroy;
3940 
3941         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
3942         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3943         error = dsl_dataset_hold(name, dtag, &ds);
3944         strfree(name);
3945         if (error == ENOENT && ha->recursive)
3946                 return (0);
3947         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3948         if (error)
3949                 return (error);
3950 
3951         ha->gotone = B_TRUE;
3952 
3953         ASSERT(dsl_dataset_is_snapshot(ds));
3954 
3955         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3956         if (error) {
3957                 dsl_dataset_rele(ds, dtag);
3958                 return (error);
3959         }
3960 
3961         if (might_destroy) {
3962 #ifdef _KERNEL
3963                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3964                 error = zfs_unmount_snap(name, NULL);
3965                 strfree(name);
3966                 if (error) {
3967                         dsl_dataset_rele(ds, dtag);
3968                         return (error);
3969                 }
3970 #endif
3971                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3972                         dsl_dataset_rele(ds, dtag);
3973                         return (EBUSY);
3974                 } else {
3975                         own = B_TRUE;
3976                         dsl_dataset_make_exclusive(ds, dtag);
3977                 }
3978         }
3979 
3980         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
3981         ra->ds = ds;
3982         ra->htag = ha->htag;
3983         ra->own = own;
3984         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
3985             dsl_dataset_user_release_sync, ra, dtag, 0);
3986 
3987         return (0);
3988 }
3989 
3990 int
3991 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
3992     boolean_t recursive)
3993 {
3994         struct dsl_ds_holdarg *ha;
3995         dsl_sync_task_t *dst;
3996         spa_t *spa;
3997         int error;
3998 
3999 top:
4000         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4001 
4002         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4003 
4004         error = spa_open(dsname, &spa, FTAG);
4005         if (error) {
4006                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4007                 return (error);
4008         }
4009 
4010         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4011         ha->htag = htag;
4012         ha->snapname = snapname;
4013         ha->recursive = recursive;
4014         if (recursive) {
4015                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4016                     ha, DS_FIND_CHILDREN);
4017         } else {
4018                 error = dsl_dataset_user_release_one(dsname, ha);
4019         }
4020         if (error == 0)
4021                 error = dsl_sync_task_group_wait(ha->dstg);
4022 
4023         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4024             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4025                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4026                 dsl_dataset_t *ds = ra->ds;
4027 
4028                 if (dst->dst_err)
4029                         dsl_dataset_name(ds, ha->failed);
4030 
4031                 if (ra->own)
4032                         dsl_dataset_disown(ds, ha->dstg);
4033                 else
4034                         dsl_dataset_rele(ds, ha->dstg);
4035 
4036                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4037         }
4038 
4039         if (error == 0 && recursive && !ha->gotone)
4040                 error = ENOENT;
4041 
4042         if (error && error != EBUSY)
4043                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4044 
4045         dsl_sync_task_group_destroy(ha->dstg);
4046         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4047         spa_close(spa, FTAG);
4048 
4049         /*
4050          * We can get EBUSY if we were racing with deferred destroy and
4051          * dsl_dataset_user_release_check() hadn't done the necessary
4052          * open context setup.  We can also get EBUSY if we're racing
4053          * with destroy and that thread is the ds_owner.  Either way
4054          * the busy condition should be transient, and we should retry
4055          * the release operation.
4056          */
4057         if (error == EBUSY)
4058                 goto top;
4059 
4060         return (error);
4061 }
4062 
4063 /*
4064  * Called at spa_load time (with retry == B_FALSE) to release a stale
4065  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4066  */
4067 int
4068 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4069     boolean_t retry)
4070 {
4071         dsl_dataset_t *ds;
4072         char *snap;
4073         char *name;
4074         int namelen;
4075         int error;
4076 
4077         do {
4078                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4079                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4080                 rw_exit(&dp->dp_config_rwlock);
4081                 if (error)
4082                         return (error);
4083                 namelen = dsl_dataset_namelen(ds)+1;
4084                 name = kmem_alloc(namelen, KM_SLEEP);
4085                 dsl_dataset_name(ds, name);
4086                 dsl_dataset_rele(ds, FTAG);
4087 
4088                 snap = strchr(name, '@');
4089                 *snap = '\0';
4090                 ++snap;
4091                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4092                 kmem_free(name, namelen);
4093 
4094                 /*
4095                  * The object can't have been destroyed because we have a hold,
4096                  * but it might have been renamed, resulting in ENOENT.  Retry
4097                  * if we've been requested to do so.
4098                  *
4099                  * It would be nice if we could use the dsobj all the way
4100                  * through and avoid ENOENT entirely.  But we might need to
4101                  * unmount the snapshot, and there's currently no way to lookup
4102                  * a vfsp using a ZFS object id.
4103                  */
4104         } while ((error == ENOENT) && retry);
4105 
4106         return (error);
4107 }
4108 
4109 int
4110 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4111 {
4112         dsl_dataset_t *ds;
4113         int err;
4114 
4115         err = dsl_dataset_hold(dsname, FTAG, &ds);
4116         if (err)
4117                 return (err);
4118 
4119         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4120         if (ds->ds_phys->ds_userrefs_obj != 0) {
4121                 zap_attribute_t *za;
4122                 zap_cursor_t zc;
4123 
4124                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4125                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4126                     ds->ds_phys->ds_userrefs_obj);
4127                     zap_cursor_retrieve(&zc, za) == 0;
4128                     zap_cursor_advance(&zc)) {
4129                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4130                             za->za_first_integer));
4131                 }
4132                 zap_cursor_fini(&zc);
4133                 kmem_free(za, sizeof (zap_attribute_t));
4134         }
4135         dsl_dataset_rele(ds, FTAG);
4136         return (0);
4137 }
4138 
4139 /*
4140  * Note, this function is used as the callback for dmu_objset_find().  We
4141  * always return 0 so that we will continue to find and process
4142  * inconsistent datasets, even if we encounter an error trying to
4143  * process one of them.
4144  */
4145 /* ARGSUSED */
4146 int
4147 dsl_destroy_inconsistent(const char *dsname, void *arg)
4148 {
4149         dsl_dataset_t *ds;
4150 
4151         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4152                 if (DS_IS_INCONSISTENT(ds))
4153                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4154                 else
4155                         dsl_dataset_disown(ds, FTAG);
4156         }
4157         return (0);
4158 }
4159 
4160 /*
4161  * Return (in *usedp) the amount of space written in new that is not
4162  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4163  * a snapshot before new, in new's filesystem (or its origin).  If not then
4164  * fail and return EINVAL.
4165  *
4166  * The written space is calculated by considering two components:  First, we
4167  * ignore any freed space, and calculate the written as new's used space
4168  * minus old's used space.  Next, we add in the amount of space that was freed
4169  * between the two snapshots, thus reducing new's used space relative to old's.
4170  * Specifically, this is the space that was born before old->ds_creation_txg,
4171  * and freed before new (ie. on new's deadlist or a previous deadlist).
4172  *
4173  * space freed                         [---------------------]
4174  * snapshots                       ---O-------O--------O-------O------
4175  *                                         oldsnap            new
4176  */
4177 int
4178 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4179     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4180 {
4181         int err = 0;
4182         uint64_t snapobj;
4183         dsl_pool_t *dp = new->ds_dir->dd_pool;
4184 
4185         *usedp = 0;
4186         *usedp += new->ds_phys->ds_referenced_bytes;
4187         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4188 
4189         *compp = 0;
4190         *compp += new->ds_phys->ds_compressed_bytes;
4191         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4192 
4193         *uncompp = 0;
4194         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4195         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4196 
4197         rw_enter(&dp->dp_config_rwlock, RW_READER);
4198         snapobj = new->ds_object;
4199         while (snapobj != oldsnap->ds_object) {
4200                 dsl_dataset_t *snap;
4201                 uint64_t used, comp, uncomp;
4202 
4203                 if (snapobj == new->ds_object) {
4204                         snap = new;
4205                 } else {
4206                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4207                         if (err != 0)
4208                                 break;
4209                 }
4210 
4211                 if (snap->ds_phys->ds_prev_snap_txg ==
4212                     oldsnap->ds_phys->ds_creation_txg) {
4213                         /*
4214                          * The blocks in the deadlist can not be born after
4215                          * ds_prev_snap_txg, so get the whole deadlist space,
4216                          * which is more efficient (especially for old-format
4217                          * deadlists).  Unfortunately the deadlist code
4218                          * doesn't have enough information to make this
4219                          * optimization itself.
4220                          */
4221                         dsl_deadlist_space(&snap->ds_deadlist,
4222                             &used, &comp, &uncomp);
4223                 } else {
4224                         dsl_deadlist_space_range(&snap->ds_deadlist,
4225                             0, oldsnap->ds_phys->ds_creation_txg,
4226                             &used, &comp, &uncomp);
4227                 }
4228                 *usedp += used;
4229                 *compp += comp;
4230                 *uncompp += uncomp;
4231 
4232                 /*
4233                  * If we get to the beginning of the chain of snapshots
4234                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4235                  * was not a snapshot of/before new.
4236                  */
4237                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4238                 if (snap != new)
4239                         dsl_dataset_rele(snap, FTAG);
4240                 if (snapobj == 0) {
4241                         err = EINVAL;
4242                         break;
4243                 }
4244 
4245         }
4246         rw_exit(&dp->dp_config_rwlock);
4247         return (err);
4248 }
4249 
4250 /*
4251  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4252  * lastsnap, and all snapshots in between are deleted.
4253  *
4254  * blocks that would be freed            [---------------------------]
4255  * snapshots                       ---O-------O--------O-------O--------O
4256  *                                        firstsnap        lastsnap
4257  *
4258  * This is the set of blocks that were born after the snap before firstsnap,
4259  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4260  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4261  * We calculate this by iterating over the relevant deadlists (from the snap
4262  * after lastsnap, backward to the snap after firstsnap), summing up the
4263  * space on the deadlist that was born after the snap before firstsnap.
4264  */
4265 int
4266 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4267     dsl_dataset_t *lastsnap,
4268     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4269 {
4270         int err = 0;
4271         uint64_t snapobj;
4272         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4273 
4274         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4275         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4276 
4277         /*
4278          * Check that the snapshots are in the same dsl_dir, and firstsnap
4279          * is before lastsnap.
4280          */
4281         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4282             firstsnap->ds_phys->ds_creation_txg >
4283             lastsnap->ds_phys->ds_creation_txg)
4284                 return (EINVAL);
4285 
4286         *usedp = *compp = *uncompp = 0;
4287 
4288         rw_enter(&dp->dp_config_rwlock, RW_READER);
4289         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4290         while (snapobj != firstsnap->ds_object) {
4291                 dsl_dataset_t *ds;
4292                 uint64_t used, comp, uncomp;
4293 
4294                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4295                 if (err != 0)
4296                         break;
4297 
4298                 dsl_deadlist_space_range(&ds->ds_deadlist,
4299                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4300                     &used, &comp, &uncomp);
4301                 *usedp += used;
4302                 *compp += comp;
4303                 *uncompp += uncomp;
4304 
4305                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4306                 ASSERT3U(snapobj, !=, 0);
4307                 dsl_dataset_rele(ds, FTAG);
4308         }
4309         rw_exit(&dp->dp_config_rwlock);
4310         return (err);
4311 }