1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  25  */
  26 
  27 #include <sys/dmu_objset.h>
  28 #include <sys/dsl_dataset.h>
  29 #include <sys/dsl_dir.h>
  30 #include <sys/dsl_prop.h>
  31 #include <sys/dsl_synctask.h>
  32 #include <sys/dmu_traverse.h>
  33 #include <sys/dmu_impl.h>
  34 #include <sys/dmu_tx.h>
  35 #include <sys/arc.h>
  36 #include <sys/zio.h>
  37 #include <sys/zap.h>
  38 #include <sys/zfeature.h>
  39 #include <sys/unique.h>
  40 #include <sys/zfs_context.h>
  41 #include <sys/zfs_ioctl.h>
  42 #include <sys/spa.h>
  43 #include <sys/zfs_znode.h>
  44 #include <sys/zfs_onexit.h>
  45 #include <sys/zvol.h>
  46 #include <sys/dsl_scan.h>
  47 #include <sys/dsl_deadlist.h>
  48 
/*
 * Sentinel owner value.  DSL_DATASET_IS_DESTROYED() compares a dataset's
 * ds_owner against this pointer, so only the address is significant; the
 * string contents are never inspected by that macro.
 */
static char *dsl_reaper = "the grim reaper";

/*
 * Forward declarations of sync-task check/sync callbacks.
 * NOTE(review): presumably defined later in this file -- the definitions
 * are not visible in this part of the source.
 */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
  54 
  55 #define SWITCH64(x, y) \
  56         { \
  57                 uint64_t __tmp = (x); \
  58                 (x) = (y); \
  59                 (y) = __tmp; \
  60         }
  61 
/*
 * NOTE(review): no use of DS_REF_MAX is visible in this part of the file;
 * presumably an upper bound on a dataset reference count -- confirm at the
 * use site.
 */
#define DS_REF_MAX      (1ULL << 62)

/*
 * Defined as the SPA's maximum block size.
 * NOTE(review): no use is visible in this part of the file.
 */
#define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE

