/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 */

#include <sys/dsl_pool.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_synctask.h>
#include <sys/dsl_scan.h>
#include <sys/dnode.h>
#include <sys/dmu_tx.h>
#include <sys/dmu_objset.h>
#include <sys/arc.h>
#include <sys/zap.h>
#include <sys/zio.h>
#include <sys/zfs_context.h>
#include <sys/fs/zfs.h>
#include <sys/zfs_znode.h>
#include <sys/spa_impl.h>
#include <sys/dsl_deadlist.h>
#include <sys/bptree.h>
#include <sys/zfeature.h>

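/*
 * Tunables for the write throttle.  dsl_pool_sync() recomputes the
 * per-txg write limit each sync: the limit adapts toward (observed
 * throughput * zfs_txg_synctime_ms), clamped between
 * zfs_write_limit_min and zfs_write_limit_inflated, and
 * dsl_pool_tempreserve_space() enforces the result.  Setting
 * zfs_no_write_throttle disables enforcement; a nonzero
 * zfs_write_limit_override pins the limit to a fixed value.
 */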
int zfs_no_write_throttle = 0;
int zfs_write_limit_shift = 3;			/* 1/8th of physical memory */
int zfs_txg_synctime_ms = 1000;			/* target millisecs to sync a txg */

uint64_t zfs_write_limit_min = 32 << 20;	/* min write limit is 32MB */
uint64_t zfs_write_limit_max = 0;		/* max data payload per txg */
uint64_t zfs_write_limit_inflated = 0;
uint64_t zfs_write_limit_override = 0;

kmutex_t zfs_write_limit_lock;

static pgcnt_t old_physmem = 0;

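/*
 * Open one of the pool's special top-level directories (MOS_DIR_NAME,
 * ORIGIN_DIR_NAME, FREE_DIR_NAME) by looking its name up in the root
 * dsl_dir's child-directory zap.
 */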
int
dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
{
        uint64_t obj;
        int err;

        err = zap_lookup(dp->dp_meta_objset,
            dp->dp_root_dir->dd_phys->dd_child_dir_zapobj,
            name, sizeof (obj), 1, &obj);
        if (err)
                return (err);

        return (dsl_dir_open_obj(dp, obj, name, dp, ddp));
}

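/*
 * Allocate the in-core dsl_pool_t and initialize the state that needs
 * no on-disk data: locks, the txg machinery, the per-txg dirty lists,
 * and the vnode-release taskq.  The MOS itself is opened by
 * dsl_pool_init() or created by dsl_pool_create().
 */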
static dsl_pool_t *
dsl_pool_open_impl(spa_t *spa, uint64_t txg)
{
        dsl_pool_t *dp;
        blkptr_t *bp = spa_get_rootblkptr(spa);

        dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
        dp->dp_spa = spa;
        dp->dp_meta_rootbp = *bp;
        rw_init(&dp->dp_config_rwlock, NULL, RW_DEFAULT, NULL);
        dp->dp_write_limit = zfs_write_limit_min;
        txg_init(dp, txg);

        txg_list_create(&dp->dp_dirty_datasets,
            offsetof(dsl_dataset_t, ds_dirty_link));
        txg_list_create(&dp->dp_dirty_dirs,
            offsetof(dsl_dir_t, dd_dirty_link));
        txg_list_create(&dp->dp_sync_tasks,
            offsetof(dsl_sync_task_group_t, dstg_node));
        list_create(&dp->dp_synced_datasets, sizeof (dsl_dataset_t),
            offsetof(dsl_dataset_t, ds_synced_link));

        mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);

        dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
            1, 4, 0);

        return (dp);
}

int
dsl_pool_init(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
{
        int err;
        dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);

        err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
            &dp->dp_meta_objset);
        if (err != 0)
                dsl_pool_close(dp);
        else
                *dpp = dp;

        return (err);
}

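/*
 * Open the on-disk state of an existing pool: the root dir, the
 * special directories, the free bpobj and the bptree (when the pool
 * version or feature flags support them), and the scan state.  Called
 * with the MOS already open.
 */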
int
dsl_pool_open(dsl_pool_t *dp)
{
        int err;
        dsl_dir_t *dd;
        dsl_dataset_t *ds;
        uint64_t obj;

        ASSERT(!dmu_objset_is_dirty_anywhere(dp->dp_meta_objset));

        rw_enter(&dp->dp_config_rwlock, RW_WRITER);
        err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
            &dp->dp_root_dir_obj);
        if (err)
                goto out;

        err = dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
            NULL, dp, &dp->dp_root_dir);
        if (err)
                goto out;

        err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
        if (err)
                goto out;

        if (spa_version(dp->dp_spa) >= SPA_VERSION_ORIGIN) {
                err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
                if (err)
                        goto out;
                err = dsl_dataset_hold_obj(dp, dd->dd_phys->dd_head_dataset_obj,
                    FTAG, &ds);
                if (err == 0) {
                        err = dsl_dataset_hold_obj(dp,
                            ds->ds_phys->ds_prev_snap_obj, dp,
                            &dp->dp_origin_snap);
                        dsl_dataset_rele(ds, FTAG);
                }
                dsl_dir_close(dd, dp);
                if (err)
                        goto out;
        }

        if (spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
                err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
                    &dp->dp_free_dir);
                if (err)
                        goto out;

                err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
                    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
                if (err)
                        goto out;
                VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
                    dp->dp_meta_objset, obj));
        }

        if (spa_feature_is_active(dp->dp_spa,
            &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
                err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
                    DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
                    &dp->dp_bptree_obj);
                if (err != 0)
                        goto out;
        }

        err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
            &dp->dp_tmp_userrefs_obj);
        if (err == ENOENT)
                err = 0;
        if (err)
                goto out;

        err = dsl_scan_init(dp, dp->dp_tx.tx_open_txg);

out:
        rw_exit(&dp->dp_config_rwlock);
        return (err);
}

void
dsl_pool_close(dsl_pool_t *dp)
{
        /* drop our references from dsl_pool_open() */

        /*
         * Since we held the origin_snap from "syncing" context (which
         * includes pool-opening context), it actually only got a "ref"
         * and not a hold, so just drop that here.
         */
        if (dp->dp_origin_snap)
                dsl_dataset_drop_ref(dp->dp_origin_snap, dp);
        if (dp->dp_mos_dir)
                dsl_dir_close(dp->dp_mos_dir, dp);
        if (dp->dp_free_dir)
                dsl_dir_close(dp->dp_free_dir, dp);
        if (dp->dp_root_dir)
                dsl_dir_close(dp->dp_root_dir, dp);

        bpobj_close(&dp->dp_free_bpobj);

        /* undo the dmu_objset_open_impl(mos) from dsl_pool_init() */
        if (dp->dp_meta_objset)
                dmu_objset_evict(dp->dp_meta_objset);

        txg_list_destroy(&dp->dp_dirty_datasets);
        txg_list_destroy(&dp->dp_sync_tasks);
        txg_list_destroy(&dp->dp_dirty_dirs);
        list_destroy(&dp->dp_synced_datasets);

        arc_flush(dp->dp_spa);
        txg_fini(dp);
        dsl_scan_fini(dp);
        rw_destroy(&dp->dp_config_rwlock);
        mutex_destroy(&dp->dp_lock);
        taskq_destroy(dp->dp_vnrele_taskq);
        if (dp->dp_blkstats)
                kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
        kmem_free(dp, sizeof (dsl_pool_t));
}

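/*
 * Create the on-disk state for a brand-new pool in the given initial
 * txg: the MOS, the pool directory, the root and special dirs, the
 * $ORIGIN snapshot (for pool versions that support it), and the root
 * dataset with its ZPL objset.
 */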
dsl_pool_t *
dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
{
        int err;
        dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
        dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
        objset_t *os;
        dsl_dataset_t *ds;
        uint64_t obj;

        /* create and open the MOS (meta-objset) */
        dp->dp_meta_objset = dmu_objset_create_impl(spa,
            NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);

        /* create the pool directory */
        err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
            DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
        ASSERT3U(err, ==, 0);

        /* Initialize scan structures */
        VERIFY3U(0, ==, dsl_scan_init(dp, txg));

        /* create and open the root dir */
        dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
        VERIFY(0 == dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
            NULL, dp, &dp->dp_root_dir));

        /* create and open the meta-objset dir */
        (void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
        VERIFY(0 == dsl_pool_open_special_dir(dp,
            MOS_DIR_NAME, &dp->dp_mos_dir));

        if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
                /* create and open the free dir */
                (void) dsl_dir_create_sync(dp, dp->dp_root_dir,
                    FREE_DIR_NAME, tx);
                VERIFY(0 == dsl_pool_open_special_dir(dp,
                    FREE_DIR_NAME, &dp->dp_free_dir));

                /* create and open the free bpobj */
                obj = bpobj_alloc(dp->dp_meta_objset, SPA_MAXBLOCKSIZE, tx);
                VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
                    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
                VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
                    dp->dp_meta_objset, obj));
        }

        if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
                dsl_pool_create_origin(dp, tx);

        /* create the root dataset */
        obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);

        /* create the root objset */
        VERIFY(0 == dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
        os = dmu_objset_create_impl(dp->dp_spa, ds,
            dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
#ifdef _KERNEL
        zfs_create_fs(os, kcred, zplprops, tx);
#endif
        dsl_dataset_rele(ds, FTAG);

        dmu_tx_commit(tx);

        return (dp);
}

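/*
 * bplist_iterate() callback (see dsl_pool_sync()) that inserts one
 * dead block pointer into a dataset's on-disk deadlist, holding the
 * config lock as reader across the insert.
 */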
static int
deadlist_enqueue_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
{
        dsl_deadlist_t *dl = arg;
        dsl_pool_t *dp = dmu_objset_pool(dl->dl_os);
        rw_enter(&dp->dp_config_rwlock, RW_READER);
        dsl_deadlist_insert(dl, bp, tx);
        rw_exit(&dp->dp_config_rwlock);
        return (0);
}

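/*
 * Sync out one txg's worth of dirty pool state.  The major phases, in
 * order: sync dirty datasets (twice, to pick up userquota updates),
 * move each dataset's pending deadlist to its on-disk deadlist, run
 * this txg's sync tasks, sync dirty dsl_dirs, and finally sync the MOS
 * and record its new root block pointer.  Timing data gathered along
 * the way feeds the adaptive write-limit calculation at the bottom.
 */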
void
dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
{
        zio_t *zio;
        dmu_tx_t *tx;
        dsl_dir_t *dd;
        dsl_dataset_t *ds;
        dsl_sync_task_group_t *dstg;
        objset_t *mos = dp->dp_meta_objset;
        hrtime_t start, write_time;
        uint64_t data_written;
        int err;

        /*
         * We need to copy dp_space_towrite before doing
         * dsl_sync_task_group_sync(), because
         * dsl_dataset_snapshot_reserve_space() will increase
         * dp_space_towrite but not actually write anything.
         */
        data_written = dp->dp_space_towrite[txg & TXG_MASK];

        tx = dmu_tx_create_assigned(dp, txg);

        dp->dp_read_overhead = 0;
        start = gethrtime();

        zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
        while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
                /*
                 * We must not sync any non-MOS datasets twice, because
                 * we may have taken a snapshot of them.  However, we
                 * may sync newly-created datasets on pass 2.
                 */
                ASSERT(!list_link_active(&ds->ds_synced_link));
                list_insert_tail(&dp->dp_synced_datasets, ds);
                dsl_dataset_sync(ds, zio, tx);
        }
        DTRACE_PROBE(pool_sync__1setup);
        err = zio_wait(zio);

        write_time = gethrtime() - start;
        ASSERT(err == 0);
        DTRACE_PROBE(pool_sync__2rootzio);

        for (ds = list_head(&dp->dp_synced_datasets); ds;
            ds = list_next(&dp->dp_synced_datasets, ds))
                dmu_objset_do_userquota_updates(ds->ds_objset, tx);

        /*
         * Sync the datasets again to push out the changes due to
         * userspace updates.  This must be done before we process the
         * sync tasks, because that could cause a snapshot of a dataset
         * whose ds_bp will be rewritten when we do this 2nd sync.
         */
        zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
        while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
                ASSERT(list_link_active(&ds->ds_synced_link));
                dmu_buf_rele(ds->ds_dbuf, ds);
                dsl_dataset_sync(ds, zio, tx);
        }
        err = zio_wait(zio);

        /*
         * Move dead blocks from the pending deadlist to the on-disk
         * deadlist.
         */
        for (ds = list_head(&dp->dp_synced_datasets); ds;
            ds = list_next(&dp->dp_synced_datasets, ds)) {
                bplist_iterate(&ds->ds_pending_deadlist,
                    deadlist_enqueue_cb, &ds->ds_deadlist, tx);
        }

        while ((dstg = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL) {
                /*
                 * No more sync tasks should have been added while we
                 * were syncing.
                 */
                ASSERT(spa_sync_pass(dp->dp_spa) == 1);
                dsl_sync_task_group_sync(dstg, tx);
        }
        DTRACE_PROBE(pool_sync__3task);

        start = gethrtime();
        while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL)
                dsl_dir_sync(dd, tx);
        write_time += gethrtime() - start;

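        /* Sync the MOS if anything dirtied it, and publish its new rootbp. */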
        start = gethrtime();
        if (list_head(&mos->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
            list_head(&mos->os_free_dnodes[txg & TXG_MASK]) != NULL) {
                zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
                dmu_objset_sync(mos, zio, tx);
                err = zio_wait(zio);
                ASSERT(err == 0);
                dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
                spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
        }
        write_time += gethrtime() - start;
        DTRACE_PROBE2(pool_sync__4io, hrtime_t, write_time,
            hrtime_t, dp->dp_read_overhead);
        write_time -= dp->dp_read_overhead;

        dmu_tx_commit(tx);

        dp->dp_space_towrite[txg & TXG_MASK] = 0;
        ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);

        /*
         * If the write limit max has not been explicitly set, set it
         * to a fraction of available physical memory (default 1/8th).
         * Note that we must inflate the limit because the spa
         * inflates write sizes to account for data replication.
         * Check this each sync phase to catch changing memory size.
         */
        if (physmem != old_physmem && zfs_write_limit_shift) {
                mutex_enter(&zfs_write_limit_lock);
                old_physmem = physmem;
                zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
                zfs_write_limit_inflated = MAX(zfs_write_limit_min,
                    spa_get_asize(dp->dp_spa, zfs_write_limit_max));
                mutex_exit(&zfs_write_limit_lock);
        }

        /*
         * Attempt to keep the sync time consistent by adjusting the
         * amount of write traffic allowed into each transaction group.
         * Weight the throughput calculation towards the current value:
         *      thru = 3/4 old_thru + 1/4 new_thru
         *
         * Note: write_time is in nanosecs, so write_time / MICROSEC
         * yields millisecs
         */
        ASSERT(zfs_write_limit_min > 0);
        if (data_written > zfs_write_limit_min / 8 && write_time > MICROSEC) {
                uint64_t throughput = data_written / (write_time / MICROSEC);

                if (dp->dp_throughput)
                        dp->dp_throughput = throughput / 4 +
                            3 * dp->dp_throughput / 4;
                else
                        dp->dp_throughput = throughput;
                dp->dp_write_limit = MIN(zfs_write_limit_inflated,
                    MAX(zfs_write_limit_min,
                    dp->dp_throughput * zfs_txg_synctime_ms));
        }
}

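/*
 * Called once a txg is safely on disk: walk the synced-datasets list
 * built by dsl_pool_sync(), clean each objset's ZIL for this txg, and
 * release each dataset's dbuf hold.
 */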
void
dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
{
        dsl_dataset_t *ds;
        objset_t *os;

        while ((ds = list_head(&dp->dp_synced_datasets)) != NULL) {
                list_remove(&dp->dp_synced_datasets, ds);
                os = ds->ds_objset;
                zil_clean(os->os_zil, txg);
                ASSERT(!dmu_objset_is_dirty(os, txg));
                dmu_buf_rele(ds->ds_dbuf, ds);
        }
        ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
}

/*
 * TRUE if the current thread is the tx_sync_thread or if we
 * are being called from SPA context during pool initialization.
 */
int
dsl_pool_sync_context(dsl_pool_t *dp)
{
        return (curthread == dp->dp_tx.tx_sync_thread ||
            spa_is_initializing(dp->dp_spa));
}

uint64_t
dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
{
        uint64_t space, resv;

        /*
         * Reserve about 1.6% (1/64), or at least 32MB, for allocation
         * efficiency.
         * XXX The intent log is not accounted for, so it must fit
         * within this slop.
         *
         * If we're trying to assess whether it's OK to do a free,
         * cut the reservation in half to allow forward progress
         * (e.g. make it possible to rm(1) files from a full pool).
         */
        space = spa_get_dspace(dp->dp_spa);
        resv = MAX(space >> 6, SPA_MINDEVSIZE >> 1);
        if (netfree)
                resv >>= 1;

        return (space - resv);
}

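/*
 * Account "space" worth of anticipated writes against this txg's write
 * limit.  Returns ERESTART once the limit would be exceeded, so the
 * caller can back off and retry in a later txg, and delays the caller
 * by one tick once the txg passes 7/8ths of the limit.
 */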
int
dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx)
{
        uint64_t reserved = 0;
        uint64_t write_limit = (zfs_write_limit_override ?
            zfs_write_limit_override : dp->dp_write_limit);

        if (zfs_no_write_throttle) {
                atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK],
                    space);
                return (0);
        }

        /*
         * Check to see if we have exceeded the maximum allowed IO for
         * this transaction group.  We can do this without locks since
         * a little slop here is ok.  Note that we do the reserved check
         * with only half the requested reserve: this is because the
         * reserve requests are worst-case, and we really don't want to
         * throttle based off of worst-case estimates.
         */
        if (write_limit > 0) {
                reserved = dp->dp_space_towrite[tx->tx_txg & TXG_MASK]
                    + dp->dp_tempreserved[tx->tx_txg & TXG_MASK] / 2;

                if (reserved && reserved > write_limit)
                        return (ERESTART);
        }

        atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], space);

        /*
         * If this transaction group is over 7/8ths capacity, delay
         * the caller 1 clock tick.  This will slow down the "fill"
         * rate until the sync process can catch up with us.
         */
        if (reserved && reserved > (write_limit - (write_limit >> 3)))
                txg_delay(dp, tx->tx_txg, 1);

        return (0);
}

void
dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
{
        ASSERT(dp->dp_tempreserved[tx->tx_txg & TXG_MASK] >= space);
        atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], -space);
}

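/*
 * Respond to memory pressure by shrinking the write limit toward a
 * quarter of the space already committed to the open txgs, never
 * raising it and never going below zfs_write_limit_min.
 */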
void
dsl_pool_memory_pressure(dsl_pool_t *dp)
{
        uint64_t space_inuse = 0;
        int i;

        if (dp->dp_write_limit == zfs_write_limit_min)
                return;

        for (i = 0; i < TXG_SIZE; i++) {
                space_inuse += dp->dp_space_towrite[i];
                space_inuse += dp->dp_tempreserved[i];
        }
        dp->dp_write_limit = MAX(zfs_write_limit_min,
            MIN(dp->dp_write_limit, space_inuse / 4));
}

void
dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
{
        if (space > 0) {
                mutex_enter(&dp->dp_lock);
                dp->dp_space_towrite[tx->tx_txg & TXG_MASK] += space;
                mutex_exit(&dp->dp_lock);
        }
}

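/*
 * dmu_objset_find_spa() callback used by dsl_pool_upgrade_clones():
 * walk the dataset back to the start of its snapshot chain and, if it
 * has no clone origin of its own, attach it to the $ORIGIN snapshot;
 * either way, record the dataset in its origin's next-clones zap.
 */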
/* ARGSUSED */
static int
upgrade_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
{
        dmu_tx_t *tx = arg;
        dsl_dataset_t *ds, *prev = NULL;
        int err;
        dsl_pool_t *dp = spa_get_dsl(spa);

        err = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
        if (err)
                return (err);

        while (ds->ds_phys->ds_prev_snap_obj != 0) {
                err = dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
                    FTAG, &prev);
                if (err) {
                        dsl_dataset_rele(ds, FTAG);
                        return (err);
                }

                if (prev->ds_phys->ds_next_snap_obj != ds->ds_object)
                        break;
                dsl_dataset_rele(ds, FTAG);
                ds = prev;
                prev = NULL;
        }

        if (prev == NULL) {
                prev = dp->dp_origin_snap;

                /*
                 * The $ORIGIN can't have any data, or the accounting
                 * will be wrong.
                 */
                ASSERT(prev->ds_phys->ds_bp.blk_birth == 0);

                /* The origin doesn't get attached to itself */
                if (ds->ds_object == prev->ds_object) {
                        dsl_dataset_rele(ds, FTAG);
                        return (0);
                }

                dmu_buf_will_dirty(ds->ds_dbuf, tx);
                ds->ds_phys->ds_prev_snap_obj = prev->ds_object;
                ds->ds_phys->ds_prev_snap_txg = prev->ds_phys->ds_creation_txg;

                dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
                ds->ds_dir->dd_phys->dd_origin_obj = prev->ds_object;

                dmu_buf_will_dirty(prev->ds_dbuf, tx);
                prev->ds_phys->ds_num_children++;

                if (ds->ds_phys->ds_next_snap_obj == 0) {
                        ASSERT(ds->ds_prev == NULL);
                        VERIFY(0 == dsl_dataset_hold_obj(dp,
                            ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
                }
        }

        ASSERT(ds->ds_dir->dd_phys->dd_origin_obj == prev->ds_object);
        ASSERT(ds->ds_phys->ds_prev_snap_obj == prev->ds_object);

        if (prev->ds_phys->ds_next_clones_obj == 0) {
                dmu_buf_will_dirty(prev->ds_dbuf, tx);
                prev->ds_phys->ds_next_clones_obj =
                    zap_create(dp->dp_meta_objset,
                    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
        }
        VERIFY(0 == zap_add_int(dp->dp_meta_objset,
            prev->ds_phys->ds_next_clones_obj, ds->ds_object, tx));

        dsl_dataset_rele(ds, FTAG);
        if (prev != dp->dp_origin_snap)
                dsl_dataset_rele(prev, FTAG);
        return (0);
}

void
dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
{
        ASSERT(dmu_tx_is_syncing(tx));
        ASSERT(dp->dp_origin_snap != NULL);

        VERIFY3U(0, ==, dmu_objset_find_spa(dp->dp_spa, NULL, upgrade_clones_cb,
            tx, DS_FIND_CHILDREN));
}

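/*
 * dmu_objset_find_spa() callback used by dsl_pool_upgrade_dir_clones():
 * register each clone in its origin's dd_clones zap, creating that zap
 * the first time the origin needs it.
 */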
/* ARGSUSED */
static int
upgrade_dir_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
{
        dmu_tx_t *tx = arg;
        dsl_dataset_t *ds;
        dsl_pool_t *dp = spa_get_dsl(spa);
        objset_t *mos = dp->dp_meta_objset;

        VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));

        if (ds->ds_dir->dd_phys->dd_origin_obj) {
                dsl_dataset_t *origin;

                VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
                    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &origin));

                if (origin->ds_dir->dd_phys->dd_clones == 0) {
                        dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
                        origin->ds_dir->dd_phys->dd_clones = zap_create(mos,
                            DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
                }

                VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
                    origin->ds_dir->dd_phys->dd_clones, dsobj, tx));

                dsl_dataset_rele(origin, FTAG);
        }

        dsl_dataset_rele(ds, FTAG);
        return (0);
}

void
dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
{
        uint64_t obj;

        ASSERT(dmu_tx_is_syncing(tx));

        (void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
        VERIFY(0 == dsl_pool_open_special_dir(dp,
            FREE_DIR_NAME, &dp->dp_free_dir));

        /*
         * We can't use bpobj_alloc(), because spa_version() still
         * returns the old version, and we need a new-version bpobj with
         * subobj support.  So call dmu_object_alloc() directly.
         */
        obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
            SPA_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
        VERIFY3U(0, ==, zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
            DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
        VERIFY3U(0, ==, bpobj_open(&dp->dp_free_bpobj,
            dp->dp_meta_objset, obj));

        VERIFY3U(0, ==, dmu_objset_find_spa(dp->dp_spa, NULL,
            upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN));
}

void
dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
{
        uint64_t dsobj;
        dsl_dataset_t *ds;

        ASSERT(dmu_tx_is_syncing(tx));
        ASSERT(dp->dp_origin_snap == NULL);

        /* create the origin dir, ds, & snap-ds */
        rw_enter(&dp->dp_config_rwlock, RW_WRITER);
        dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
            NULL, 0, kcred, tx);
        VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
        dsl_dataset_snapshot_sync(ds, ORIGIN_DIR_NAME, tx);
        VERIFY(0 == dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
            dp, &dp->dp_origin_snap));
        dsl_dataset_rele(ds, FTAG);
        rw_exit(&dp->dp_config_rwlock);
}

taskq_t *
dsl_pool_vnrele_taskq(dsl_pool_t *dp)
{
        return (dp->dp_vnrele_taskq);
}

/*
 * Walk through the pool-wide zap object of temporary snapshot user holds
 * and release them.
 */
void
dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
{
        zap_attribute_t za;
        zap_cursor_t zc;
        objset_t *mos = dp->dp_meta_objset;
        uint64_t zapobj = dp->dp_tmp_userrefs_obj;

        if (zapobj == 0)
                return;
        ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);

        for (zap_cursor_init(&zc, mos, zapobj);
            zap_cursor_retrieve(&zc, &za) == 0;
            zap_cursor_advance(&zc)) {
                char *htag;
                uint64_t dsobj;

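                /*
                 * Hold names are "<dsobj in hex>-<tag>" (see
                 * dsl_pool_user_hold_rele_impl() below), so a '-' is
                 * always present; split the name there to recover both
                 * parts.
                 */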
                htag = strchr(za.za_name, '-');
                *htag = '\0';
                ++htag;
                dsobj = strtonum(za.za_name, NULL);
                (void) dsl_dataset_user_release_tmp(dp, dsobj, htag, B_FALSE);
        }
        zap_cursor_fini(&zc);
}

/*
 * Create the pool-wide zap object for storing temporary snapshot holds.
 */
void
dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
{
        objset_t *mos = dp->dp_meta_objset;

        ASSERT(dp->dp_tmp_userrefs_obj == 0);
        ASSERT(dmu_tx_is_syncing(tx));

        dp->dp_tmp_userrefs_obj = zap_create_link(mos, DMU_OT_USERREFS,
            DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS, tx);
}

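/*
 * Common code for dsl_pool_user_hold() and dsl_pool_user_release():
 * add or remove the "<dsobj>-<tag>" entry in the temporary-holds zap,
 * creating the zap on first hold for pools that predate
 * SPA_VERSION_USERREFS.
 */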
static int
dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
    const char *tag, uint64_t *now, dmu_tx_t *tx, boolean_t holding)
{
        objset_t *mos = dp->dp_meta_objset;
        uint64_t zapobj = dp->dp_tmp_userrefs_obj;
        char *name;
        int error;

        ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
        ASSERT(dmu_tx_is_syncing(tx));

        /*
         * If the pool was created prior to SPA_VERSION_USERREFS, the
         * zap object for temporary holds might not exist yet.
         */
        if (zapobj == 0) {
                if (holding) {
                        dsl_pool_user_hold_create_obj(dp, tx);
                        zapobj = dp->dp_tmp_userrefs_obj;
                } else {
                        return (ENOENT);
                }
        }

        name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
        if (holding)
                error = zap_add(mos, zapobj, name, 8, 1, now, tx);
        else
                error = zap_remove(mos, zapobj, name, tx);
        strfree(name);

        return (error);
}

/*
 * Add a temporary hold for the given dataset object and tag.
 */
int
dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
    uint64_t *now, dmu_tx_t *tx)
{
        return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
}

/*
 * Release a temporary hold for the given dataset object and tag.
 */
int
dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
    dmu_tx_t *tx)
{
        return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, NULL,
            tx, B_FALSE));
}