/* True when the dataset's owner is the dsl_reaper sentinel. */
#define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
  67 
  68 
  69 /*
  70  * Figure out how much of this delta should be propogated to the dsl_dir
  71  * layer.  If there's a refreservation, that space has already been
  72  * partially accounted for in our ancestors.
  73  */
  74 static int64_t
  75 parent_delta(dsl_dataset_t *ds, int64_t delta)
  76 {
  77         uint64_t old_bytes, new_bytes;
  78 
  79         if (ds->ds_reserved == 0)
  80                 return (delta);
  81 
  82         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
  83         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
  84 
  85         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
  86         return (new_bytes - old_bytes);
  87 }
  88 
  89 void
  90 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
  91 {
  92         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
  93         int compressed = BP_GET_PSIZE(bp);
  94         int uncompressed = BP_GET_UCSIZE(bp);
  95         int64_t delta;
  96 
  97         dprintf_bp(bp, "ds=%p", ds);
  98 
  99         ASSERT(dmu_tx_is_syncing(tx));
 100         /* It could have been compressed away to nothing */
 101         if (BP_IS_HOLE(bp))
 102                 return;
 103         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
 104         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
 105         if (ds == NULL) {
 106                 /*
 107                  * Account for the meta-objset space in its placeholder
 108                  * dsl_dir.
 109                  */
 110                 ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
 111                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
 112                     used, compressed, uncompressed, tx);
 113                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
 114                 return;
 115         }
 116         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 117 
 118         mutex_enter(&ds->ds_dir->dd_lock);
 119         mutex_enter(&ds->ds_lock);
 120         delta = parent_delta(ds, used);
 121         ds->ds_phys->ds_referenced_bytes += used;
 122         ds->ds_phys->ds_compressed_bytes += compressed;
 123         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
 124         ds->ds_phys->ds_unique_bytes += used;
 125         mutex_exit(&ds->ds_lock);
 126         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
 127             compressed, uncompressed, tx);
 128         dsl_dir_transfer_space(ds->ds_dir, used - delta,
 129             DD_USED_REFRSRV, DD_USED_HEAD, tx);
 130         mutex_exit(&ds->ds_dir->dd_lock);
 131 }
 132 
 133 int
 134 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
 135     boolean_t async)
 136 {
 137         if (BP_IS_HOLE(bp))
 138                 return (0);
 139 
 140         ASSERT(dmu_tx_is_syncing(tx));
 141         ASSERT(bp->blk_birth <= tx->tx_txg);
 142 
 143         int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
 144         int compressed = BP_GET_PSIZE(bp);
 145         int uncompressed = BP_GET_UCSIZE(bp);
 146 
 147         ASSERT(used > 0);
 148         if (ds == NULL) {
 149                 /*
 150                  * Account for the meta-objset space in its placeholder
 151                  * dataset.
 152                  */
 153                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 154 
 155                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
 156                     -used, -compressed, -uncompressed, tx);
 157                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
 158                 return (used);
 159         }
 160         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
 161 
 162         ASSERT(!dsl_dataset_is_snapshot(ds));
 163         dmu_buf_will_dirty(ds->ds_dbuf, tx);
 164 
 165         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
 166                 int64_t delta;
 167 
 168                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
 169                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
 170 
 171                 mutex_enter(&ds->ds_dir->dd_lock);
 172                 mutex_enter(&ds->ds_lock);
 173                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
 174                     !DS_UNIQUE_IS_ACCURATE(ds));
 175                 delta = parent_delta(ds, -used);
 176                 ds->ds_phys->ds_unique_bytes -= used;
 177                 mutex_exit(&ds->ds_lock);
 178                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
 179                     delta, -compressed, -uncompressed, tx);
 180                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
 181                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
 182                 mutex_exit(&ds->ds_dir->dd_lock);
 183         } else {
 184                 dprintf_bp(bp, "putting on dead list: %s", "");
 185                 if (async) {
 186                         /*
 187                          * We are here as part of zio's write done callback,
 188                          * which means we're a zio interrupt thread.  We can't
 189                          * call dsl_deadlist_insert() now because it may block
 190                          * waiting for I/O.  Instead, put bp on the deferred
 191                          * queue and let dsl_pool_sync() finish the job.
 192                          */
 193                         bplist_append(&ds->ds_pending_deadlist, bp);
 194                 } else {
 195                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
 196                 }
 197                 ASSERT3U(ds->ds_prev->ds_object, ==,
 198                     ds->ds_phys->ds_prev_snap_obj);
 199                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
 200                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
 201                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
 202                     ds->ds_object && bp->blk_birth >
 203                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
 204                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
 205                         mutex_enter(&ds->ds_prev->ds_lock);
 206                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
 207                         mutex_exit(&ds->ds_prev->ds_lock);
 208                 }
 209                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
 210                         dsl_dir_transfer_space(ds->ds_dir, used,
 211                             DD_USED_HEAD, DD_USED_SNAP, tx);
 212                 }
 213         }
 214         mutex_enter(&ds->ds_lock);
 215         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
 216         ds->ds_phys->ds_referenced_bytes -= used;
 217         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
 218         ds->ds_phys->ds_compressed_bytes -= compressed;
 219         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
 220         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
 221         mutex_exit(&ds->ds_lock);
 222 
 223         return (used);
 224 }
 225 
 226 uint64_t
 227 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
 228 {
 229         uint64_t trysnap = 0;
 230 
 231         if (ds == NULL)
 232                 return (0);
 233         /*
 234          * The snapshot creation could fail, but that would cause an
 235          * incorrect FALSE return, which would only result in an
 236          * overestimation of the amount of space that an operation would
 237          * consume, which is OK.
 238          *
 239          * There's also a small window where we could miss a pending
 240          * snapshot, because we could set the sync task in the quiescing
 241          * phase.  So this should only be used as a guess.
 242          */
 243         if (ds->ds_trysnap_txg >
 244             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
 245                 trysnap = ds->ds_trysnap_txg;
 246         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 247 }
 248 
 249 boolean_t
 250 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
 251     uint64_t blk_birth)
 252 {
 253         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
 254                 return (B_FALSE);
 255 
 256         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
 257 
 258         return (B_TRUE);
 259 }
 260 
 261 /* ARGSUSED */
 262 static void
 263 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
 264 {
 265         dsl_dataset_t *ds = dsv;
 266 
 267         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 268 
 269         unique_remove(ds->ds_fsid_guid);
 270 
 271         if (ds->ds_objset != NULL)
 272                 dmu_objset_evict(ds->ds_objset);
 273 
 274         if (ds->ds_prev) {
 275                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 276                 ds->ds_prev = NULL;
 277         }
 278 
 279         bplist_destroy(&ds->ds_pending_deadlist);
 280         if (db != NULL) {
 281                 dsl_deadlist_close(&ds->ds_deadlist);
 282         } else {
 283                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
 284                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
 285         }
 286         if (ds->ds_dir)
 287                 dsl_dir_close(ds->ds_dir, ds);
 288 
 289         ASSERT(!list_link_active(&ds->ds_synced_link));
 290 
 291         mutex_destroy(&ds->ds_lock);
 292         mutex_destroy(&ds->ds_recvlock);
 293         mutex_destroy(&ds->ds_opening_lock);
 294         rw_destroy(&ds->ds_rwlock);
 295         cv_destroy(&ds->ds_exclusive_cv);
 296 
 297         kmem_free(ds, sizeof (dsl_dataset_t));
 298 }
 299 
 300 static int
 301 dsl_dataset_get_snapname(dsl_dataset_t *ds)
 302 {
 303         dsl_dataset_phys_t *headphys;
 304         int err;
 305         dmu_buf_t *headdbuf;
 306         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 307         objset_t *mos = dp->dp_meta_objset;
 308 
 309         if (ds->ds_snapname[0])
 310                 return (0);
 311         if (ds->ds_phys->ds_next_snap_obj == 0)
 312                 return (0);
 313 
 314         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
 315             FTAG, &headdbuf);
 316         if (err)
 317                 return (err);
 318         headphys = headdbuf->db_data;
 319         err = zap_value_search(dp->dp_meta_objset,
 320             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
 321         dmu_buf_rele(headdbuf, FTAG);
 322         return (err);
 323 }
 324 
 325 static int
 326 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 327 {
 328         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 329         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 330         matchtype_t mt;
 331         int err;
 332 
 333         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 334                 mt = MT_FIRST;
 335         else
 336                 mt = MT_EXACT;
 337 
 338         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 339             value, mt, NULL, 0, NULL);
 340         if (err == ENOTSUP && mt == MT_FIRST)
 341                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
 342         return (err);
 343 }
 344 
 345 static int
 346 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 347 {
 348         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 349         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 350         matchtype_t mt;
 351         int err;
 352 
 353         dsl_dir_snap_cmtime_update(ds->ds_dir);
 354 
 355         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 356                 mt = MT_FIRST;
 357         else
 358                 mt = MT_EXACT;
 359 
 360         err = zap_remove_norm(mos, snapobj, name, mt, tx);
 361         if (err == ENOTSUP && mt == MT_FIRST)
 362                 err = zap_remove(mos, snapobj, name, tx);
 363         return (err);
 364 }
 365 
 366 static int
 367 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 368     dsl_dataset_t **dsp)
 369 {
 370         objset_t *mos = dp->dp_meta_objset;
 371         dmu_buf_t *dbuf;
 372         dsl_dataset_t *ds;
 373         int err;
 374         dmu_object_info_t doi;
 375 
 376         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
 377             dsl_pool_sync_context(dp));
 378 
 379         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
 380         if (err)
 381                 return (err);
 382 
 383         /* Make sure dsobj has the correct object type. */
 384         dmu_object_info_from_db(dbuf, &doi);
 385         if (doi.doi_type != DMU_OT_DSL_DATASET)
 386                 return (EINVAL);
 387 
 388         ds = dmu_buf_get_user(dbuf);
 389         if (ds == NULL) {
 390                 dsl_dataset_t *winner;
 391 
 392                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
 393                 ds->ds_dbuf = dbuf;
 394                 ds->ds_object = dsobj;
 395                 ds->ds_phys = dbuf->db_data;
 396 
 397                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
 398                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
 399                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 400                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
 401 
 402                 rw_init(&ds->ds_rwlock, 0, 0, 0);
 403                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 404 
 405                 bplist_create(&ds->ds_pending_deadlist);
 406                 dsl_deadlist_open(&ds->ds_deadlist,
 407                     mos, ds->ds_phys->ds_deadlist_obj);
 408 
 409                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
 410                     offsetof(dmu_sendarg_t, dsa_link));
 411 
 412                 if (err == 0) {
 413                         err = dsl_dir_open_obj(dp,
 414                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
 415                 }
 416                 if (err) {
 417                         mutex_destroy(&ds->ds_lock);
 418                         mutex_destroy(&ds->ds_recvlock);
 419                         mutex_destroy(&ds->ds_opening_lock);
 420                         rw_destroy(&ds->ds_rwlock);
 421                         cv_destroy(&ds->ds_exclusive_cv);
 422                         bplist_destroy(&ds->ds_pending_deadlist);
 423                         dsl_deadlist_close(&ds->ds_deadlist);
 424                         kmem_free(ds, sizeof (dsl_dataset_t));
 425                         dmu_buf_rele(dbuf, tag);
 426                         return (err);
 427                 }
 428 
 429                 if (!dsl_dataset_is_snapshot(ds)) {
 430                         ds->ds_snapname[0] = '\0';
 431                         if (ds->ds_phys->ds_prev_snap_obj) {
 432                                 err = dsl_dataset_get_ref(dp,
 433                                     ds->ds_phys->ds_prev_snap_obj,
 434                                     ds, &ds->ds_prev);
 435                         }
 436                 } else {
 437                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
 438                                 err = dsl_dataset_get_snapname(ds);
 439                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
 440                                 err = zap_count(
 441                                     ds->ds_dir->dd_pool->dp_meta_objset,
 442                                     ds->ds_phys->ds_userrefs_obj,
 443                                     &ds->ds_userrefs);
 444                         }
 445                 }
 446 
 447                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
 448                         /*
 449                          * In sync context, we're called with either no lock
 450                          * or with the write lock.  If we're not syncing,
 451                          * we're always called with the read lock held.
 452                          */
 453                         boolean_t need_lock =
 454                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
 455                             dsl_pool_sync_context(dp);
 456 
 457                         if (need_lock)
 458                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 459 
 460                         err = dsl_prop_get_ds(ds,
 461                             "refreservation", sizeof (uint64_t), 1,
 462                             &ds->ds_reserved, NULL);
 463                         if (err == 0) {
 464                                 err = dsl_prop_get_ds(ds,
 465                                     "refquota", sizeof (uint64_t), 1,
 466                                     &ds->ds_quota, NULL);
 467                         }
 468 
 469                         if (need_lock)
 470                                 rw_exit(&dp->dp_config_rwlock);
 471                 } else {
 472                         ds->ds_reserved = ds->ds_quota = 0;
 473                 }
 474 
 475                 if (err == 0) {
 476                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
 477                             dsl_dataset_evict);
 478                 }
 479                 if (err || winner) {
 480                         bplist_destroy(&ds->ds_pending_deadlist);
 481                         dsl_deadlist_close(&ds->ds_deadlist);
 482                         if (ds->ds_prev)
 483                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
 484                         dsl_dir_close(ds->ds_dir, ds);
 485                         mutex_destroy(&ds->ds_lock);
 486                         mutex_destroy(&ds->ds_recvlock);
 487                         mutex_destroy(&ds->ds_opening_lock);
 488                         rw_destroy(&ds->ds_rwlock);
 489                         cv_destroy(&ds->ds_exclusive_cv);
 490                         kmem_free(ds, sizeof (dsl_dataset_t));
 491                         if (err) {
 492                                 dmu_buf_rele(dbuf, tag);
 493                                 return (err);
 494                         }
 495                         ds = winner;
 496                 } else {
 497                         ds->ds_fsid_guid =
 498                             unique_insert(ds->ds_phys->ds_fsid_guid);
 499                 }
 500         }
 501         ASSERT3P(ds->ds_dbuf, ==, dbuf);
 502         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
 503         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
 504             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
 505             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
 506         mutex_enter(&ds->ds_lock);
 507         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 508                 mutex_exit(&ds->ds_lock);
 509                 dmu_buf_rele(ds->ds_dbuf, tag);
 510                 return (ENOENT);
 511         }
 512         mutex_exit(&ds->ds_lock);
 513         *dsp = ds;
 514         return (0);
 515 }
 516 
 517 static int
 518 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
 519 {
 520         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 521 
 522         /*
 523          * In syncing context we don't want the rwlock lock: there
 524          * may be an existing writer waiting for sync phase to
 525          * finish.  We don't need to worry about such writers, since
 526          * sync phase is single-threaded, so the writer can't be
 527          * doing anything while we are active.
 528          */
 529         if (dsl_pool_sync_context(dp)) {
 530                 ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 531                 return (0);
 532         }
 533 
 534         /*
 535          * Normal users will hold the ds_rwlock as a READER until they
 536          * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
 537          * drop their READER lock after they set the ds_owner field.
 538          *
 539          * If the dataset is being destroyed, the destroy thread will
 540          * obtain a WRITER lock for exclusive access after it's done its
 541          * open-context work and then change the ds_owner to
 542          * dsl_reaper once destruction is assured.  So threads
 543          * may block here temporarily, until the "destructability" of
 544          * the dataset is determined.
 545          */
 546         ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
 547         mutex_enter(&ds->ds_lock);
 548         while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
 549                 rw_exit(&dp->dp_config_rwlock);
 550                 cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
 551                 if (DSL_DATASET_IS_DESTROYED(ds)) {
 552                         mutex_exit(&ds->ds_lock);
 553                         dsl_dataset_drop_ref(ds, tag);
 554                         rw_enter(&dp->dp_config_rwlock, RW_READER);
 555                         return (ENOENT);
 556                 }
 557                 /*
 558                  * The dp_config_rwlock lives above the ds_lock. And
 559                  * we need to check DSL_DATASET_IS_DESTROYED() while
 560                  * holding the ds_lock, so we have to drop and reacquire
 561                  * the ds_lock here.
 562                  */
 563                 mutex_exit(&ds->ds_lock);
 564                 rw_enter(&dp->dp_config_rwlock, RW_READER);
 565                 mutex_enter(&ds->ds_lock);
 566         }
 567         mutex_exit(&ds->ds_lock);
 568         return (0);
 569 }
 570 
 571 int
 572 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
 573     dsl_dataset_t **dsp)
 574 {
 575         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
 576 
 577         if (err)
 578                 return (err);
 579         return (dsl_dataset_hold_ref(*dsp, tag));
 580 }
 581 
 582 int
 583 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
 584     void *tag, dsl_dataset_t **dsp)
 585 {
 586         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
 587         if (err)
 588                 return (err);
 589         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 590                 dsl_dataset_rele(*dsp, tag);
 591                 *dsp = NULL;
 592                 return (EBUSY);
 593         }
 594         return (0);
 595 }
 596 
 597 int
 598 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 599 {
 600         dsl_dir_t *dd;
 601         dsl_pool_t *dp;
 602         const char *snapname;
 603         uint64_t obj;
 604         int err = 0;
 605 
 606         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 607         if (err)
 608                 return (err);
 609 
 610         dp = dd->dd_pool;
 611         obj = dd->dd_phys->dd_head_dataset_obj;
 612         rw_enter(&dp->dp_config_rwlock, RW_READER);
 613         if (obj)
 614                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
 615         else
 616                 err = ENOENT;
 617         if (err)
 618                 goto out;
 619 
 620         err = dsl_dataset_hold_ref(*dsp, tag);
 621 
 622         /* we may be looking for a snapshot */
 623         if (err == 0 && snapname != NULL) {
 624                 dsl_dataset_t *ds = NULL;
 625 
 626                 if (*snapname++ != '@') {
 627                         dsl_dataset_rele(*dsp, tag);
 628                         err = ENOENT;
 629                         goto out;
 630                 }
 631 
 632                 dprintf("looking for snapshot '%s'\n", snapname);
 633                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
 634                 if (err == 0)
 635                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
 636                 dsl_dataset_rele(*dsp, tag);
 637 
 638                 ASSERT3U((err == 0), ==, (ds != NULL));
 639 
 640                 if (ds) {
 641                         mutex_enter(&ds->ds_lock);
 642                         if (ds->ds_snapname[0] == 0)
 643                                 (void) strlcpy(ds->ds_snapname, snapname,
 644                                     sizeof (ds->ds_snapname));
 645                         mutex_exit(&ds->ds_lock);
 646                         err = dsl_dataset_hold_ref(ds, tag);
 647                         *dsp = err ? NULL : ds;
 648                 }
 649         }
 650 out:
 651         rw_exit(&dp->dp_config_rwlock);
 652         dsl_dir_close(dd, FTAG);
 653         return (err);
 654 }
 655 
 656 int
 657 dsl_dataset_own(const char *name, boolean_t inconsistentok,
 658     void *tag, dsl_dataset_t **dsp)
 659 {
 660         int err = dsl_dataset_hold(name, tag, dsp);
 661         if (err)
 662                 return (err);
 663         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
 664                 dsl_dataset_rele(*dsp, tag);
 665                 return (EBUSY);
 666         }
 667         return (0);
 668 }
 669 
 670 void
 671 dsl_dataset_name(dsl_dataset_t *ds, char *name)
 672 {
 673         if (ds == NULL) {
 674                 (void) strcpy(name, "mos");
 675         } else {
 676                 dsl_dir_name(ds->ds_dir, name);
 677                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 678                 if (ds->ds_snapname[0]) {
 679                         (void) strcat(name, "@");
 680                         /*
 681                          * We use a "recursive" mutex so that we
 682                          * can call dprintf_ds() with ds_lock held.
 683                          */
 684                         if (!MUTEX_HELD(&ds->ds_lock)) {
 685                                 mutex_enter(&ds->ds_lock);
 686                                 (void) strcat(name, ds->ds_snapname);
 687                                 mutex_exit(&ds->ds_lock);
 688                         } else {
 689                                 (void) strcat(name, ds->ds_snapname);
 690                         }
 691                 }
 692         }
 693 }
 694 
 695 static int
 696 dsl_dataset_namelen(dsl_dataset_t *ds)
 697 {
 698         int result;
 699 
 700         if (ds == NULL) {
 701                 result = 3;     /* "mos" */
 702         } else {
 703                 result = dsl_dir_namelen(ds->ds_dir);
 704                 VERIFY(0 == dsl_dataset_get_snapname(ds));
 705                 if (ds->ds_snapname[0]) {
 706                         ++result;       /* adding one for the @-sign */
 707                         if (!MUTEX_HELD(&ds->ds_lock)) {
 708                                 mutex_enter(&ds->ds_lock);
 709                                 result += strlen(ds->ds_snapname);
 710                                 mutex_exit(&ds->ds_lock);
 711                         } else {
 712                                 result += strlen(ds->ds_snapname);
 713                         }
 714                 }
 715         }
 716 
 717         return (result);
 718 }
 719 
 720 void
 721 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 722 {
 723         dmu_buf_rele(ds->ds_dbuf, tag);
 724 }
 725 
 726 void
 727 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 728 {
 729         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
 730                 rw_exit(&ds->ds_rwlock);
 731         }
 732         dsl_dataset_drop_ref(ds, tag);
 733 }
 734 
 735 void
 736 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
 737 {
 738         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
 739             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
 740 
 741         mutex_enter(&ds->ds_lock);
 742         ds->ds_owner = NULL;
 743         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
 744                 rw_exit(&ds->ds_rwlock);
 745                 cv_broadcast(&ds->ds_exclusive_cv);
 746         }
 747         mutex_exit(&ds->ds_lock);
 748         if (ds->ds_dbuf)
 749                 dsl_dataset_drop_ref(ds, tag);
 750         else
 751                 dsl_dataset_evict(NULL, ds);
 752 }
 753 
 754 boolean_t
 755 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
 756 {
 757         boolean_t gotit = FALSE;
 758 
 759         mutex_enter(&ds->ds_lock);
 760         if (ds->ds_owner == NULL &&
 761             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
 762                 ds->ds_owner = tag;
 763                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
 764                         rw_exit(&ds->ds_rwlock);
 765                 gotit = TRUE;
 766         }
 767         mutex_exit(&ds->ds_lock);
 768         return (gotit);
 769 }
 770 
 771 void
 772 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
 773 {
 774         ASSERT3P(owner, ==, ds->ds_owner);
 775         if (!RW_WRITE_HELD(&ds->ds_rwlock))
 776                 rw_enter(&ds->ds_rwlock, RW_WRITER);
 777 }
 778 
 779 uint64_t
 780 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
 781     uint64_t flags, dmu_tx_t *tx)
 782 {
 783         dsl_pool_t *dp = dd->dd_pool;
 784         dmu_buf_t *dbuf;
 785         dsl_dataset_phys_t *dsphys;
 786         uint64_t dsobj;
 787         objset_t *mos = dp->dp_meta_objset;
 788 
 789         if (origin == NULL)
 790                 origin = dp->dp_origin_snap;
 791 
 792         ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
 793         ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
 794         ASSERT(dmu_tx_is_syncing(tx));
 795         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 796 
 797         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
 798             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
 799         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 800         dmu_buf_will_dirty(dbuf, tx);
 801         dsphys = dbuf->db_data;
 802         bzero(dsphys, sizeof (dsl_dataset_phys_t));
 803         dsphys->ds_dir_obj = dd->dd_object;
 804         dsphys->ds_flags = flags;
 805         dsphys->ds_fsid_guid = unique_create();
 806         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
 807             sizeof (dsphys->ds_guid));
 808         dsphys->ds_snapnames_zapobj =
 809             zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
 810             DMU_OT_NONE, 0, tx);
 811         dsphys->ds_creation_time = gethrestime_sec();
 812         dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
 813 
 814         if (origin == NULL) {
 815                 dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
 816         } else {
 817                 dsl_dataset_t *ohds;
 818 
 819                 dsphys->ds_prev_snap_obj = origin->ds_object;
 820                 dsphys->ds_prev_snap_txg =
 821                     origin->ds_phys->ds_creation_txg;
 822                 dsphys->ds_referenced_bytes =
 823                     origin->ds_phys->ds_referenced_bytes;
 824                 dsphys->ds_compressed_bytes =
 825                     origin->ds_phys->ds_compressed_bytes;
 826                 dsphys->ds_uncompressed_bytes =
 827                     origin->ds_phys->ds_uncompressed_bytes;
 828                 dsphys->ds_bp = origin->ds_phys->ds_bp;
 829                 dsphys->ds_flags |= origin->ds_phys->ds_flags;
 830 
 831                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
 832                 origin->ds_phys->ds_num_children++;
 833 
 834                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
 835                     origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
 836                 dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
 837                     dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
 838                 dsl_dataset_rele(ohds, FTAG);
 839 
 840                 if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
 841                         if (origin->ds_phys->ds_next_clones_obj == 0) {
 842                                 origin->ds_phys->ds_next_clones_obj =
 843                                     zap_create(mos,
 844                                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
 845                         }
 846                         VERIFY(0 == zap_add_int(mos,
 847                             origin->ds_phys->ds_next_clones_obj,
 848                             dsobj, tx));
 849                 }
 850 
 851                 dmu_buf_will_dirty(dd->dd_dbuf, tx);
 852                 dd->dd_phys->dd_origin_obj = origin->ds_object;
 853                 if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
 854                         if (origin->ds_dir->dd_phys->dd_clones == 0) {
 855                                 dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
 856                                 origin->ds_dir->dd_phys->dd_clones =
 857                                     zap_create(mos,
 858                                     DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
 859                         }
 860                         VERIFY3U(0, ==, zap_add_int(mos,
 861                             origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
 862                 }
 863         }
 864 
 865         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
 866                 dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
 867 
 868         dmu_buf_rele(dbuf, FTAG);
 869 
 870         dmu_buf_will_dirty(dd->dd_dbuf, tx);
 871         dd->dd_phys->dd_head_dataset_obj = dsobj;
 872 
 873         return (dsobj);
 874 }
 875 
 876 uint64_t
 877 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
 878     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
 879 {
 880         dsl_pool_t *dp = pdd->dd_pool;
 881         uint64_t dsobj, ddobj;
 882         dsl_dir_t *dd;
 883 
 884         ASSERT(lastname[0] != '@');
 885 
 886         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
 887         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
 888 
 889         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
 890 
 891         dsl_deleg_set_create_perms(dd, tx, cr);
 892 
 893         dsl_dir_close(dd, FTAG);
 894 
 895         /*
 896          * If we are creating a clone, make sure we zero out any stale
 897          * data from the origin snapshots zil header.
 898          */
 899         if (origin != NULL) {
 900                 dsl_dataset_t *ds;
 901                 objset_t *os;
 902 
 903                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
 904                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
 905                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
 906                 dsl_dataset_dirty(ds, tx);
 907                 dsl_dataset_rele(ds, FTAG);
 908         }
 909 
 910         return (dsobj);
 911 }
 912 
 913 /*
 914  * The snapshots must all be in the same pool.
 915  */
 916 int
 917 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
 918 {
 919         int err;
 920         dsl_sync_task_t *dst;
 921         spa_t *spa;
 922         nvpair_t *pair;
 923         dsl_sync_task_group_t *dstg;
 924 
 925         pair = nvlist_next_nvpair(snaps, NULL);
 926         if (pair == NULL)
 927                 return (0);
 928 
 929         err = spa_open(nvpair_name(pair), &spa, FTAG);
 930         if (err)
 931                 return (err);
 932         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 933 
 934         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 935             pair = nvlist_next_nvpair(snaps, pair)) {
 936                 dsl_dataset_t *ds;
 937 
 938                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
 939                 if (err == 0) {
 940                         struct dsl_ds_destroyarg *dsda;
 941 
 942                         dsl_dataset_make_exclusive(ds, dstg);
 943                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
 944                             KM_SLEEP);
 945                         dsda->ds = ds;
 946                         dsda->defer = defer;
 947                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 948                             dsl_dataset_destroy_sync, dsda, dstg, 0);
 949                 } else if (err == ENOENT) {
 950                         err = 0;
 951                 } else {
 952                         (void) strcpy(failed, nvpair_name(pair));
 953                         break;
 954                 }
 955         }
 956 
 957         if (err == 0)
 958                 err = dsl_sync_task_group_wait(dstg);
 959 
 960         for (dst = list_head(&dstg->dstg_tasks); dst;
 961             dst = list_next(&dstg->dstg_tasks, dst)) {
 962                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
 963                 dsl_dataset_t *ds = dsda->ds;
 964 
 965                 /*
 966                  * Return the file system name that triggered the error
 967                  */
 968                 if (dst->dst_err) {
 969                         dsl_dataset_name(ds, failed);
 970                 }
 971                 ASSERT3P(dsda->rm_origin, ==, NULL);
 972                 dsl_dataset_disown(ds, dstg);
 973                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 974         }
 975 
 976         dsl_sync_task_group_destroy(dstg);
 977         spa_close(spa, FTAG);
 978         return (err);
 979 
 980 }
 981 
 982 static boolean_t
 983 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
 984 {
 985         boolean_t might_destroy = B_FALSE;
 986 
 987         mutex_enter(&ds->ds_lock);
 988         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
 989             DS_IS_DEFER_DESTROY(ds))
 990                 might_destroy = B_TRUE;
 991         mutex_exit(&ds->ds_lock);
 992 
 993         return (might_destroy);
 994 }
 995 
 996 /*
 997  * If we're removing a clone, and these three conditions are true:
 998  *      1) the clone's origin has no other children
 999  *      2) the clone's origin has no user references
1000  *      3) the clone's origin has been marked for deferred destruction
1001  * Then, prepare to remove the origin as part of this sync task group.
1002  */
1003 static int
1004 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1005 {
1006         dsl_dataset_t *ds = dsda->ds;
1007         dsl_dataset_t *origin = ds->ds_prev;
1008 
1009         if (dsl_dataset_might_destroy_origin(origin)) {
1010                 char *name;
1011                 int namelen;
1012                 int error;
1013 
1014                 namelen = dsl_dataset_namelen(origin) + 1;
1015                 name = kmem_alloc(namelen, KM_SLEEP);
1016                 dsl_dataset_name(origin, name);
1017 #ifdef _KERNEL
1018                 error = zfs_unmount_snap(name, NULL);
1019                 if (error) {
1020                         kmem_free(name, namelen);
1021                         return (error);
1022                 }
1023 #endif
1024                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1025                 kmem_free(name, namelen);
1026                 if (error)
1027                         return (error);
1028                 dsda->rm_origin = origin;
1029                 dsl_dataset_make_exclusive(origin, tag);
1030         }
1031 
1032         return (0);
1033 }
1034 
/*
 * Destroy the dataset ds.
 *
 * ds must be opened as OWNER (with 'tag').  On return (whether successful
 * or not), ds will be closed and caller can no longer dereference it.
 *
 * A snapshot is destroyed with a single sync task, to which 'defer' is
 * passed through.  Asking for 'defer' on anything that is not a snapshot
 * fails with EINVAL.
 *
 * A head dataset is destroyed in stages: it is first marked inconsistent
 * by a sync task, then (when the async_destroy feature is not enabled)
 * its objects are freed in open context, its ZIL is destroyed, and
 * finally the dataset and its dsl_dir are removed by one sync task group.
 * If the dataset is a clone, dsl_dataset_origin_rm_prep() is given the
 * chance to set up removal of the origin snapshot as well.
 *
 * Returns 0 on success or an errno value.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
        int err;
        dsl_sync_task_group_t *dstg;
        objset_t *os;
        dsl_dir_t *dd;
        uint64_t obj;
        struct dsl_ds_destroyarg dsda = { 0 };
        dsl_dataset_t dummy_ds = { 0 };

        dsda.ds = ds;

        if (dsl_dataset_is_snapshot(ds)) {
                /* Destroying a snapshot is simpler */
                dsl_dataset_make_exclusive(ds, tag);

                dsda.defer = defer;
                err = dsl_sync_task_do(ds->ds_dir->dd_pool,
                    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
                    &dsda, tag, 0);
                ASSERT3P(dsda.rm_origin, ==, NULL);
                goto out;
        } else if (defer) {
                /* Deferred destroy was requested for a non-snapshot. */
                err = EINVAL;
                goto out;
        }

        dd = ds->ds_dir;
        /*
         * dummy_ds carries just the dsl_dir and object number; it is the
         * argument handed to the dsl_dir_destroy tasks further down.
         */
        dummy_ds.ds_dir = dd;
        dummy_ds.ds_object = ds->ds_object;

        /*
         * Check for errors and mark this ds as inconsistent, in
         * case we crash while freeing the objects.
         */
        err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
            dsl_dataset_destroy_begin_sync, ds, NULL, 0);
        if (err)
                goto out;

        err = dmu_objset_from_ds(ds, &os);
        if (err)
                goto out;

        /*
         * If async destruction is not enabled try to remove all objects
         * while in the open context so that there is less work to do in
         * the syncing context.
         */
        if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
            &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
                for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
                    ds->ds_phys->ds_prev_snap_txg)) {
                        /*
                         * Ignore errors, if there is not enough disk space
                         * we will deal with it in dsl_dataset_destroy_sync().
                         */
                        (void) dmu_free_object(os, obj);
                }
                /*
                 * The loop stops on the first non-zero return from
                 * dmu_object_next().  ESRCH is the expected way for the
                 * walk to end; any other value is reported as a failure.
                 */
                if (err != ESRCH)
                        goto out;
        }

        /*
         * Only the ZIL knows how to free log blocks.
         */
        zil_destroy(dmu_objset_zil(os), B_FALSE);

        /*
         * Sync out all in-flight IO.
         */
        txg_wait_synced(dd->dd_pool, 0);

        /*
         * If we managed to free all the objects in open
         * context, the user space accounting should be zero.
         */
        if (ds->ds_phys->ds_bp.blk_fill == 0 &&
            dmu_objset_userused_enabled(os)) {
                uint64_t count;

                ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
                    count == 0);
                ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
                    count == 0);
        }

        /*
         * Open the dsl_dir again under our own FTAG hold.  That hold is
         * closed explicitly on every failure path below.
         */
        rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
        err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
        rw_exit(&dd->dd_pool->dp_config_rwlock);

        if (err)
                goto out;

        /*
         * Blow away the dsl_dir + head dataset.
         */
        dsl_dataset_make_exclusive(ds, tag);
        /*
         * If we're removing a clone, we might also need to remove its
         * origin.
         */
        do {
                /*
                 * need_prep is set by the check function when it finds that
                 * the origin must be removed but rm_origin was not set up;
                 * in that case we go around again and redo the preparation.
                 */
                dsda.need_prep = B_FALSE;
                if (dsl_dir_is_clone(dd)) {
                        err = dsl_dataset_origin_rm_prep(&dsda, tag);
                        if (err) {
                                dsl_dir_close(dd, FTAG);
                                goto out;
                        }
                }

                dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
                dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
                    dsl_dataset_destroy_sync, &dsda, tag, 0);
                dsl_sync_task_create(dstg, dsl_dir_destroy_check,
                    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
                err = dsl_sync_task_group_wait(dstg);
                dsl_sync_task_group_destroy(dstg);

                /*
                 * We could be racing against 'zfs release' or 'zfs destroy -d'
                 * on the origin snap, in which case we can get EBUSY if we
                 * needed to destroy the origin snap but were not ready to
                 * do so.
                 */
                if (dsda.need_prep) {
                        ASSERT(err == EBUSY);
                        ASSERT(dsl_dir_is_clone(dd));
                        ASSERT(dsda.rm_origin == NULL);
                }
        } while (dsda.need_prep);

        /* Give up the ownership of the origin taken during preparation. */
        if (dsda.rm_origin != NULL)
                dsl_dataset_disown(dsda.rm_origin, tag);

        /* if it is successful, dsl_dir_destroy_sync will close the dd */
        if (err)
                dsl_dir_close(dd, FTAG);
out:
        /* Every path ends here: the caller's ownership is always dropped. */
        dsl_dataset_disown(ds, tag);
        return (err);
}
1183 
1184 blkptr_t *
1185 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1186 {
1187         return (&ds->ds_phys->ds_bp);
1188 }
1189 
1190 void
1191 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1192 {
1193         ASSERT(dmu_tx_is_syncing(tx));
1194         /* If it's the meta-objset, set dp_meta_rootbp */
1195         if (ds == NULL) {
1196                 tx->tx_pool->dp_meta_rootbp = *bp;
1197         } else {
1198                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1199                 ds->ds_phys->ds_bp = *bp;
1200         }
1201 }
1202 
1203 spa_t *
1204 dsl_dataset_get_spa(dsl_dataset_t *ds)
1205 {
1206         return (ds->ds_dir->dd_pool->dp_spa);
1207 }
1208 
1209 void
1210 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1211 {
1212         dsl_pool_t *dp;
1213 
1214         if (ds == NULL) /* this is the meta-objset */
1215                 return;
1216 
1217         ASSERT(ds->ds_objset != NULL);
1218 
1219         if (ds->ds_phys->ds_next_snap_obj != 0)
1220                 panic("dirtying snapshot!");
1221 
1222         dp = ds->ds_dir->dd_pool;
1223 
1224         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1225                 /* up the hold count until we can be written out */
1226                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1227         }
1228 }
1229 
1230 /*
1231  * The unique space in the head dataset can be calculated by subtracting
1232  * the space used in the most recent snapshot, that is still being used
1233  * in this file system, from the space currently in use.  To figure out
1234  * the space in the most recent snapshot still in use, we need to take
1235  * the total space used in the snapshot and subtract out the space that
1236  * has been freed up since the snapshot was taken.
1237  */
1238 static void
1239 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1240 {
1241         uint64_t mrs_used;
1242         uint64_t dlused, dlcomp, dluncomp;
1243 
1244         ASSERT(!dsl_dataset_is_snapshot(ds));
1245 
1246         if (ds->ds_phys->ds_prev_snap_obj != 0)
1247                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1248         else
1249                 mrs_used = 0;
1250 
1251         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1252 
1253         ASSERT3U(dlused, <=, mrs_used);
1254         ds->ds_phys->ds_unique_bytes =
1255             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1256 
1257         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1258             SPA_VERSION_UNIQUE_ACCURATE)
1259                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1260 }
1261 
/*
 * Argument passed through traverse_dataset() to kill_blkptr().
 */
struct killarg {
        dsl_dataset_t *ds;      /* dataset whose blocks are being killed */
        dmu_tx_t *tx;           /* transaction the frees are charged to */
};
1266 
1267 /* ARGSUSED */
1268 static int
1269 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1270     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1271 {
1272         struct killarg *ka = arg;
1273         dmu_tx_t *tx = ka->tx;
1274 
1275         if (bp == NULL)
1276                 return (0);
1277 
1278         if (zb->zb_level == ZB_ZIL_LEVEL) {
1279                 ASSERT(zilog != NULL);
1280                 /*
1281                  * It's a block in the intent log.  It has no
1282                  * accounting, so just free it.
1283                  */
1284                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1285         } else {
1286                 ASSERT(zilog == NULL);
1287                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1288                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1289         }
1290 
1291         return (0);
1292 }
1293 
1294 /* ARGSUSED */
1295 static int
1296 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1297 {
1298         dsl_dataset_t *ds = arg1;
1299         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1300         uint64_t count;
1301         int err;
1302 
1303         /*
1304          * Can't delete a head dataset if there are snapshots of it.
1305          * (Except if the only snapshots are from the branch we cloned
1306          * from.)
1307          */
1308         if (ds->ds_prev != NULL &&
1309             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1310                 return (EBUSY);
1311 
1312         /*
1313          * This is really a dsl_dir thing, but check it here so that
1314          * we'll be less likely to leave this dataset inconsistent &
1315          * nearly destroyed.
1316          */
1317         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1318         if (err)
1319                 return (err);
1320         if (count != 0)
1321                 return (EEXIST);
1322 
1323         return (0);
1324 }
1325 
1326 /* ARGSUSED */
1327 static void
1328 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1329 {
1330         dsl_dataset_t *ds = arg1;
1331         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1332 
1333         /* Mark it as inconsistent on-disk, in case we crash */
1334         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1335         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1336 
1337         spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
1338             "dataset = %llu", ds->ds_object);
1339 }
1340 
1341 static int
1342 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1343     dmu_tx_t *tx)
1344 {
1345         dsl_dataset_t *ds = dsda->ds;
1346         dsl_dataset_t *ds_prev = ds->ds_prev;
1347 
1348         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1349                 struct dsl_ds_destroyarg ndsda = {0};
1350 
1351                 /*
1352                  * If we're not prepared to remove the origin, don't remove
1353                  * the clone either.
1354                  */
1355                 if (dsda->rm_origin == NULL) {
1356                         dsda->need_prep = B_TRUE;
1357                         return (EBUSY);
1358                 }
1359 
1360                 ndsda.ds = ds_prev;
1361                 ndsda.is_origin_rm = B_TRUE;
1362                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1363         }
1364 
1365         /*
1366          * If we're not going to remove the origin after all,
1367          * undo the open context setup.
1368          */
1369         if (dsda->rm_origin != NULL) {
1370                 dsl_dataset_disown(dsda->rm_origin, tag);
1371                 dsda->rm_origin = NULL;
1372         }
1373 
1374         return (0);
1375 }
1376 
1377 /*
1378  * If you add new checks here, you may need to add
1379  * additional checks to the "temporary" case in
1380  * snapshot_check() in dmu_objset.c.
1381  */
1382 /* ARGSUSED */
1383 int
1384 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1385 {
1386         struct dsl_ds_destroyarg *dsda = arg1;
1387         dsl_dataset_t *ds = dsda->ds;
1388 
1389         /* we have an owner hold, so noone else can destroy us */
1390         ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1391 
1392         /*
1393          * Only allow deferred destroy on pools that support it.
1394          * NOTE: deferred destroy is only supported on snapshots.
1395          */
1396         if (dsda->defer) {
1397                 if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1398                     SPA_VERSION_USERREFS)
1399                         return (ENOTSUP);
1400                 ASSERT(dsl_dataset_is_snapshot(ds));
1401                 return (0);
1402         }
1403 
1404         /*
1405          * Can't delete a head dataset if there are snapshots of it.
1406          * (Except if the only snapshots are from the branch we cloned
1407          * from.)
1408          */
1409         if (ds->ds_prev != NULL &&
1410             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1411                 return (EBUSY);
1412 
1413         /*
1414          * If we made changes this txg, traverse_dsl_dataset won't find
1415          * them.  Try again.
1416          */
1417         if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1418                 return (EAGAIN);
1419 
1420         if (dsl_dataset_is_snapshot(ds)) {
1421                 /*
1422                  * If this snapshot has an elevated user reference count,
1423                  * we can't destroy it yet.
1424                  */
1425                 if (ds->ds_userrefs > 0 && !dsda->releasing)
1426                         return (EBUSY);
1427 
1428                 mutex_enter(&ds->ds_lock);
1429                 /*
1430                  * Can't delete a branch point. However, if we're destroying
1431                  * a clone and removing its origin due to it having a user
1432                  * hold count of 0 and having been marked for deferred destroy,
1433                  * it's OK for the origin to have a single clone.
1434                  */
1435                 if (ds->ds_phys->ds_num_children >
1436                     (dsda->is_origin_rm ? 2 : 1)) {
1437                         mutex_exit(&ds->ds_lock);
1438                         return (EEXIST);
1439                 }
1440                 mutex_exit(&ds->ds_lock);
1441         } else if (dsl_dir_is_clone(ds->ds_dir)) {
1442                 return (dsl_dataset_origin_check(dsda, arg2, tx));
1443         }
1444 
1445         /* XXX we should do some i/o error checking... */
1446         return (0);
1447 }
1448 
/*
 * Rendezvous between dsl_dataset_drain_refs(), which waits, and the
 * dsl_dataset_refs_gone() callback, which signals.
 */
struct refsarg {
        kmutex_t lock;          /* held while reading or writing 'gone' */
        boolean_t gone;         /* set to TRUE by dsl_dataset_refs_gone() */
        kcondvar_t cv;          /* signalled once 'gone' has been set */
};
1454 
1455 /* ARGSUSED */
1456 static void
1457 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1458 {
1459         struct refsarg *arg = argv;
1460 
1461         mutex_enter(&arg->lock);
1462         arg->gone = TRUE;
1463         cv_signal(&arg->cv);
1464         mutex_exit(&arg->lock);
1465 }
1466 
1467 static void
1468 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1469 {
1470         struct refsarg arg;
1471 
1472         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1473         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1474         arg.gone = FALSE;
1475         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1476             dsl_dataset_refs_gone);
1477         dmu_buf_rele(ds->ds_dbuf, tag);
1478         mutex_enter(&arg.lock);
1479         while (!arg.gone)
1480                 cv_wait(&arg.cv, &arg.lock);
1481         ASSERT(arg.gone);
1482         mutex_exit(&arg.lock);
1483         ds->ds_dbuf = NULL;
1484         ds->ds_phys = NULL;
1485         mutex_destroy(&arg.lock);
1486         cv_destroy(&arg.cv);
1487 }
1488 
1489 static void
1490 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1491 {
1492         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1493         uint64_t count;
1494         int err;
1495 
1496         ASSERT(ds->ds_phys->ds_num_children >= 2);
1497         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1498         /*
1499          * The err should not be ENOENT, but a bug in a previous version
1500          * of the code could cause upgrade_clones_cb() to not set
1501          * ds_next_snap_obj when it should, leading to a missing entry.
1502          * If we knew that the pool was created after
1503          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1504          * ENOENT.  However, at least we can check that we don't have
1505          * too many entries in the next_clones_obj even after failing to
1506          * remove this one.
1507          */
1508         if (err != ENOENT) {
1509                 VERIFY3U(err, ==, 0);
1510         }
1511         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1512             &count));
1513         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1514 }
1515 
1516 static void
1517 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1518 {
1519         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1520         zap_cursor_t zc;
1521         zap_attribute_t za;
1522 
1523         /*
1524          * If it is the old version, dd_clones doesn't exist so we can't
1525          * find the clones, but deadlist_remove_key() is a no-op so it
1526          * doesn't matter.
1527          */
1528         if (ds->ds_dir->dd_phys->dd_clones == 0)
1529                 return;
1530 
1531         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1532             zap_cursor_retrieve(&zc, &za) == 0;
1533             zap_cursor_advance(&zc)) {
1534                 dsl_dataset_t *clone;
1535 
1536                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1537                     za.za_first_integer, FTAG, &clone));
1538                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1539                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1540                             mintxg, tx);
1541                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1542                 }
1543                 dsl_dataset_rele(clone, FTAG);
1544         }
1545         zap_cursor_fini(&zc);
1546 }
1547 
/*
 * State shared between process_old_deadlist() and its bpobj_iterate()
 * callback, process_old_cb().
 */
struct process_old_arg {
        dsl_dataset_t *ds;              /* dataset whose deadlist receives */
                                        /* the older blocks */
        dsl_dataset_t *ds_prev;         /* may be NULL; its ds_unique_bytes */
                                        /* may be increased by the callback */
        boolean_t after_branch_point;   /* if set, ds_prev is not credited */
        zio_t *pio;                     /* root zio passed to dsl_free_sync() */
        uint64_t used, comp, uncomp;    /* running totals of freed space */
};
1555 
1556 static int
1557 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1558 {
1559         struct process_old_arg *poa = arg;
1560         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1561 
1562         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1563                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1564                 if (poa->ds_prev && !poa->after_branch_point &&
1565                     bp->blk_birth >
1566                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1567                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1568                             bp_get_dsize_sync(dp->dp_spa, bp);
1569                 }
1570         } else {
1571                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1572                 poa->comp += BP_GET_PSIZE(bp);
1573                 poa->uncomp += BP_GET_UCSIZE(bp);
1574                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1575         }
1576         return (0);
1577 }
1578 
1579 static void
1580 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1581     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1582 {
1583         struct process_old_arg poa = { 0 };
1584         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1585         objset_t *mos = dp->dp_meta_objset;
1586 
1587         ASSERT(ds->ds_deadlist.dl_oldfmt);
1588         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1589 
1590         poa.ds = ds;
1591         poa.ds_prev = ds_prev;
1592         poa.after_branch_point = after_branch_point;
1593         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1594         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1595             process_old_cb, &poa, tx));
1596         VERIFY3U(zio_wait(poa.pio), ==, 0);
1597         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1598 
1599         /* change snapused */
1600         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1601             -poa.used, -poa.comp, -poa.uncomp, tx);
1602 
1603         /* swap next's deadlist to our deadlist */
1604         dsl_deadlist_close(&ds->ds_deadlist);
1605         dsl_deadlist_close(&ds_next->ds_deadlist);
1606         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1607             ds->ds_phys->ds_deadlist_obj);
1608         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1609         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1610             ds_next->ds_phys->ds_deadlist_obj);
1611 }
1612 
     /*
      * Free the blocks of head dataset 'ds' inside the destroy sync task by
      * walking it with traverse_dataset() and the kill_blkptr() callback.
      * dsl_dataset_destroy_sync() takes this path when the async_destroy
      * feature is not enabled on the pool.
      *
      * Returns the traverse_dataset() result; DEBUG builds assert that it is 0.
      */
1613 static int
1614 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1615 {
1616         int err;
1617         struct killarg ka;
1618 
1619         /*
1620          * Free everything that we point to (that's born after
1621          * the previous snapshot, if we are a clone)
1622          *
1623          * NB: this should be very quick, because we already
1624          * freed all the objects in open context.
1625          */
             /* Context handed to kill_blkptr() on every callback. */
1626         ka.ds = ds;
1627         ka.tx = tx;
             /*
              * The walk starts at ds_prev_snap_txg, matching the "born after
              * the previous snapshot" note above.
              */
1628         err = traverse_dataset(ds,
1629             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1630             kill_blkptr, &ka);
1631         ASSERT3U(err, ==, 0);
             /* With everything freed, an accurate unique count must be zero. */
1632         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1633 
1634         return (err);
1635 }
1636 
     /*
      * Sync task: destroy the dataset dsda->ds, which may be a snapshot or a
      * head dataset.
      *
      *   arg1 - struct dsl_ds_destroyarg *; the fields read here are ds, defer
      *          and rm_origin
      *   tag  - hold tag handed to dsl_dataset_drain_refs() and to the
      *          recursive call made for rm_origin
      *   tx   - transaction in which every change is made
      *
      * For a deferred destroy (dsda->defer) of a dataset that still has user
      * references or more than one child, the dataset is only flagged
      * DS_FLAG_DEFER_DESTROY and the function returns.
      *
      * Otherwise the work proceeds in this order:
      *   1. wake threads waiting on ds_exclusive_cv and drop any reservation;
      *   2. detach the dataset from its previous snapshot (next-snapshot
      *      pointer or next-clones bookkeeping);
      *   3. snapshot: fold the deadlist into the next dataset, adjust unique
      *      and snapused accounting and re-point the next dataset's prev link;
      *      head: free the deadlist and either free the blocks synchronously
      *      or, with the async_destroy feature enabled, add the dataset's
      *      block tree to the pool's bptree object;
      *   4. evict the objset, remove the name (head link in the dir, or the
      *      entry in the head's snapshot namespace), free the auxiliary MOS
      *      objects and finally the dataset object itself;
      *   5. if dsda->rm_origin is set, destroy that dataset too by recursing.
      *
      * dp_config_rwlock must be held as writer (asserted below).
      */
1637 void
1638 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1639 {
1640         struct dsl_ds_destroyarg *dsda = arg1;
1641         dsl_dataset_t *ds = dsda->ds;
             /*
              * Starts at 0: the async_destroy path below asserts on err without
              * assigning it first, so it must not be left indeterminate.
              */
1642         int err = 0;
1643         int after_branch_point = FALSE;
1644         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1645         objset_t *mos = dp->dp_meta_objset;
1646         dsl_dataset_t *ds_prev = NULL;
1647         boolean_t wont_destroy;
1648         uint64_t obj;
1649 
             /* A deferred destroy waits while user refs or clones remain. */
1650         wont_destroy = (dsda->defer &&
1651             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1652 
1653         ASSERT(ds->ds_owner || wont_destroy);
1654         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1655         ASSERT(ds->ds_prev == NULL ||
1656             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1657         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1658 
1659         if (wont_destroy) {
                     /* Only record the intent; nothing is torn down yet. */
1660                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1661                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1662                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1663                 return;
1664         }
1665 
1666         /* signal any waiters that this dataset is going away */
1667         mutex_enter(&ds->ds_lock);
1668         ds->ds_owner = dsl_reaper;
1669         cv_broadcast(&ds->ds_exclusive_cv);
1670         mutex_exit(&ds->ds_lock);
1671 
1672         /* Remove our reservation */
1673         if (ds->ds_reserved != 0) {
1674                 dsl_prop_setarg_t psa;
1675                 uint64_t value = 0;
1676 
1677                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1678                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1679                     &value);
1680                 psa.psa_effective_value = 0;    /* predict default value */
1681 
1682                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1683                 ASSERT3U(ds->ds_reserved, ==, 0);
1684         }
1685 
1686         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1687 
1688         dsl_scan_ds_destroyed(ds, tx);
1689 
             /* Saved now: still needed after ds->ds_dir is closed below. */
1690         obj = ds->ds_object;
1691 
             /* Detach from the previous snapshot, if there is one. */
1692         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1693                 if (ds->ds_prev) {
1694                         ds_prev = ds->ds_prev;
1695                 } else {
                             /* Temporary hold; released near the end. */
1696                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1697                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1698                 }
                     /*
                      * If prev's next-snapshot pointer is not us, we are not
                      * its direct successor in the snapshot chain.
                      */
1699                 after_branch_point =
1700                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1701 
1702                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1703                 if (after_branch_point &&
1704                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
                             /*
                              * Replace our entry in prev's next-clones list
                              * with our successor, if we have one.
                              */
1705                         remove_from_next_clones(ds_prev, obj, tx);
1706                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1707                                 VERIFY(0 == zap_add_int(mos,
1708                                     ds_prev->ds_phys->ds_next_clones_obj,
1709                                     ds->ds_phys->ds_next_snap_obj, tx));
1710                         }
1711                 }
1712                 if (after_branch_point &&
1713                     ds->ds_phys->ds_next_snap_obj == 0) {
1714                         /* This clone is toast. */
1715                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1716                         ds_prev->ds_phys->ds_num_children--;
1717 
1718                         /*
1719                          * If the clone's origin has no other clones, no
1720                          * user holds, and has been marked for deferred
1721                          * deletion, then we should have done the necessary
1722                          * destroy setup for it.
1723                          */
1724                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1725                             ds_prev->ds_userrefs == 0 &&
1726                             DS_IS_DEFER_DESTROY(ds_prev)) {
1727                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1728                         } else {
1729                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1730                         }
1731                 } else if (!after_branch_point) {
                             /* Splice ourselves out of the snapshot chain. */
1732                         ds_prev->ds_phys->ds_next_snap_obj =
1733                             ds->ds_phys->ds_next_snap_obj;
1734                 }
1735         }
1736 
1737         if (dsl_dataset_is_snapshot(ds)) {
1738                 dsl_dataset_t *ds_next;
1739                 uint64_t old_unique;
1740                 uint64_t used = 0, comp = 0, uncomp = 0;
1741 
1742                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1743                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1744                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1745 
                     /* Kept for the refreservation adjustment further down. */
1746                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1747 
                     /* Next now follows whatever used to precede us. */
1748                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1749                 ds_next->ds_phys->ds_prev_snap_obj =
1750                     ds->ds_phys->ds_prev_snap_obj;
1751                 ds_next->ds_phys->ds_prev_snap_txg =
1752                     ds->ds_phys->ds_prev_snap_txg;
1753                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1754                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1755 
1756 
1757                 if (ds_next->ds_deadlist.dl_oldfmt) {
1758                         process_old_deadlist(ds, ds_prev, ds_next,
1759                             after_branch_point, tx);
1760                 } else {
1761                         /* Adjust prev's unique space. */
1762                         if (ds_prev && !after_branch_point) {
1763                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1764                                     ds_prev->ds_phys->ds_prev_snap_txg,
1765                                     ds->ds_phys->ds_prev_snap_txg,
1766                                     &used, &comp, &uncomp);
1767                                 ds_prev->ds_phys->ds_unique_bytes += used;
1768                         }
1769 
1770                         /* Adjust snapused. */
1771                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1772                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1773                             &used, &comp, &uncomp);
1774                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1775                             -used, -comp, -uncomp, tx);
1776 
1777                         /* Move blocks to be freed to pool's free list. */
1778                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1779                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1780                             tx);
1781                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1782                             DD_USED_HEAD, used, comp, uncomp, tx);
1783 
1784                         /* Merge our deadlist into next's and free it. */
1785                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1786                             ds->ds_phys->ds_deadlist_obj, tx);
1787                 }
                     /*
                      * On either path the object now named by our phys (swapped
                      * in or already merged) is no longer needed.
                      */
1788                 dsl_deadlist_close(&ds->ds_deadlist);
1789                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1790 
1791                 /* Collapse range in clone heads */
1792                 dsl_dataset_remove_clones_key(ds,
1793                     ds->ds_phys->ds_creation_txg, tx);
1794 
1795                 if (dsl_dataset_is_snapshot(ds_next)) {
1796                         dsl_dataset_t *ds_nextnext;
1797 
1798                         /*
1799                          * Update next's unique to include blocks which
1800                          * were previously shared by only this snapshot
1801                          * and it.  Those blocks will be born after the
1802                          * prev snap and before this snap, and will have
1803                          * died after the next snap and before the one
1804                          * after that (ie. be on the snap after next's
1805                          * deadlist).
1806                          */
1807                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1808                             ds_next->ds_phys->ds_next_snap_obj,
1809                             FTAG, &ds_nextnext));
1810                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1811                             ds->ds_phys->ds_prev_snap_txg,
1812                             ds->ds_phys->ds_creation_txg,
1813                             &used, &comp, &uncomp);
1814                         ds_next->ds_phys->ds_unique_bytes += used;
1815                         dsl_dataset_rele(ds_nextnext, FTAG);
1816                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1817 
1818                         /* Collapse range in this head. */
1819                         dsl_dataset_t *hds;
1820                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1821                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1822                             FTAG, &hds));
1823                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1824                             ds->ds_phys->ds_creation_txg, tx);
1825                         dsl_dataset_rele(hds, FTAG);
1826 
1827                 } else {
                             /*
                              * Next is not a snapshot and caches us as its
                              * ds_prev: drop that reference and, if we had a
                              * predecessor, cache it instead.
                              */
1828                         ASSERT3P(ds_next->ds_prev, ==, ds);
1829                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1830                         ds_next->ds_prev = NULL;
1831                         if (ds_prev) {
1832                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1833                                     ds->ds_phys->ds_prev_snap_obj,
1834                                     ds_next, &ds_next->ds_prev));
1835                         }
1836 
1837                         dsl_dataset_recalc_head_uniq(ds_next);
1838 
1839                         /*
1840                          * Reduce the amount of our unconsumed refreservation
1841                          * being charged to our parent by the amount of
1842                          * new unique data we have gained.
1843                          */
1844                         if (old_unique < ds_next->ds_reserved) {
1845                                 int64_t mrsdelta;
1846                                 uint64_t new_unique =
1847                                     ds_next->ds_phys->ds_unique_bytes;
1848 
1849                                 ASSERT(old_unique <= new_unique);
1850                                 mrsdelta = MIN(new_unique - old_unique,
1851                                     ds_next->ds_reserved - old_unique);
1852                                 dsl_dir_diduse_space(ds->ds_dir,
1853                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1854                         }
1855                 }
1856                 dsl_dataset_rele(ds_next, FTAG);
1857         } else {
1858                 zfeature_info_t *async_destroy =
1859                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1860 
1861                 /*
1862                  * There's no next snapshot, so this is a head dataset.
1863                  * Destroy the deadlist.  Unless it's a clone, the
1864                  * deadlist should be empty.  (If it's a clone, it's
1865                  * safe to ignore the deadlist contents.)
1866                  */
1867                 dsl_deadlist_close(&ds->ds_deadlist);
1868                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1869                 ds->ds_phys->ds_deadlist_obj = 0;
1870 
1871                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1872                         err = old_synchronous_dataset_destroy(ds, tx);
1873                 } else {
1874                         /*
1875                          * Move the bptree into the pool's list of trees to
1876                          * clean up and update space accounting information.
1877                          */
1878                         uint64_t used, comp, uncomp;
1879 
                             /*
                              * Nothing on this path has assigned err; it still
                              * holds the 0 it was initialized with.
                              */
1880                         ASSERT(err == 0 || err == EBUSY);
                             /* First use of the feature: create the bptree. */
1881                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1882                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1883                                 dp->dp_bptree_obj = bptree_alloc(
1884                                     dp->dp_meta_objset, tx);
1885                                 VERIFY(zap_add(dp->dp_meta_objset,
1886                                     DMU_POOL_DIRECTORY_OBJECT,
1887                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1888                                     &dp->dp_bptree_obj, tx) == 0);
1889                         }
1890 
1891                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1892                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1893                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1894 
1895                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1896                             ds->ds_phys->ds_unique_bytes == used);
1897 
                             /*
                              * Queue the root block pointer on the bptree and
                              * move the space charge from this dir to the
                              * pool's free dir.
                              */
1898                         bptree_add(dp->dp_meta_objset, dp->dp_bptree_obj,
1899                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1900                             used, comp, uncomp, tx);
1901                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1902                             -used, -comp, -uncomp, tx);
1903                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1904                             used, comp, uncomp, tx);
1905                 }
1906 
1907                 if (ds->ds_prev != NULL) {
                             /*
                              * Drop our entry from the origin dir's clones
                              * list and release the cached prev.  ds_prev is
                              * cleared too so the release near the end of the
                              * function is skipped.
                              */
1908                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1909                                 VERIFY3U(0, ==, zap_remove_int(mos,
1910                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1911                                     ds->ds_object, tx));
1912                         }
1913                         dsl_dataset_rele(ds->ds_prev, ds);
1914                         ds->ds_prev = ds_prev = NULL;
1915                 }
1916         }
1917 
1918         /*
1919          * This must be done after the dsl_traverse(), because it will
1920          * re-open the objset.
1921          */
1922         if (ds->ds_objset) {
1923                 dmu_objset_evict(ds->ds_objset);
1924                 ds->ds_objset = NULL;
1925         }
1926 
1927         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1928                 /* Erase the link in the dir */
1929                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1930                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1931                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1932                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1933                 ASSERT(err == 0);
1934         } else {
1935                 /* remove from snapshot namespace */
1936                 dsl_dataset_t *ds_head;
1937                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1938                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1939                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1940                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1941 #ifdef ZFS_DEBUG
1942                 {
1943                         uint64_t val;
1944 
1945                         err = dsl_dataset_snap_lookup(ds_head,
1946                             ds->ds_snapname, &val);
1947                         ASSERT3U(err, ==, 0);
1948                         ASSERT3U(val, ==, obj);
1949                 }
1950 #endif
1951                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1952                 ASSERT(err == 0);
1953                 dsl_dataset_rele(ds_head, FTAG);
1954         }
1955 
             /* Release the temporary hold taken above when prev was not cached. */
1956         if (ds_prev && ds->ds_prev != ds_prev)
1957                 dsl_dataset_rele(ds_prev, FTAG);
1958 
1959         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1960         spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
1961             "dataset = %llu", ds->ds_object);
1962 
             /* Free the auxiliary MOS objects, then the dataset object itself. */
1963         if (ds->ds_phys->ds_next_clones_obj != 0) {
1964                 uint64_t count;
1965                 ASSERT(0 == zap_count(mos,
1966                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1967                 VERIFY(0 == dmu_object_free(mos,
1968                     ds->ds_phys->ds_next_clones_obj, tx));
1969         }
1970         if (ds->ds_phys->ds_props_obj != 0)
1971                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1972         if (ds->ds_phys->ds_userrefs_obj != 0)
1973                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1974         dsl_dir_close(ds->ds_dir, ds);
1975         ds->ds_dir = NULL;
1976         dsl_dataset_drain_refs(ds, tag);
1977         VERIFY(0 == dmu_object_free(mos, obj, tx));
1978 
1979         if (dsda->rm_origin) {
1980                 /*
1981                  * Remove the origin of the clone we just destroyed.
1982                  */
                     /* Zeroed argument: no defer and no further rm_origin. */
1983                 struct dsl_ds_destroyarg ndsda = {0};
1984 
1985                 ndsda.ds = dsda->rm_origin;
1986                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
1987         }
1988 }
1989 
     /*
      * Check that snapshotting 'ds' leaves enough room outside its
      * refreservation.
      *
      * When 'tx' is not a syncing tx nothing is checked and 0 is returned.
      * Otherwise the amount considered is MIN(unique bytes, reservation):
      * ENOSPC is returned if that exceeds dsl_dir_space_available(); if it
      * fits and is non-zero it is recorded with dsl_dir_willuse_space() and 0
      * is returned.
      */
1990 static int
1991 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1992 {
1993         uint64_t asize;
1994 
1995         if (!dmu_tx_is_syncing(tx))
1996                 return (0);
1997 
1998         /*
1999          * If there's an fs-only reservation, any blocks that might become
2000          * owned by the snapshot dataset must be accommodated by space
2001          * outside of the reservation.
2002          */
2003         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2004         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2005         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2006                 return (ENOSPC);
2007 
2008         /*
2009          * Propagate any reserved space for this snapshot to other
2010          * snapshot checks in this sync group.
2011          */
2012         if (asize > 0)
2013                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2014 
2015         return (0);
2016 }
2017 
     /*
      * Check function for taking a snapshot of a dataset.
      *
      *   arg1 - dsl_dataset_t * of the dataset to snapshot
      *   arg2 - const char * snapshot name
      *   tx   - transaction the snapshot would be taken in
      *
      * Returns:
      *   EAGAIN        the dataset's previous snapshot is from this txg or later
      *   EEXIST        a snapshot with this name already exists
      *   ENAMETOOLONG  "<dataset>@<snapname>" would not fit in MAXNAMELEN
      *   other errno   from dsl_dataset_snap_lookup() (anything but ENOENT) or
      *                 from dsl_dataset_snapshot_reserve_space()
      *   0             on success; ds_trysnap_txg is set to tx->tx_txg
      */
2018 int
2019 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
2020 {
2021         dsl_dataset_t *ds = arg1;
2022         const char *snapname = arg2;
2023         int err;
2024         uint64_t value;
2025 
2026         /*
2027          * We don't allow multiple snapshots of the same txg.  If there
2028          * is already one, try again.
2029          */
2030         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2031                 return (EAGAIN);
2032 
2033         /*
2034          * Check for a conflicting snapshot name.
2035          */
             /* Only ENOENT means the name is free; 'value' itself is unused. */
2036         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2037         if (err == 0)
2038                 return (EEXIST);
2039         if (err != ENOENT)
2040                 return (err);
2041 
2042         /*
2043          * Check that the dataset's name is not too long.  Name consists
2044          * of the dataset's length + 1 for the @-sign + snapshot name's length
2045          */
2046         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2047                 return (ENAMETOOLONG);
2048 
2049         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2050         if (err)
2051                 return (err);
2052 
2053         ds->ds_trysnap_txg = tx->tx_txg;
2054         return (0);
2055 }
2056 
     /*
      * Sync function that takes a snapshot of a dataset.
      *
      *   arg1 - dsl_dataset_t * of the dataset being snapshotted
      *   arg2 - const char * snapshot name
      *   tx   - transaction in which the snapshot is created
      *
      * A new dataset object is allocated in the MOS and its phys is filled in
      * from the dataset's current state (block pointer, space counters, flags,
      * deadlist object, previous-snapshot link), with ds_next_snap_obj pointing
      * back at the dataset.  The dataset itself then gets a cloned deadlist,
      * has its previous-snapshot fields pointed at the new object, its unique
      * bytes reset to 0, and the name added to its snapshot-names ZAP.
      *
      * dp_config_rwlock must be held as writer (asserted below).
      */
2057 void
2058 dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2059 {
2060         dsl_dataset_t *ds = arg1;
2061         const char *snapname = arg2;
2062         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2063         dmu_buf_t *dbuf;
2064         dsl_dataset_phys_t *dsphys;
2065         uint64_t dsobj, crtxg;
2066         objset_t *mos = dp->dp_meta_objset;
2067         int err;
2068 
2069         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2070 
2071         /*
2072          * The origin's ds_creation_txg has to be < TXG_INITIAL
2073          */
2074         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2075                 crtxg = 1;
2076         else
2077                 crtxg = tx->tx_txg;
2078 
             /* Allocate the snapshot's dataset object and fill in its phys. */
2079         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2080             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2081         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2082         dmu_buf_will_dirty(dbuf, tx);
2083         dsphys = dbuf->db_data;
2084         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2085         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2086         dsphys->ds_fsid_guid = unique_create();
2087         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2088             sizeof (dsphys->ds_guid));
2089         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2090         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2091         dsphys->ds_next_snap_obj = ds->ds_object;
2092         dsphys->ds_num_children = 1;
2093         dsphys->ds_creation_time = gethrestime_sec();
2094         dsphys->ds_creation_txg = crtxg;
             /*
              * The snapshot takes over the current deadlist object; the
              * dataset receives a clone of it further down.
              */
2095         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2096         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2097         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2098         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2099         dsphys->ds_flags = ds->ds_phys->ds_flags;
2100         dsphys->ds_bp = ds->ds_phys->ds_bp;
2101         dmu_buf_rele(dbuf, FTAG);
2102 
             /*
              * Make the previous snapshot refer to the new snapshot instead of
              * this dataset: either through its next-snapshot pointer or, when
              * this dataset hangs off it as a clone, its next-clones list.
              */
2103         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2104         if (ds->ds_prev) {
2105                 uint64_t next_clones_obj =
2106                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2107                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2108                     ds->ds_object ||
2109                     ds->ds_prev->ds_phys->ds_num_children > 1);
2110                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2111                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2112                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2113                             ds->ds_prev->ds_phys->ds_creation_txg);
2114                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2115                 } else if (next_clones_obj != 0) {
                             /* dsphys->ds_next_snap_obj is ds->ds_object here. */
2116                         remove_from_next_clones(ds->ds_prev,
2117                             dsphys->ds_next_snap_obj, tx);
2118                         VERIFY3U(0, ==, zap_add_int(mos,
2119                             next_clones_obj, dsobj, tx));
2120                 }
2121         }
2122 
2123         /*
2124          * If we have a reference-reservation on this dataset, we will
2125          * need to increase the amount of refreservation being charged
2126          * since our unique space is going to zero.
2127          */
2128         if (ds->ds_reserved) {
2129                 int64_t delta;
2130                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2131                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2132                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2133                     delta, 0, 0, tx);
2134         }
2135 
2136         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2137         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2138             ds->ds_dir->dd_myname, snapname, dsobj,
2139             ds->ds_phys->ds_prev_snap_txg);
             /*
              * Clone the deadlist for the dataset, reopen the in-core handle on
              * the clone and add a key for the old previous-snapshot txg.
              */
2140         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2141             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2142         dsl_deadlist_close(&ds->ds_deadlist);
2143         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2144         dsl_deadlist_add_key(&ds->ds_deadlist,
2145             ds->ds_phys->ds_prev_snap_txg, tx);
2146 
             /* The new snapshot becomes this dataset's previous snapshot. */
2147         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2148         ds->ds_phys->ds_prev_snap_obj = dsobj;
2149         ds->ds_phys->ds_prev_snap_txg = crtxg;
2150         ds->ds_phys->ds_unique_bytes = 0;
2151         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2152                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2153 
             /* Publish the name; the result is only checked on DEBUG builds. */
2154         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2155             snapname, 8, 1, &dsobj, tx);
2156         ASSERT(err == 0);
2157 
             /* Swap the cached ds_prev reference over to the new snapshot. */
2158         if (ds->ds_prev)
2159                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2160         VERIFY(0 == dsl_dataset_get_ref(dp,
2161             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2162 
2163         dsl_scan_ds_snapshotted(ds, tx);
2164 
2165         dsl_dir_snap_cmtime_update(ds->ds_dir);
2166 
2167         spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2168             "dataset = %llu", dsobj);
2169 }
2170 
     /*
      * Sync out a dataset in syncing context: store the in-core ds_fsid_guid
      * in the phys, dirty the dataset's dir and sync its objset under 'zio'.
      *
      * Asserted preconditions: 'tx' is a syncing tx, the objset is open, and
      * the dataset has no next snapshot.
      */
2171 void
2172 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2173 {
2174         ASSERT(dmu_tx_is_syncing(tx));
2175         ASSERT(ds->ds_objset != NULL);
2176         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2177 
2178         /*
2179          * in case we had to change ds_fsid_guid when we opened it,
2180          * sync it out now.
2181          */
2182         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2183         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2184 
2185         dsl_dir_dirty(ds->ds_dir, tx);
2186         dmu_objset_sync(ds->ds_objset, zio, tx);
2187 }
2188 
     /*
      * Add the ZFS_PROP_CLONES property for snapshot 'ds' to 'nv'.
      *
      * The property is an nvlist whose ZPROP_VALUE member is itself an nvlist
      * holding one boolean entry per clone, keyed by the clone's dir name as
      * produced by dsl_dir_name().  Clones that cannot be held are skipped.
      * If the number of entries in ds_next_clones_obj does not equal
      * ds_num_children - 1, nothing is added to 'nv'.
      *
      * dp_config_rwlock is taken as reader for the duration of the call.
      *
      * zap_count() is invoked through VERIFY3U rather than ASSERT3U so that
      * the call is also made on non-DEBUG builds, where the ASSERT macros
      * expand to nothing and 'count' would otherwise always stay 0.
      */
2189 static void
2190 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2191 {
2192         uint64_t count = 0;
2193         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2194         zap_cursor_t zc;
2195         zap_attribute_t za;
2196         nvlist_t *propval;
2197         nvlist_t *val;
2198 
2199         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2200         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2201         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2202 
2203         /*
2204          * There may be missing entries in ds_next_clones_obj
2205          * due to a bug in a previous version of the code.
2206          * Only trust it if it has the right number of entries.
2207          */
2208         if (ds->ds_phys->ds_next_clones_obj != 0) {
2209                 VERIFY3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2210                     &count));
2211         }
2212         if (count != ds->ds_phys->ds_num_children - 1) {
2213                 goto fail;
2214         }
             /*
              * NOTE(review): when ds_next_clones_obj is 0 and there are no
              * clones, the cursor is set up on object 0; the loop then relies
              * on zap_cursor_retrieve() failing right away - confirm.
              */
2215         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2216             zap_cursor_retrieve(&zc, &za) == 0;
2217             zap_cursor_advance(&zc)) {
2218                 dsl_dataset_t *clone;
2219                 char buf[ZFS_MAXNAMELEN];
2220                 /*
2221                  * Even though we hold the dp_config_rwlock, the dataset
2222                  * may fail to open, returning ENOENT.  If there is a
2223                  * thread concurrently attempting to destroy this
2224                  * dataset, it will have the ds_rwlock held for
2225                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2226                  * dsl_dataset_hold_ref() will fail its
2227                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2228                  * dp_config_rwlock, and wait for the destroy progress
2229                  * and signal ds_exclusive_cv.  If the destroy was
2230                  * successful, we will see that
2231                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2232                  */
2233                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2234                     za.za_first_integer, FTAG, &clone) != 0)
2235                         continue;
2236                 dsl_dir_name(clone->ds_dir, buf);
2237                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2238                 dsl_dataset_rele(clone, FTAG);
2239         }
2240         zap_cursor_fini(&zc);
2241         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2242         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2243             propval) == 0);
             /* Reached on success too: both scratch lists are always freed. */
2244 fail:
2245         nvlist_free(val);
2246         nvlist_free(propval);
2247         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2248 }
2249 
     /*
      * Fill 'nv' with the property statistics of dataset 'ds'.
      *
      * The dir-level statistics are added first via dsl_dir_stats(), followed
      * by the per-dataset values: available, referenced, creation time and
      * txg, refquota, refreservation, guid, unique, objset id, user refs,
      * defer_destroy and refratio.  "written" is added only when there is a
      * previous snapshot that can be held and the space computation succeeds.
      * For a snapshot (ds_next_snap_obj != 0), used and compressratio are
      * added from the snapshot's own numbers and the clones property is added.
      */
2250 void
2251 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2252 {
2253         uint64_t refd, avail, uobjs, aobjs, ratio;
2254 
2255         dsl_dir_stats(ds->ds_dir, nv);
2256 
             /* uobjs and aobjs are fetched but not reported here. */
2257         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2258         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2259         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2260 
2261         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2262             ds->ds_phys->ds_creation_time);
2263         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2264             ds->ds_phys->ds_creation_txg);
2265         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2266             ds->ds_quota);
2267         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2268             ds->ds_reserved);
2269         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2270             ds->ds_phys->ds_guid);
2271         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2272             ds->ds_phys->ds_unique_bytes);
2273         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2274             ds->ds_object);
2275         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2276             ds->ds_userrefs);
2277         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2278             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2279 
2280         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2281                 uint64_t written, comp, uncomp;
2282                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2283                 dsl_dataset_t *prev;
2284 
                     /* The config lock is held only while taking the hold. */
2285                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2286                 int err = dsl_dataset_hold_obj(dp,
2287                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2288                 rw_exit(&dp->dp_config_rwlock);
                     /* Failures are not reported; "written" is just omitted. */
2289                 if (err == 0) {
2290                         err = dsl_dataset_space_written(prev, ds, &written,
2291                             &comp, &uncomp);
2292                         dsl_dataset_rele(prev, FTAG);
2293                         if (err == 0) {
2294                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2295                                     written);
2296                         }
2297                 }
2298         }
2299 
             /*
              * Ratio is uncompressed/compressed scaled by 100; 100 is reported
              * when nothing is compressed, which also avoids dividing by zero.
              */
2300         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2301             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2302             ds->ds_phys->ds_compressed_bytes);
2303         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2304 
2305         if (ds->ds_phys->ds_next_snap_obj) {
2306                 /*
2307                  * This is a snapshot; override the dd's space used with
2308                  * our unique space and compression ratio.
2309                  */
2310                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2311                     ds->ds_phys->ds_unique_bytes);
2312                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2313 
2314                 get_clones_stat(ds, nv);
2315         }
2316 }
2317 
2318 void
2319 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2320 {
2321         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2322         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2323         stat->dds_guid = ds->ds_phys->ds_guid;
2324         if (ds->ds_phys->ds_next_snap_obj) {
2325                 stat->dds_is_snapshot = B_TRUE;
2326                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2327         } else {
2328                 stat->dds_is_snapshot = B_FALSE;
2329                 stat->dds_num_clones = 0;
2330         }
2331 
2332         /* clone origin is really a dsl_dir thing... */
2333         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2334         if (dsl_dir_is_clone(ds->ds_dir)) {
2335                 dsl_dataset_t *ods;
2336 
2337                 VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2338                     ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2339                 dsl_dataset_name(ods, stat->dds_origin);
2340                 dsl_dataset_drop_ref(ods, FTAG);
2341         } else {
2342                 stat->dds_origin[0] = '\0';
2343         }
2344         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2345 }
2346 
2347 uint64_t
2348 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2349 {
2350         return (ds->ds_fsid_guid);
2351 }
2352 
2353 void
2354 dsl_dataset_space(dsl_dataset_t *ds,
2355     uint64_t *refdbytesp, uint64_t *availbytesp,
2356     uint64_t *usedobjsp, uint64_t *availobjsp)
2357 {
2358         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2359         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2360         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2361                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2362         if (ds->ds_quota != 0) {
2363                 /*
2364                  * Adjust available bytes according to refquota
2365                  */
2366                 if (*refdbytesp < ds->ds_quota)
2367                         *availbytesp = MIN(*availbytesp,
2368                             ds->ds_quota - *refdbytesp);
2369                 else
2370                         *availbytesp = 0;
2371         }
2372         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2373         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2374 }
2375 
2376 boolean_t
2377 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2378 {
2379         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2380 
2381         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2382             dsl_pool_sync_context(dp));
2383         if (ds->ds_prev == NULL)
2384                 return (B_FALSE);
2385         if (ds->ds_phys->ds_bp.blk_birth >
2386             ds->ds_prev->ds_phys->ds_creation_txg) {
2387                 objset_t *os, *os_prev;
2388                 /*
2389                  * It may be that only the ZIL differs, because it was
2390                  * reset in the head.  Don't count that as being
2391                  * modified.
2392                  */
2393                 if (dmu_objset_from_ds(ds, &os) != 0)
2394                         return (B_TRUE);
2395                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2396                         return (B_TRUE);
2397                 return (bcmp(&os->os_phys->os_meta_dnode,
2398                     &os_prev->os_phys->os_meta_dnode,
2399                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2400         }
2401         return (B_FALSE);
2402 }
2403 
2404 /* ARGSUSED */
2405 static int
2406 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2407 {
2408         dsl_dataset_t *ds = arg1;
2409         char *newsnapname = arg2;
2410         dsl_dir_t *dd = ds->ds_dir;
2411         dsl_dataset_t *hds;
2412         uint64_t val;
2413         int err;
2414 
2415         err = dsl_dataset_hold_obj(dd->dd_pool,
2416             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2417         if (err)
2418                 return (err);
2419 
2420         /* new name better not be in use */
2421         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2422         dsl_dataset_rele(hds, FTAG);
2423 
2424         if (err == 0)
2425                 err = EEXIST;
2426         else if (err == ENOENT)
2427                 err = 0;
2428 
2429         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2430         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2431                 err = ENAMETOOLONG;
2432 
2433         return (err);
2434 }
2435 
2436 static void
2437 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2438 {
2439         dsl_dataset_t *ds = arg1;
2440         const char *newsnapname = arg2;
2441         dsl_dir_t *dd = ds->ds_dir;
2442         objset_t *mos = dd->dd_pool->dp_meta_objset;
2443         dsl_dataset_t *hds;
2444         int err;
2445 
2446         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2447 
2448         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2449             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2450 
2451         VERIFY(0 == dsl_dataset_get_snapname(ds));
2452         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2453         ASSERT3U(err, ==, 0);
2454         mutex_enter(&ds->ds_lock);
2455         (void) strcpy(ds->ds_snapname, newsnapname);
2456         mutex_exit(&ds->ds_lock);
2457         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2458             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2459         ASSERT3U(err, ==, 0);
2460 
2461         spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2462             "dataset = %llu", ds->ds_object);
2463         dsl_dataset_rele(hds, FTAG);
2464 }
2465 
/*
 * State shared between dsl_recursive_rename() and its per-filesystem
 * callback dsl_snapshot_rename_one().
 */
struct renamesnaparg {
        /* task group that collects one rename task per snapshot found */
        dsl_sync_task_group_t *dstg;
        /* name of the snapshot most recently attempted / that failed */
        char failed[MAXPATHLEN];
        /* old snapshot name (text after the '@') */
        char *oldsnap;
        /* new snapshot name (text after the '@') */
        char *newsnap;
};
2472 
2473 static int
2474 dsl_snapshot_rename_one(const char *name, void *arg)
2475 {
2476         struct renamesnaparg *ra = arg;
2477         dsl_dataset_t *ds = NULL;
2478         char *snapname;
2479         int err;
2480 
2481         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2482         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2483 
2484         /*
2485          * For recursive snapshot renames the parent won't be changing
2486          * so we just pass name for both the to/from argument.
2487          */
2488         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2489         if (err != 0) {
2490                 strfree(snapname);
2491                 return (err == ENOENT ? 0 : err);
2492         }
2493 
2494 #ifdef _KERNEL
2495         /*
2496          * For all filesystems undergoing rename, we'll need to unmount it.
2497          */
2498         (void) zfs_unmount_snap(snapname, NULL);
2499 #endif
2500         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2501         strfree(snapname);
2502         if (err != 0)
2503                 return (err == ENOENT ? 0 : err);
2504 
2505         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2506             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2507 
2508         return (0);
2509 }
2510 
2511 static int
2512 dsl_recursive_rename(char *oldname, const char *newname)
2513 {
2514         int err;
2515         struct renamesnaparg *ra;
2516         dsl_sync_task_t *dst;
2517         spa_t *spa;
2518         char *cp, *fsname = spa_strdup(oldname);
2519         int len = strlen(oldname) + 1;
2520 
2521         /* truncate the snapshot name to get the fsname */
2522         cp = strchr(fsname, '@');
2523         *cp = '\0';
2524 
2525         err = spa_open(fsname, &spa, FTAG);
2526         if (err) {
2527                 kmem_free(fsname, len);
2528                 return (err);
2529         }
2530         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2531         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2532 
2533         ra->oldsnap = strchr(oldname, '@') + 1;
2534         ra->newsnap = strchr(newname, '@') + 1;
2535         *ra->failed = '\0';
2536 
2537         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2538             DS_FIND_CHILDREN);
2539         kmem_free(fsname, len);
2540 
2541         if (err == 0) {
2542                 err = dsl_sync_task_group_wait(ra->dstg);
2543         }
2544 
2545         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2546             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2547                 dsl_dataset_t *ds = dst->dst_arg1;
2548                 if (dst->dst_err) {
2549                         dsl_dir_name(ds->ds_dir, ra->failed);
2550                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2551                         (void) strlcat(ra->failed, ra->newsnap,
2552                             sizeof (ra->failed));
2553                 }
2554                 dsl_dataset_rele(ds, ra->dstg);
2555         }
2556 
2557         if (err)
2558                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2559 
2560         dsl_sync_task_group_destroy(ra->dstg);
2561         kmem_free(ra, sizeof (struct renamesnaparg));
2562         spa_close(spa, FTAG);
2563         return (err);
2564 }
2565 
2566 static int
2567 dsl_valid_rename(const char *oldname, void *arg)
2568 {
2569         int delta = *(int *)arg;
2570 
2571         if (strlen(oldname) + delta >= MAXNAMELEN)
2572                 return (ENAMETOOLONG);
2573 
2574         return (0);
2575 }
2576 
2577 #pragma weak dmu_objset_rename = dsl_dataset_rename
2578 int
2579 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2580 {
2581         dsl_dir_t *dd;
2582         dsl_dataset_t *ds;
2583         const char *tail;
2584         int err;
2585 
2586         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2587         if (err)
2588                 return (err);
2589 
2590         if (tail == NULL) {
2591                 int delta = strlen(newname) - strlen(oldname);
2592 
2593                 /* if we're growing, validate child name lengths */
2594                 if (delta > 0)
2595                         err = dmu_objset_find(oldname, dsl_valid_rename,
2596                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2597 
2598                 if (err == 0)
2599                         err = dsl_dir_rename(dd, newname);
2600                 dsl_dir_close(dd, FTAG);
2601                 return (err);
2602         }
2603 
2604         if (tail[0] != '@') {
2605                 /* the name ended in a nonexistent component */
2606                 dsl_dir_close(dd, FTAG);
2607                 return (ENOENT);
2608         }
2609 
2610         dsl_dir_close(dd, FTAG);
2611 
2612         /* new name must be snapshot in same filesystem */
2613         tail = strchr(newname, '@');
2614         if (tail == NULL)
2615                 return (EINVAL);
2616         tail++;
2617         if (strncmp(oldname, newname, tail - newname) != 0)
2618                 return (EXDEV);
2619 
2620         if (recursive) {
2621                 err = dsl_recursive_rename(oldname, newname);
2622         } else {
2623                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2624                 if (err)
2625                         return (err);
2626 
2627                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2628                     dsl_dataset_snapshot_rename_check,
2629                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2630 
2631                 dsl_dataset_rele(ds, FTAG);
2632         }
2633 
2634         return (err);
2635 }
2636 
/*
 * Element of the snapshot lists built by snaplist_make(): one held (or
 * owned) dataset per node.
 */
struct promotenode {
        list_node_t link;       /* linkage on the containing list_t */
        dsl_dataset_t *ds;      /* the dataset this node refers to */
};
2641 
2642 struct promotearg {
2643         list_t shared_snaps, origin_snaps, clone_snaps;
2644         dsl_dataset_t *origin_origin;
2645         uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2646         char *err_ds;
2647 };
2648 
/* Forward declarations for snapshot-list helpers defined later in this file. */
static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
static boolean_t snaplist_unstable(list_t *l);
2651 
/*
 * Check function for promoting a clone.
 *
 * arg1 is the clone's head dataset and arg2 the struct promotearg set
 * up by dsl_dataset_promote().  In open context only the "is a clone"
 * test is performed; the rest runs in syncing context only.
 *
 * On success the space figures in *pa (unique, used, comp, uncomp,
 * cloneusedsnap, originusedsnap) have been filled in for the sync
 * function.
 *
 * Returns EINVAL if the dataset's dir is not a clone, EXDEV if
 * DS_FLAG_NOPROMOTE is set, EEXIST (with pa->err_ds pointing at the
 * offending name) if a shared snapshot's name already exists in the
 * clone, or an error from the lookup, dsl_dir_transfer_possible() or
 * snaplist_space().
 */
static int
dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
        dsl_dataset_t *hds = arg1;
        struct promotearg *pa = arg2;
        /* the head of shared_snaps is the clone's origin snapshot */
        struct promotenode *snap = list_head(&pa->shared_snaps);
        dsl_dataset_t *origin_ds = snap->ds;
        int err;
        uint64_t unused;

        /* Check that it is a real clone */
        if (!dsl_dir_is_clone(hds->ds_dir))
                return (EINVAL);

        /* Since this is so expensive, don't do the preliminary check */
        if (!dmu_tx_is_syncing(tx))
                return (0);

        if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
                return (EXDEV);

        /* compute origin's new unique space */
        snap = list_tail(&pa->clone_snaps);
        ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
        dsl_deadlist_space_range(&snap->ds->ds_deadlist,
            origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
            &pa->unique, &unused, &unused);

        /*
         * Walk the snapshots that we are moving
         *
         * Compute space to transfer.  Consider the incremental changes
         * to used for each snapshot:
         * (my used) = (prev's used) + (blocks born) - (blocks killed)
         * So each snapshot gave birth to:
         * (blocks born) = (my used) - (prev's used) + (blocks killed)
         * So a sequence would look like:
         * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
         * Which simplifies to:
         * uN + kN + kN-1 + ... + k1 + k0
         * Note however, if we stop before we reach the ORIGIN we get:
         * uN + kN + kN-1 + ... + kM - uM-1
         */
        pa->used = origin_ds->ds_phys->ds_referenced_bytes;
        pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
        pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
        for (snap = list_head(&pa->shared_snaps); snap;
            snap = list_next(&pa->shared_snaps, snap)) {
                uint64_t val, dlused, dlcomp, dluncomp;
                dsl_dataset_t *ds = snap->ds;

                /* Check that the snapshot name does not conflict */
                VERIFY(0 == dsl_dataset_get_snapname(ds));
                err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
                if (err == 0) {
                        err = EEXIST;
                        goto out;
                }
                if (err != ENOENT)
                        goto out;

                /* The very first snapshot does not have a deadlist */
                if (ds->ds_phys->ds_prev_snap_obj == 0)
                        continue;

                dsl_deadlist_space(&ds->ds_deadlist,
                    &dlused, &dlcomp, &dluncomp);
                pa->used += dlused;
                pa->comp += dlcomp;
                pa->uncomp += dluncomp;
        }

        /*
         * If we are a clone of a clone then we never reached ORIGIN,
         * so we need to subtract out the clone origin's used space.
         */
        if (pa->origin_origin) {
                pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
                pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
                pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
        }

        /* Check that there is enough space here */
        err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
            pa->used);
        if (err)
                return (err);

        /*
         * Compute the amounts of space that will be used by snapshots
         * after the promotion (for both origin and clone).  For each,
         * it is the amount of space that will be on all of their
         * deadlists (that was not born before their new origin).
         */
        if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
                uint64_t space;

                /*
                 * Note, typically this will not be a clone of a clone,
                 * so dd_origin_txg will be < TXG_INITIAL, so
                 * these snaplist_space() -> dsl_deadlist_space_range()
                 * calls will be fast because they do not have to
                 * iterate over all bps.
                 */
                snap = list_head(&pa->origin_snaps);
                err = snaplist_space(&pa->shared_snaps,
                    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
                if (err)
                        return (err);

                err = snaplist_space(&pa->clone_snaps,
                    snap->ds->ds_dir->dd_origin_txg, &space);
                if (err)
                        return (err);
                pa->cloneusedsnap += space;
        }
        if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
                err = snaplist_space(&pa->origin_snaps,
                    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
                if (err)
                        return (err);
        }

        return (0);
out:
        /* report which shared snapshot the name lookup failed on */
        pa->err_ds =  snap->ds->ds_snapname;
        return (err);
}
2780 
/*
 * Sync function for promoting a clone.
 *
 * arg1 is the clone's head dataset and arg2 the struct promotearg whose
 * space figures were filled in by dsl_dataset_promote_check().  The
 * function relinks the origin snapshot, swaps the origin relationship
 * between the clone's dsl_dir (dd) and the origin's dsl_dir (odd),
 * moves every shared snapshot (name entry, containing dir and clone
 * references) from odd to dd, adjusts the space accounting of both
 * dirs, and logs the operation to the pool history.
 */
static void
dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
        dsl_dataset_t *hds = arg1;
        struct promotearg *pa = arg2;
        /* the head of shared_snaps is the clone's origin snapshot */
        struct promotenode *snap = list_head(&pa->shared_snaps);
        dsl_dataset_t *origin_ds = snap->ds;
        dsl_dataset_t *origin_head;
        dsl_dir_t *dd = hds->ds_dir;
        dsl_pool_t *dp = hds->ds_dir->dd_pool;
        dsl_dir_t *odd = NULL;
        uint64_t oldnext_obj;
        int64_t delta;

        ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));

        /* the head of origin_snaps is the origin's head dataset */
        snap = list_head(&pa->origin_snaps);
        origin_head = snap->ds;

        /*
         * We need to explicitly open odd, since origin_ds's dd will be
         * changing.
         */
        VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
            NULL, FTAG, &odd));

        /* change origin's next snap */
        dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
        oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
        snap = list_tail(&pa->clone_snaps);
        ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
        origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;

        /* change the origin's next clone */
        if (origin_ds->ds_phys->ds_next_clones_obj) {
                remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    origin_ds->ds_phys->ds_next_clones_obj,
                    oldnext_obj, tx));
        }

        /* change origin */
        dmu_buf_will_dirty(dd->dd_dbuf, tx);
        ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
        dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
        dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
        dmu_buf_will_dirty(odd->dd_dbuf, tx);
        odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
        origin_head->ds_dir->dd_origin_txg =
            origin_ds->ds_phys->ds_creation_txg;

        /*
         * change dd_clone entries
         *
         * NOTE(review): pa->origin_origin is dereferenced unconditionally
         * here, although dsl_dataset_promote() leaves it NULL when the
         * origin's dir has no origin.  Presumably pools at
         * SPA_VERSION_DIR_CLONES or later always have one -- confirm.
         */
        if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    odd->dd_phys->dd_clones, hds->ds_object, tx));
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    hds->ds_object, tx));

                VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
                    pa->origin_origin->ds_dir->dd_phys->dd_clones,
                    origin_head->ds_object, tx));
                if (dd->dd_phys->dd_clones == 0) {
                        dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
                            DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
                }
                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    dd->dd_phys->dd_clones, origin_head->ds_object, tx));

        }

        /* move snapshots to this dir */
        for (snap = list_head(&pa->shared_snaps); snap;
            snap = list_next(&pa->shared_snaps, snap)) {
                dsl_dataset_t *ds = snap->ds;

                /* unregister props as dsl_dir is changing */
                if (ds->ds_objset) {
                        dmu_objset_evict(ds->ds_objset);
                        ds->ds_objset = NULL;
                }
                /* move snap name entry */
                VERIFY(0 == dsl_dataset_get_snapname(ds));
                VERIFY(0 == dsl_dataset_snap_remove(origin_head,
                    ds->ds_snapname, tx));
                VERIFY(0 == zap_add(dp->dp_meta_objset,
                    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
                    8, 1, &ds->ds_object, tx));

                /* change containing dsl_dir */
                dmu_buf_will_dirty(ds->ds_dbuf, tx);
                ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
                ds->ds_phys->ds_dir_obj = dd->dd_object;
                ASSERT3P(ds->ds_dir, ==, odd);
                dsl_dir_close(ds->ds_dir, ds);
                VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
                    NULL, ds, &ds->ds_dir));

                /* move any clone references */
                if (ds->ds_phys->ds_next_clones_obj &&
                    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
                        zap_cursor_t zc;
                        zap_attribute_t za;

                        for (zap_cursor_init(&zc, dp->dp_meta_objset,
                            ds->ds_phys->ds_next_clones_obj);
                            zap_cursor_retrieve(&zc, &za) == 0;
                            zap_cursor_advance(&zc)) {
                                dsl_dataset_t *cnds;
                                uint64_t o;

                                if (za.za_first_integer == oldnext_obj) {
                                        /*
                                         * We've already moved the
                                         * origin's reference.
                                         */
                                        continue;
                                }

                                VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
                                    za.za_first_integer, FTAG, &cnds));
                                /* dd_clones is keyed by head dataset obj */
                                o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;

                                VERIFY3U(zap_remove_int(dp->dp_meta_objset,
                                    odd->dd_phys->dd_clones, o, tx), ==, 0);
                                VERIFY3U(zap_add_int(dp->dp_meta_objset,
                                    dd->dd_phys->dd_clones, o, tx), ==, 0);
                                dsl_dataset_rele(cnds, FTAG);
                        }
                        zap_cursor_fini(&zc);
                }

                ASSERT3U(dsl_prop_numcb(ds), ==, 0);
        }

        /*
         * Change space accounting.
         * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
         * both be valid, or both be 0 (resulting in delta == 0).  This
         * is true for each of {clone,origin} independently.
         */

        /* the clone's dir gains snapshot space and the transferred data */
        delta = pa->cloneusedsnap -
            dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, >=, 0);
        ASSERT3U(pa->used, >=, delta);
        dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(dd, DD_USED_HEAD,
            pa->used - delta, pa->comp, pa->uncomp, tx);

        /* the origin's dir loses the same amounts */
        delta = pa->originusedsnap -
            odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
        ASSERT3S(delta, <=, 0);
        ASSERT3U(pa->used, >=, -delta);
        dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
        dsl_dir_diduse_space(odd, DD_USED_HEAD,
            -pa->used - delta, -pa->comp, -pa->uncomp, tx);

        origin_ds->ds_phys->ds_unique_bytes = pa->unique;

        /* log history record */
        spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
            "dataset = %llu", hds->ds_object);

        dsl_dir_close(odd, FTAG);
}
2947 
2948 static char *snaplist_tag = "snaplist";
2949 /*
2950  * Make a list of dsl_dataset_t's for the snapshots between first_obj
2951  * (exclusive) and last_obj (inclusive).  The list will be in reverse
2952  * order (last_obj will be the list_head()).  If first_obj == 0, do all
2953  * snapshots back to this dataset's origin.
2954  */
2955 static int
2956 snaplist_make(dsl_pool_t *dp, boolean_t own,
2957     uint64_t first_obj, uint64_t last_obj, list_t *l)
2958 {
2959         uint64_t obj = last_obj;
2960 
2961         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2962 
2963         list_create(l, sizeof (struct promotenode),
2964             offsetof(struct promotenode, link));
2965 
2966         while (obj != first_obj) {
2967                 dsl_dataset_t *ds;
2968                 struct promotenode *snap;
2969                 int err;
2970 
2971                 if (own) {
2972                         err = dsl_dataset_own_obj(dp, obj,
2973                             0, snaplist_tag, &ds);
2974                         if (err == 0)
2975                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
2976                 } else {
2977                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2978                 }
2979                 if (err == ENOENT) {
2980                         /* lost race with snapshot destroy */
2981                         struct promotenode *last = list_tail(l);
2982                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
2983                         obj = last->ds->ds_phys->ds_prev_snap_obj;
2984                         continue;
2985                 } else if (err) {
2986                         return (err);
2987                 }
2988 
2989                 if (first_obj == 0)
2990                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
2991 
2992                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
2993                 snap->ds = ds;
2994                 list_insert_tail(l, snap);
2995                 obj = ds->ds_phys->ds_prev_snap_obj;
2996         }
2997 
2998         return (0);
2999 }
3000 
3001 static int
3002 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3003 {
3004         struct promotenode *snap;
3005 
3006         *spacep = 0;
3007         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3008                 uint64_t used, comp, uncomp;
3009                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3010                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3011                 *spacep += used;
3012         }
3013         return (0);
3014 }
3015 
3016 static void
3017 snaplist_destroy(list_t *l, boolean_t own)
3018 {
3019         struct promotenode *snap;
3020 
3021         if (!l || !list_link_active(&l->list_head))
3022                 return;
3023 
3024         while ((snap = list_tail(l)) != NULL) {
3025                 list_remove(l, snap);
3026                 if (own)
3027                         dsl_dataset_disown(snap->ds, snaplist_tag);
3028                 else
3029                         dsl_dataset_rele(snap->ds, snaplist_tag);
3030                 kmem_free(snap, sizeof (struct promotenode));
3031         }
3032         list_destroy(l);
3033 }
3034 
3035 /*
3036  * Promote a clone.  Nomenclature note:
3037  * "clone" or "cds": the original clone which is being promoted
3038  * "origin" or "ods": the snapshot which is originally clone's origin
3039  * "origin head" or "ohds": the dataset which is the head
3040  * (filesystem/volume) for the origin
3041  * "origin origin": the origin of the origin's filesystem (typically
3042  * NULL, indicating that the clone is not a clone of a clone).
3043  */
3044 int
3045 dsl_dataset_promote(const char *name, char *conflsnap)
3046 {
3047         dsl_dataset_t *ds;
3048         dsl_dir_t *dd;
3049         dsl_pool_t *dp;
3050         dmu_object_info_t doi;
3051         struct promotearg pa = { 0 };
3052         struct promotenode *snap;
3053         int err;
3054 
3055         err = dsl_dataset_hold(name, FTAG, &ds);
3056         if (err)
3057                 return (err);
3058         dd = ds->ds_dir;
3059         dp = dd->dd_pool;
3060 
3061         err = dmu_object_info(dp->dp_meta_objset,
3062             ds->ds_phys->ds_snapnames_zapobj, &doi);
3063         if (err) {
3064                 dsl_dataset_rele(ds, FTAG);
3065                 return (err);
3066         }
3067 
3068         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3069                 dsl_dataset_rele(ds, FTAG);
3070                 return (EINVAL);
3071         }
3072 
3073         /*
3074          * We are going to inherit all the snapshots taken before our
3075          * origin (i.e., our new origin will be our parent's origin).
3076          * Take ownership of them so that we can rename them into our
3077          * namespace.
3078          */
3079         rw_enter(&dp->dp_config_rwlock, RW_READER);
3080 
3081         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3082             &pa.shared_snaps);
3083         if (err != 0)
3084                 goto out;
3085 
3086         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3087         if (err != 0)
3088                 goto out;
3089 
3090         snap = list_head(&pa.shared_snaps);
3091         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3092         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3093             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3094         if (err != 0)
3095                 goto out;
3096 
3097         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3098                 err = dsl_dataset_hold_obj(dp,
3099                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3100                     FTAG, &pa.origin_origin);
3101                 if (err != 0)
3102                         goto out;
3103         }
3104 
3105 out:
3106         rw_exit(&dp->dp_config_rwlock);
3107 
3108         /*
3109          * Add in 128x the snapnames zapobj size, since we will be moving
3110          * a bunch of snapnames to the promoted ds, and dirtying their
3111          * bonus buffers.
3112          */
3113         if (err == 0) {
3114                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3115                     dsl_dataset_promote_sync, ds, &pa,
3116                     2 + 2 * doi.doi_physical_blocks_512);
3117                 if (err && pa.err_ds && conflsnap)
3118                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3119         }
3120 
3121         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3122         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3123         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3124         if (pa.origin_origin)
3125                 dsl_dataset_rele(pa.origin_origin, FTAG);
3126         dsl_dataset_rele(ds, FTAG);
3127         return (err);
3128 }
3129 
3130 struct cloneswaparg {
3131         dsl_dataset_t *cds; /* clone dataset */
3132         dsl_dataset_t *ohds; /* origin's head dataset */
3133         boolean_t force;
3134         int64_t unused_refres_delta; /* change in unconsumed refreservation */
3135 };
3136 
3137 /* ARGSUSED */
3138 static int
3139 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3140 {
3141         struct cloneswaparg *csa = arg1;
3142 
3143         /* they should both be heads */
3144         if (dsl_dataset_is_snapshot(csa->cds) ||
3145             dsl_dataset_is_snapshot(csa->ohds))
3146                 return (EINVAL);
3147 
3148         /* the branch point should be just before them */
3149         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3150                 return (EINVAL);
3151 
3152         /* cds should be the clone (unless they are unrelated) */
3153         if (csa->cds->ds_prev != NULL &&
3154             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3155             csa->ohds->ds_object !=
3156             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3157                 return (EINVAL);
3158 
3159         /* the clone should be a child of the origin */
3160         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3161                 return (EINVAL);
3162 
3163         /* ohds shouldn't be modified unless 'force' */
3164         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3165                 return (ETXTBSY);
3166 
3167         /* adjust amount of any unconsumed refreservation */
3168         csa->unused_refres_delta =
3169             (int64_t)MIN(csa->ohds->ds_reserved,
3170             csa->ohds->ds_phys->ds_unique_bytes) -
3171             (int64_t)MIN(csa->ohds->ds_reserved,
3172             csa->cds->ds_phys->ds_unique_bytes);
3173 
3174         if (csa->unused_refres_delta > 0 &&
3175             csa->unused_refres_delta >
3176             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3177                 return (ENOSPC);
3178 
3179         if (csa->ohds->ds_quota != 0 &&
3180             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3181                 return (EDQUOT);
3182 
3183         return (0);
3184 }
3185 
3186 /* ARGSUSED */
3187 static void
3188 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3189 {
3190         struct cloneswaparg *csa = arg1;
3191         dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3192 
3193         ASSERT(csa->cds->ds_reserved == 0);
3194         ASSERT(csa->ohds->ds_quota == 0 ||
3195             csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3196 
3197         dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3198         dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3199 
3200         if (csa->cds->ds_objset != NULL) {
3201                 dmu_objset_evict(csa->cds->ds_objset);
3202                 csa->cds->ds_objset = NULL;
3203         }
3204 
3205         if (csa->ohds->ds_objset != NULL) {
3206                 dmu_objset_evict(csa->ohds->ds_objset);
3207                 csa->ohds->ds_objset = NULL;
3208         }
3209 
3210         /*
3211          * Reset origin's unique bytes, if it exists.
3212          */
3213         if (csa->cds->ds_prev) {
3214                 dsl_dataset_t *origin = csa->cds->ds_prev;
3215                 uint64_t comp, uncomp;
3216 
3217                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
3218                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3219                     origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3220                     &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3221         }
3222 
3223         /* swap blkptrs */
3224         {
3225                 blkptr_t tmp;
3226                 tmp = csa->ohds->ds_phys->ds_bp;
3227                 csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3228                 csa->cds->ds_phys->ds_bp = tmp;
3229         }
3230 
3231         /* set dd_*_bytes */
3232         {
3233                 int64_t dused, dcomp, duncomp;
3234                 uint64_t cdl_used, cdl_comp, cdl_uncomp;
3235                 uint64_t odl_used, odl_comp, odl_uncomp;
3236 
3237                 ASSERT3U(csa->cds->ds_dir->dd_phys->
3238                     dd_used_breakdown[DD_USED_SNAP], ==, 0);
3239 
3240                 dsl_deadlist_space(&csa->cds->ds_deadlist,
3241                     &cdl_used, &cdl_comp, &cdl_uncomp);
3242                 dsl_deadlist_space(&csa->ohds->ds_deadlist,
3243                     &odl_used, &odl_comp, &odl_uncomp);
3244 
3245                 dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3246                     (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3247                 dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3248                     (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3249                 duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3250                     cdl_uncomp -
3251                     (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3252 
3253                 dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3254                     dused, dcomp, duncomp, tx);
3255                 dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3256                     -dused, -dcomp, -duncomp, tx);
3257 
3258                 /*
3259                  * The difference in the space used by snapshots is the
3260                  * difference in snapshot space due to the head's
3261                  * deadlist (since that's the only thing that's
3262                  * changing that affects the snapused).
3263                  */
3264                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3265                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3266                     &cdl_used, &cdl_comp, &cdl_uncomp);
3267                 dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3268                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3269                     &odl_used, &odl_comp, &odl_uncomp);
3270                 dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3271                     DD_USED_HEAD, DD_USED_SNAP, tx);
3272         }
3273 
3274         /* swap ds_*_bytes */
3275         SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3276             csa->cds->ds_phys->ds_referenced_bytes);
3277         SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3278             csa->cds->ds_phys->ds_compressed_bytes);
3279         SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3280             csa->cds->ds_phys->ds_uncompressed_bytes);
3281         SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3282             csa->cds->ds_phys->ds_unique_bytes);
3283 
3284         /* apply any parent delta for change in unconsumed refreservation */
3285         dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3286             csa->unused_refres_delta, 0, 0, tx);
3287 
3288         /*
3289          * Swap deadlists.
3290          */
3291         dsl_deadlist_close(&csa->cds->ds_deadlist);
3292         dsl_deadlist_close(&csa->ohds->ds_deadlist);
3293         SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3294             csa->cds->ds_phys->ds_deadlist_obj);
3295         dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3296             csa->cds->ds_phys->ds_deadlist_obj);
3297         dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3298             csa->ohds->ds_phys->ds_deadlist_obj);
3299 
3300         dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3301 }
3302 
3303 /*
3304  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3305  * recv" into an existing fs to swizzle the file system to the new
3306  * version, and by "zfs rollback".  Can also be used to swap two
3307  * independent head datasets if neither has any snapshots.
3308  */
3309 int
3310 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3311     boolean_t force)
3312 {
3313         struct cloneswaparg csa;
3314         int error;
3315 
3316         ASSERT(clone->ds_owner);
3317         ASSERT(origin_head->ds_owner);
3318 retry:
3319         /*
3320          * Need exclusive access for the swap. If we're swapping these
3321          * datasets back after an error, we already hold the locks.
3322          */
3323         if (!RW_WRITE_HELD(&clone->ds_rwlock))
3324                 rw_enter(&clone->ds_rwlock, RW_WRITER);
3325         if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3326             !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3327                 rw_exit(&clone->ds_rwlock);
3328                 rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3329                 if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3330                         rw_exit(&origin_head->ds_rwlock);
3331                         goto retry;
3332                 }
3333         }
3334         csa.cds = clone;
3335         csa.ohds = origin_head;
3336         csa.force = force;
3337         error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3338             dsl_dataset_clone_swap_check,
3339             dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3340         return (error);
3341 }
3342 
3343 /*
3344  * Given a pool name and a dataset object number in that pool,
3345  * return the name of that dataset.
3346  */
3347 int
3348 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3349 {
3350         spa_t *spa;
3351         dsl_pool_t *dp;
3352         dsl_dataset_t *ds;
3353         int error;
3354 
3355         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3356                 return (error);
3357         dp = spa_get_dsl(spa);
3358         rw_enter(&dp->dp_config_rwlock, RW_READER);
3359         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3360                 dsl_dataset_name(ds, buf);
3361                 dsl_dataset_rele(ds, FTAG);
3362         }
3363         rw_exit(&dp->dp_config_rwlock);
3364         spa_close(spa, FTAG);
3365 
3366         return (error);
3367 }
3368 
3369 int
3370 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3371     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3372 {
3373         int error = 0;
3374 
3375         ASSERT3S(asize, >, 0);
3376 
3377         /*
3378          * *ref_rsrv is the portion of asize that will come from any
3379          * unconsumed refreservation space.
3380          */
3381         *ref_rsrv = 0;
3382 
3383         mutex_enter(&ds->ds_lock);
3384         /*
3385          * Make a space adjustment for reserved bytes.
3386          */
3387         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3388                 ASSERT3U(*used, >=,
3389                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3390                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3391                 *ref_rsrv =
3392                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3393         }
3394 
3395         if (!check_quota || ds->ds_quota == 0) {
3396                 mutex_exit(&ds->ds_lock);
3397                 return (0);
3398         }
3399         /*
3400          * If they are requesting more space, and our current estimate
3401          * is over quota, they get to try again unless the actual
3402          * on-disk is over quota and there are no pending changes (which
3403          * may free up space for us).
3404          */
3405         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3406                 if (inflight > 0 ||
3407                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3408                         error = ERESTART;
3409                 else
3410                         error = EDQUOT;
3411         }
3412         mutex_exit(&ds->ds_lock);
3413 
3414         return (error);
3415 }
3416 
3417 /* ARGSUSED */
3418 static int
3419 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3420 {
3421         dsl_dataset_t *ds = arg1;
3422         dsl_prop_setarg_t *psa = arg2;
3423         int err;
3424 
3425         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3426                 return (ENOTSUP);
3427 
3428         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3429                 return (err);
3430 
3431         if (psa->psa_effective_value == 0)
3432                 return (0);
3433 
3434         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3435             psa->psa_effective_value < ds->ds_reserved)
3436                 return (ENOSPC);
3437 
3438         return (0);
3439 }
3440 
3441 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3442 
3443 void
3444 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3445 {
3446         dsl_dataset_t *ds = arg1;
3447         dsl_prop_setarg_t *psa = arg2;
3448         uint64_t effective_value = psa->psa_effective_value;
3449 
3450         dsl_prop_set_sync(ds, psa, tx);
3451         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3452 
3453         if (ds->ds_quota != effective_value) {
3454                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3455                 ds->ds_quota = effective_value;
3456 
3457                 spa_history_log_internal(LOG_DS_REFQUOTA,
3458                     ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu ",
3459                     (longlong_t)ds->ds_quota, ds->ds_object);
3460         }
3461 }
3462 
3463 int
3464 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3465 {
3466         dsl_dataset_t *ds;
3467         dsl_prop_setarg_t psa;
3468         int err;
3469 
3470         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3471 
3472         err = dsl_dataset_hold(dsname, FTAG, &ds);
3473         if (err)
3474                 return (err);
3475 
3476         /*
3477          * If someone removes a file, then tries to set the quota, we
3478          * want to make sure the file freeing takes effect.
3479          */
3480         txg_wait_open(ds->ds_dir->dd_pool, 0);
3481 
3482         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3483             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3484             ds, &psa, 0);
3485 
3486         dsl_dataset_rele(ds, FTAG);
3487         return (err);
3488 }
3489 
3490 static int
3491 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3492 {
3493         dsl_dataset_t *ds = arg1;
3494         dsl_prop_setarg_t *psa = arg2;
3495         uint64_t effective_value;
3496         uint64_t unique;
3497         int err;
3498 
3499         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3500             SPA_VERSION_REFRESERVATION)
3501                 return (ENOTSUP);
3502 
3503         if (dsl_dataset_is_snapshot(ds))
3504                 return (EINVAL);
3505 
3506         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3507                 return (err);
3508 
3509         effective_value = psa->psa_effective_value;
3510 
3511         /*
3512          * If we are doing the preliminary check in open context, the
3513          * space estimates may be inaccurate.
3514          */
3515         if (!dmu_tx_is_syncing(tx))
3516                 return (0);
3517 
3518         mutex_enter(&ds->ds_lock);
3519         if (!DS_UNIQUE_IS_ACCURATE(ds))
3520                 dsl_dataset_recalc_head_uniq(ds);
3521         unique = ds->ds_phys->ds_unique_bytes;
3522         mutex_exit(&ds->ds_lock);
3523 
3524         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3525                 uint64_t delta = MAX(unique, effective_value) -
3526                     MAX(unique, ds->ds_reserved);
3527 
3528                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3529                         return (ENOSPC);
3530                 if (ds->ds_quota > 0 &&
3531                     effective_value > ds->ds_quota)
3532                         return (ENOSPC);
3533         }
3534 
3535         return (0);
3536 }
3537 
3538 static void
3539 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3540 {
3541         dsl_dataset_t *ds = arg1;
3542         dsl_prop_setarg_t *psa = arg2;
3543         uint64_t effective_value = psa->psa_effective_value;
3544         uint64_t unique;
3545         int64_t delta;
3546 
3547         dsl_prop_set_sync(ds, psa, tx);
3548         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3549 
3550         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3551 
3552         mutex_enter(&ds->ds_dir->dd_lock);
3553         mutex_enter(&ds->ds_lock);
3554         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3555         unique = ds->ds_phys->ds_unique_bytes;
3556         delta = MAX(0, (int64_t)(effective_value - unique)) -
3557             MAX(0, (int64_t)(ds->ds_reserved - unique));
3558         ds->ds_reserved = effective_value;
3559         mutex_exit(&ds->ds_lock);
3560 
3561         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3562         mutex_exit(&ds->ds_dir->dd_lock);
3563 
3564         spa_history_log_internal(LOG_DS_REFRESERV,
3565             ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu",
3566             (longlong_t)effective_value, ds->ds_object);
3567 }
3568 
3569 int
3570 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3571     uint64_t reservation)
3572 {
3573         dsl_dataset_t *ds;
3574         dsl_prop_setarg_t psa;
3575         int err;
3576 
3577         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3578             &reservation);
3579 
3580         err = dsl_dataset_hold(dsname, FTAG, &ds);
3581         if (err)
3582                 return (err);
3583 
3584         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3585             dsl_dataset_set_reservation_check,
3586             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3587 
3588         dsl_dataset_rele(ds, FTAG);
3589         return (err);
3590 }
3591 
3592 typedef struct zfs_hold_cleanup_arg {
3593         dsl_pool_t *dp;
3594         uint64_t dsobj;
3595         char htag[MAXNAMELEN];
3596 } zfs_hold_cleanup_arg_t;
3597 
3598 static void
3599 dsl_dataset_user_release_onexit(void *arg)
3600 {
3601         zfs_hold_cleanup_arg_t *ca = arg;
3602 
3603         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3604             B_TRUE);
3605         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3606 }
3607 
3608 void
3609 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3610     minor_t minor)
3611 {
3612         zfs_hold_cleanup_arg_t *ca;
3613 
3614         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3615         ca->dp = ds->ds_dir->dd_pool;
3616         ca->dsobj = ds->ds_object;
3617         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3618         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3619             dsl_dataset_user_release_onexit, ca, NULL));
3620 }
3621 
3622 /*
3623  * If you add new checks here, you may need to add
3624  * additional checks to the "temporary" case in
3625  * snapshot_check() in dmu_objset.c.
3626  */
3627 static int
3628 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3629 {
3630         dsl_dataset_t *ds = arg1;
3631         struct dsl_ds_holdarg *ha = arg2;
3632         char *htag = ha->htag;
3633         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3634         int error = 0;
3635 
3636         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3637                 return (ENOTSUP);
3638 
3639         if (!dsl_dataset_is_snapshot(ds))
3640                 return (EINVAL);
3641 
3642         /* tags must be unique */
3643         mutex_enter(&ds->ds_lock);
3644         if (ds->ds_phys->ds_userrefs_obj) {
3645                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3646                     8, 1, tx);
3647                 if (error == 0)
3648                         error = EEXIST;
3649                 else if (error == ENOENT)
3650                         error = 0;
3651         }
3652         mutex_exit(&ds->ds_lock);
3653 
3654         if (error == 0 && ha->temphold &&
3655             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3656                 error = E2BIG;
3657 
3658         return (error);
3659 }
3660 
3661 void
3662 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3663 {
3664         dsl_dataset_t *ds = arg1;
3665         struct dsl_ds_holdarg *ha = arg2;
3666         char *htag = ha->htag;
3667         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3668         objset_t *mos = dp->dp_meta_objset;
3669         uint64_t now = gethrestime_sec();
3670         uint64_t zapobj;
3671 
3672         mutex_enter(&ds->ds_lock);
3673         if (ds->ds_phys->ds_userrefs_obj == 0) {
3674                 /*
3675                  * This is the first user hold for this dataset.  Create
3676                  * the userrefs zap object.
3677                  */
3678                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3679                 zapobj = ds->ds_phys->ds_userrefs_obj =
3680                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3681         } else {
3682                 zapobj = ds->ds_phys->ds_userrefs_obj;
3683         }
3684         ds->ds_userrefs++;
3685         mutex_exit(&ds->ds_lock);
3686 
3687         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3688 
3689         if (ha->temphold) {
3690                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3691                     htag, &now, tx));
3692         }
3693 
3694         spa_history_log_internal(LOG_DS_USER_HOLD,
3695             dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
3696             (int)ha->temphold, ds->ds_object);
3697 }
3698 
3699 static int
3700 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3701 {
3702         struct dsl_ds_holdarg *ha = arg;
3703         dsl_dataset_t *ds;
3704         int error;
3705         char *name;
3706 
3707         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3708         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3709         error = dsl_dataset_hold(name, ha->dstg, &ds);
3710         strfree(name);
3711         if (error == 0) {
3712                 ha->gotone = B_TRUE;
3713                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3714                     dsl_dataset_user_hold_sync, ds, ha, 0);
3715         } else if (error == ENOENT && ha->recursive) {
3716                 error = 0;
3717         } else {
3718                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3719         }
3720         return (error);
3721 }
3722 
3723 int
3724 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3725     boolean_t temphold)
3726 {
3727         struct dsl_ds_holdarg *ha;
3728         int error;
3729 
3730         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3731         ha->htag = htag;
3732         ha->temphold = temphold;
3733         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3734             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3735             ds, ha, 0);
3736         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3737 
3738         return (error);
3739 }
3740 
3741 int
3742 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3743     boolean_t recursive, boolean_t temphold, int cleanup_fd)
3744 {
3745         struct dsl_ds_holdarg *ha;
3746         dsl_sync_task_t *dst;
3747         spa_t *spa;
3748         int error;
3749         minor_t minor = 0;
3750 
3751         if (cleanup_fd != -1) {
3752                 /* Currently we only support cleanup-on-exit of tempholds. */
3753                 if (!temphold)
3754                         return (EINVAL);
3755                 error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3756                 if (error)
3757                         return (error);
3758         }
3759 
3760         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3761 
3762         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3763 
3764         error = spa_open(dsname, &spa, FTAG);
3765         if (error) {
3766                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3767                 if (cleanup_fd != -1)
3768                         zfs_onexit_fd_rele(cleanup_fd);
3769                 return (error);
3770         }
3771 
3772         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3773         ha->htag = htag;
3774         ha->snapname = snapname;
3775         ha->recursive = recursive;
3776         ha->temphold = temphold;
3777 
3778         if (recursive) {
3779                 error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3780                     ha, DS_FIND_CHILDREN);
3781         } else {
3782                 error = dsl_dataset_user_hold_one(dsname, ha);
3783         }
3784         if (error == 0)
3785                 error = dsl_sync_task_group_wait(ha->dstg);
3786 
3787         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3788             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3789                 dsl_dataset_t *ds = dst->dst_arg1;
3790 
3791                 if (dst->dst_err) {
3792                         dsl_dataset_name(ds, ha->failed);
3793                         *strchr(ha->failed, '@') = '\0';
3794                 } else if (error == 0 && minor != 0 && temphold) {
3795                         /*
3796                          * If this hold is to be released upon process exit,
3797                          * register that action now.
3798                          */
3799                         dsl_register_onexit_hold_cleanup(ds, htag, minor);
3800                 }
3801                 dsl_dataset_rele(ds, ha->dstg);
3802         }
3803 
3804         if (error == 0 && recursive && !ha->gotone)
3805                 error = ENOENT;
3806 
3807         if (error)
3808                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3809 
3810         dsl_sync_task_group_destroy(ha->dstg);
3811 
3812         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3813         spa_close(spa, FTAG);
3814         if (cleanup_fd != -1)
3815                 zfs_onexit_fd_rele(cleanup_fd);
3816         return (error);
3817 }
3818 
3819 struct dsl_ds_releasearg {
3820         dsl_dataset_t *ds;
3821         const char *htag;
3822         boolean_t own;          /* do we own or just hold ds? */
3823 };
3824 
3825 static int
3826 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3827     boolean_t *might_destroy)
3828 {
3829         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3830         uint64_t zapobj;
3831         uint64_t tmp;
3832         int error;
3833 
3834         *might_destroy = B_FALSE;
3835 
3836         mutex_enter(&ds->ds_lock);
3837         zapobj = ds->ds_phys->ds_userrefs_obj;
3838         if (zapobj == 0) {
3839                 /* The tag can't possibly exist */
3840                 mutex_exit(&ds->ds_lock);
3841                 return (ESRCH);
3842         }
3843 
3844         /* Make sure the tag exists */
3845         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3846         if (error) {
3847                 mutex_exit(&ds->ds_lock);
3848                 if (error == ENOENT)
3849                         error = ESRCH;
3850                 return (error);
3851         }
3852 
3853         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3854             DS_IS_DEFER_DESTROY(ds))
3855                 *might_destroy = B_TRUE;
3856 
3857         mutex_exit(&ds->ds_lock);
3858         return (0);
3859 }
3860 
3861 static int
3862 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3863 {
3864         struct dsl_ds_releasearg *ra = arg1;
3865         dsl_dataset_t *ds = ra->ds;
3866         boolean_t might_destroy;
3867         int error;
3868 
3869         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3870                 return (ENOTSUP);
3871 
3872         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3873         if (error)
3874                 return (error);
3875 
3876         if (might_destroy) {
3877                 struct dsl_ds_destroyarg dsda = {0};
3878 
3879                 if (dmu_tx_is_syncing(tx)) {
3880                         /*
3881                          * If we're not prepared to remove the snapshot,
3882                          * we can't allow the release to happen right now.
3883                          */
3884                         if (!ra->own)
3885                                 return (EBUSY);
3886                 }
3887                 dsda.ds = ds;
3888                 dsda.releasing = B_TRUE;
3889                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
3890         }
3891 
3892         return (0);
3893 }
3894 
3895 static void
3896 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
3897 {
3898         struct dsl_ds_releasearg *ra = arg1;
3899         dsl_dataset_t *ds = ra->ds;
3900         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3901         objset_t *mos = dp->dp_meta_objset;
3902         uint64_t zapobj;
3903         uint64_t dsobj = ds->ds_object;
3904         uint64_t refs;
3905         int error;
3906 
3907         mutex_enter(&ds->ds_lock);
3908         ds->ds_userrefs--;
3909         refs = ds->ds_userrefs;
3910         mutex_exit(&ds->ds_lock);
3911         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3912         VERIFY(error == 0 || error == ENOENT);
3913         zapobj = ds->ds_phys->ds_userrefs_obj;
3914         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3915         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3916             DS_IS_DEFER_DESTROY(ds)) {
3917                 struct dsl_ds_destroyarg dsda = {0};
3918 
3919                 ASSERT(ra->own);
3920                 dsda.ds = ds;
3921                 dsda.releasing = B_TRUE;
3922                 /* We already did the destroy_check */
3923                 dsl_dataset_destroy_sync(&dsda, tag, tx);
3924         }
3925 
3926         spa_history_log_internal(LOG_DS_USER_RELEASE,
3927             dp->dp_spa, tx, "<%s> %lld dataset = %llu",
3928             ra->htag, (longlong_t)refs, dsobj);
3929 }
3930 
3931 static int
3932 dsl_dataset_user_release_one(const char *dsname, void *arg)
3933 {
3934         struct dsl_ds_holdarg *ha = arg;
3935         struct dsl_ds_releasearg *ra;
3936         dsl_dataset_t *ds;
3937         int error;
3938         void *dtag = ha->dstg;
3939         char *name;
3940         boolean_t own = B_FALSE;
3941         boolean_t might_destroy;
3942 
3943         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
3944         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3945         error = dsl_dataset_hold(name, dtag, &ds);
3946         strfree(name);
3947         if (error == ENOENT && ha->recursive)
3948                 return (0);
3949         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3950         if (error)
3951                 return (error);
3952 
3953         ha->gotone = B_TRUE;
3954 
3955         ASSERT(dsl_dataset_is_snapshot(ds));
3956 
3957         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3958         if (error) {
3959                 dsl_dataset_rele(ds, dtag);
3960                 return (error);
3961         }
3962 
3963         if (might_destroy) {
3964 #ifdef _KERNEL
3965                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3966                 error = zfs_unmount_snap(name, NULL);
3967                 strfree(name);
3968                 if (error) {
3969                         dsl_dataset_rele(ds, dtag);
3970                         return (error);
3971                 }
3972 #endif
3973                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3974                         dsl_dataset_rele(ds, dtag);
3975                         return (EBUSY);
3976                 } else {
3977                         own = B_TRUE;
3978                         dsl_dataset_make_exclusive(ds, dtag);
3979                 }
3980         }
3981 
3982         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
3983         ra->ds = ds;
3984         ra->htag = ha->htag;
3985         ra->own = own;
3986         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
3987             dsl_dataset_user_release_sync, ra, dtag, 0);
3988 
3989         return (0);
3990 }
3991 
3992 int
3993 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
3994     boolean_t recursive)
3995 {
3996         struct dsl_ds_holdarg *ha;
3997         dsl_sync_task_t *dst;
3998         spa_t *spa;
3999         int error;
4000 
4001 top:
4002         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4003 
4004         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4005 
4006         error = spa_open(dsname, &spa, FTAG);
4007         if (error) {
4008                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4009                 return (error);
4010         }
4011 
4012         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4013         ha->htag = htag;
4014         ha->snapname = snapname;
4015         ha->recursive = recursive;
4016         if (recursive) {
4017                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4018                     ha, DS_FIND_CHILDREN);
4019         } else {
4020                 error = dsl_dataset_user_release_one(dsname, ha);
4021         }
4022         if (error == 0)
4023                 error = dsl_sync_task_group_wait(ha->dstg);
4024 
4025         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4026             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4027                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4028                 dsl_dataset_t *ds = ra->ds;
4029 
4030                 if (dst->dst_err)
4031                         dsl_dataset_name(ds, ha->failed);
4032 
4033                 if (ra->own)
4034                         dsl_dataset_disown(ds, ha->dstg);
4035                 else
4036                         dsl_dataset_rele(ds, ha->dstg);
4037 
4038                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4039         }
4040 
4041         if (error == 0 && recursive && !ha->gotone)
4042                 error = ENOENT;
4043 
4044         if (error && error != EBUSY)
4045                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4046 
4047         dsl_sync_task_group_destroy(ha->dstg);
4048         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4049         spa_close(spa, FTAG);
4050 
4051         /*
4052          * We can get EBUSY if we were racing with deferred destroy and
4053          * dsl_dataset_user_release_check() hadn't done the necessary
4054          * open context setup.  We can also get EBUSY if we're racing
4055          * with destroy and that thread is the ds_owner.  Either way
4056          * the busy condition should be transient, and we should retry
4057          * the release operation.
4058          */
4059         if (error == EBUSY)
4060                 goto top;
4061 
4062         return (error);
4063 }
4064 
4065 /*
4066  * Called at spa_load time (with retry == B_FALSE) to release a stale
4067  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4068  */
4069 int
4070 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4071     boolean_t retry)
4072 {
4073         dsl_dataset_t *ds;
4074         char *snap;
4075         char *name;
4076         int namelen;
4077         int error;
4078 
4079         do {
4080                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4081                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4082                 rw_exit(&dp->dp_config_rwlock);
4083                 if (error)
4084                         return (error);
4085                 namelen = dsl_dataset_namelen(ds)+1;
4086                 name = kmem_alloc(namelen, KM_SLEEP);
4087                 dsl_dataset_name(ds, name);
4088                 dsl_dataset_rele(ds, FTAG);
4089 
4090                 snap = strchr(name, '@');
4091                 *snap = '\0';
4092                 ++snap;
4093                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4094                 kmem_free(name, namelen);
4095 
4096                 /*
4097                  * The object can't have been destroyed because we have a hold,
4098                  * but it might have been renamed, resulting in ENOENT.  Retry
4099                  * if we've been requested to do so.
4100                  *
4101                  * It would be nice if we could use the dsobj all the way
4102                  * through and avoid ENOENT entirely.  But we might need to
4103                  * unmount the snapshot, and there's currently no way to lookup
4104                  * a vfsp using a ZFS object id.
4105                  */
4106         } while ((error == ENOENT) && retry);
4107 
4108         return (error);
4109 }
4110 
4111 int
4112 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4113 {
4114         dsl_dataset_t *ds;
4115         int err;
4116 
4117         err = dsl_dataset_hold(dsname, FTAG, &ds);
4118         if (err)
4119                 return (err);
4120 
4121         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4122         if (ds->ds_phys->ds_userrefs_obj != 0) {
4123                 zap_attribute_t *za;
4124                 zap_cursor_t zc;
4125 
4126                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4127                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4128                     ds->ds_phys->ds_userrefs_obj);
4129                     zap_cursor_retrieve(&zc, za) == 0;
4130                     zap_cursor_advance(&zc)) {
4131                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4132                             za->za_first_integer));
4133                 }
4134                 zap_cursor_fini(&zc);
4135                 kmem_free(za, sizeof (zap_attribute_t));
4136         }
4137         dsl_dataset_rele(ds, FTAG);
4138         return (0);
4139 }
4140 
4141 /*
4142  * Note, this function is used as the callback for dmu_objset_find().  We
4143  * always return 0 so that we will continue to find and process
4144  * inconsistent datasets, even if we encounter an error trying to
4145  * process one of them.
4146  */
4147 /* ARGSUSED */
4148 int
4149 dsl_destroy_inconsistent(const char *dsname, void *arg)
4150 {
4151         dsl_dataset_t *ds;
4152 
4153         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4154                 if (DS_IS_INCONSISTENT(ds))
4155                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4156                 else
4157                         dsl_dataset_disown(ds, FTAG);
4158         }
4159         return (0);
4160 }
4161 
4162 /*
4163  * Return (in *usedp) the amount of space written in new that is not
4164  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4165  * a snapshot before new, in new's filesystem (or its origin).  If not then
4166  * fail and return EINVAL.
4167  *
4168  * The written space is calculated by considering two components:  First, we
4169  * ignore any freed space, and calculate the written as new's used space
4170  * minus old's used space.  Next, we add in the amount of space that was freed
4171  * between the two snapshots, thus reducing new's used space relative to old's.
4172  * Specifically, this is the space that was born before old->ds_creation_txg,
4173  * and freed before new (ie. on new's deadlist or a previous deadlist).
4174  *
4175  * space freed                         [---------------------]
4176  * snapshots                       ---O-------O--------O-------O------
4177  *                                         oldsnap            new
4178  */
4179 int
4180 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4181     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4182 {
4183         int err = 0;
4184         uint64_t snapobj;
4185         dsl_pool_t *dp = new->ds_dir->dd_pool;
4186 
4187         *usedp = 0;
4188         *usedp += new->ds_phys->ds_referenced_bytes;
4189         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4190 
4191         *compp = 0;
4192         *compp += new->ds_phys->ds_compressed_bytes;
4193         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4194 
4195         *uncompp = 0;
4196         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4197         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4198 
4199         rw_enter(&dp->dp_config_rwlock, RW_READER);
4200         snapobj = new->ds_object;
4201         while (snapobj != oldsnap->ds_object) {
4202                 dsl_dataset_t *snap;
4203                 uint64_t used, comp, uncomp;
4204 
4205                 if (snapobj == new->ds_object) {
4206                         snap = new;
4207                 } else {
4208                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4209                         if (err != 0)
4210                                 break;
4211                 }
4212 
4213                 if (snap->ds_phys->ds_prev_snap_txg ==
4214                     oldsnap->ds_phys->ds_creation_txg) {
4215                         /*
4216                          * The blocks in the deadlist can not be born after
4217                          * ds_prev_snap_txg, so get the whole deadlist space,
4218                          * which is more efficient (especially for old-format
4219                          * deadlists).  Unfortunately the deadlist code
4220                          * doesn't have enough information to make this
4221                          * optimization itself.
4222                          */
4223                         dsl_deadlist_space(&snap->ds_deadlist,
4224                             &used, &comp, &uncomp);
4225                 } else {
4226                         dsl_deadlist_space_range(&snap->ds_deadlist,
4227                             0, oldsnap->ds_phys->ds_creation_txg,
4228                             &used, &comp, &uncomp);
4229                 }
4230                 *usedp += used;
4231                 *compp += comp;
4232                 *uncompp += uncomp;
4233 
4234                 /*
4235                  * If we get to the beginning of the chain of snapshots
4236                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4237                  * was not a snapshot of/before new.
4238                  */
4239                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4240                 if (snap != new)
4241                         dsl_dataset_rele(snap, FTAG);
4242                 if (snapobj == 0) {
4243                         err = EINVAL;
4244                         break;
4245                 }
4246 
4247         }
4248         rw_exit(&dp->dp_config_rwlock);
4249         return (err);
4250 }
4251 
4252 /*
4253  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4254  * lastsnap, and all snapshots in between are deleted.
4255  *
4256  * blocks that would be freed            [---------------------------]
4257  * snapshots                       ---O-------O--------O-------O--------O
4258  *                                        firstsnap        lastsnap
4259  *
4260  * This is the set of blocks that were born after the snap before firstsnap,
4261  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4262  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4263  * We calculate this by iterating over the relevant deadlists (from the snap
4264  * after lastsnap, backward to the snap after firstsnap), summing up the
4265  * space on the deadlist that was born after the snap before firstsnap.
4266  */
4267 int
4268 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4269     dsl_dataset_t *lastsnap,
4270     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4271 {
4272         int err = 0;
4273         uint64_t snapobj;
4274         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4275 
4276         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4277         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4278 
4279         /*
4280          * Check that the snapshots are in the same dsl_dir, and firstsnap
4281          * is before lastsnap.
4282          */
4283         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4284             firstsnap->ds_phys->ds_creation_txg >
4285             lastsnap->ds_phys->ds_creation_txg)
4286                 return (EINVAL);
4287 
4288         *usedp = *compp = *uncompp = 0;
4289 
4290         rw_enter(&dp->dp_config_rwlock, RW_READER);
4291         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4292         while (snapobj != firstsnap->ds_object) {
4293                 dsl_dataset_t *ds;
4294                 uint64_t used, comp, uncomp;
4295 
4296                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4297                 if (err != 0)
4298                         break;
4299 
4300                 dsl_deadlist_space_range(&ds->ds_deadlist,
4301                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4302                     &used, &comp, &uncomp);
4303                 *usedp += used;
4304                 *compp += comp;
4305                 *uncompp += uncomp;
4306 
4307                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4308                 ASSERT3U(snapobj, !=, 0);
4309                 dsl_dataset_rele(ds, FTAG);
4310         }
4311         rw_exit(&dp->dp_config_rwlock);
4312         return (err);
4313 }