1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  */
  25 
  26 /* Portions Copyright 2010 Robert Milkowski */
  27 
  28 #include <sys/cred.h>
  29 #include <sys/zfs_context.h>
  30 #include <sys/dmu_objset.h>
  31 #include <sys/dsl_dir.h>
  32 #include <sys/dsl_dataset.h>
  33 #include <sys/dsl_prop.h>
  34 #include <sys/dsl_pool.h>
  35 #include <sys/dsl_synctask.h>
  36 #include <sys/dsl_deleg.h>
  37 #include <sys/dnode.h>
  38 #include <sys/dbuf.h>
  39 #include <sys/zvol.h>
  40 #include <sys/dmu_tx.h>
  41 #include <sys/zap.h>
  42 #include <sys/zil.h>
  43 #include <sys/dmu_impl.h>
  44 #include <sys/zfs_ioctl.h>
  45 #include <sys/sa.h>
  46 #include <sys/zfs_onexit.h>
  47 
/*
 * Global reader/writer lock for objset teardown.
 *
 * Needed to close a window in dnode_move() that allows the objset to be freed
 * before it can be safely accessed.  It is initialized by dmu_objset_init(),
 * destroyed by dmu_objset_fini(), and briefly taken as a reader by
 * dmu_objset_evict() right before an objset is freed.
 */
krwlock_t os_lock;
  53 
  54 void
  55 dmu_objset_init(void)
  56 {
  57         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  58 }
  59 
  60 void
  61 dmu_objset_fini(void)
  62 {
  63         rw_destroy(&os_lock);
  64 }
  65 
  66 spa_t *
  67 dmu_objset_spa(objset_t *os)
  68 {
  69         return (os->os_spa);
  70 }
  71 
  72 zilog_t *
  73 dmu_objset_zil(objset_t *os)
  74 {
  75         return (os->os_zil);
  76 }
  77 
  78 dsl_pool_t *
  79 dmu_objset_pool(objset_t *os)
  80 {
  81         dsl_dataset_t *ds;
  82 
  83         if ((ds = os->os_dsl_dataset) != NULL && ds->ds_dir)
  84                 return (ds->ds_dir->dd_pool);
  85         else
  86                 return (spa_get_dsl(os->os_spa));
  87 }
  88 
  89 dsl_dataset_t *
  90 dmu_objset_ds(objset_t *os)
  91 {
  92         return (os->os_dsl_dataset);
  93 }
  94 
  95 dmu_objset_type_t
  96 dmu_objset_type(objset_t *os)
  97 {
  98         return (os->os_phys->os_type);
  99 }
 100 
 101 void
 102 dmu_objset_name(objset_t *os, char *buf)
 103 {
 104         dsl_dataset_name(os->os_dsl_dataset, buf);
 105 }
 106 
 107 uint64_t
 108 dmu_objset_id(objset_t *os)
 109 {
 110         dsl_dataset_t *ds = os->os_dsl_dataset;
 111 
 112         return (ds ? ds->ds_object : 0);
 113 }
 114 
 115 uint64_t
 116 dmu_objset_syncprop(objset_t *os)
 117 {
 118         return (os->os_sync);
 119 }
 120 
 121 uint64_t
 122 dmu_objset_logbias(objset_t *os)
 123 {
 124         return (os->os_logbias);
 125 }
 126 
 127 static void
 128 checksum_changed_cb(void *arg, uint64_t newval)
 129 {
 130         objset_t *os = arg;
 131 
 132         /*
 133          * Inheritance should have been done by now.
 134          */
 135         ASSERT(newval != ZIO_CHECKSUM_INHERIT);
 136 
 137         os->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
 138 }
 139 
 140 static void
 141 compression_changed_cb(void *arg, uint64_t newval)
 142 {
 143         objset_t *os = arg;
 144 
 145         /*
 146          * Inheritance and range checking should have been done by now.
 147          */
 148         ASSERT(newval != ZIO_COMPRESS_INHERIT);
 149 
 150         os->os_compress = zio_compress_select(newval, ZIO_COMPRESS_ON_VALUE);
 151 }
 152 
 153 static void
 154 copies_changed_cb(void *arg, uint64_t newval)
 155 {
 156         objset_t *os = arg;
 157 
 158         /*
 159          * Inheritance and range checking should have been done by now.
 160          */
 161         ASSERT(newval > 0);
 162         ASSERT(newval <= spa_max_replication(os->os_spa));
 163 
 164         os->os_copies = newval;
 165 }
 166 
 167 static void
 168 dedup_changed_cb(void *arg, uint64_t newval)
 169 {
 170         objset_t *os = arg;
 171         spa_t *spa = os->os_spa;
 172         enum zio_checksum checksum;
 173 
 174         /*
 175          * Inheritance should have been done by now.
 176          */
 177         ASSERT(newval != ZIO_CHECKSUM_INHERIT);
 178 
 179         checksum = zio_checksum_dedup_select(spa, newval, ZIO_CHECKSUM_OFF);
 180 
 181         os->os_dedup_checksum = checksum & ZIO_CHECKSUM_MASK;
 182         os->os_dedup_verify = !!(checksum & ZIO_CHECKSUM_VERIFY);
 183 }
 184 
 185 static void
 186 primary_cache_changed_cb(void *arg, uint64_t newval)
 187 {
 188         objset_t *os = arg;
 189 
 190         /*
 191          * Inheritance and range checking should have been done by now.
 192          */
 193         ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
 194             newval == ZFS_CACHE_METADATA);
 195 
 196         os->os_primary_cache = newval;
 197 }
 198 
 199 static void
 200 secondary_cache_changed_cb(void *arg, uint64_t newval)
 201 {
 202         objset_t *os = arg;
 203 
 204         /*
 205          * Inheritance and range checking should have been done by now.
 206          */
 207         ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
 208             newval == ZFS_CACHE_METADATA);
 209 
 210         os->os_secondary_cache = newval;
 211 }
 212 
 213 static void
 214 sync_changed_cb(void *arg, uint64_t newval)
 215 {
 216         objset_t *os = arg;
 217 
 218         /*
 219          * Inheritance and range checking should have been done by now.
 220          */
 221         ASSERT(newval == ZFS_SYNC_STANDARD || newval == ZFS_SYNC_ALWAYS ||
 222             newval == ZFS_SYNC_DISABLED);
 223 
 224         os->os_sync = newval;
 225         if (os->os_zil)
 226                 zil_set_sync(os->os_zil, newval);
 227 }
 228 
 229 static void
 230 logbias_changed_cb(void *arg, uint64_t newval)
 231 {
 232         objset_t *os = arg;
 233 
 234         ASSERT(newval == ZFS_LOGBIAS_LATENCY ||
 235             newval == ZFS_LOGBIAS_THROUGHPUT);
 236         os->os_logbias = newval;
 237         if (os->os_zil)
 238                 zil_set_logbias(os->os_zil, newval);
 239 }
 240 
 241 void
 242 dmu_objset_byteswap(void *buf, size_t size)
 243 {
 244         objset_phys_t *osp = buf;
 245 
 246         ASSERT(size == OBJSET_OLD_PHYS_SIZE || size == sizeof (objset_phys_t));
 247         dnode_byteswap(&osp->os_meta_dnode);
 248         byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
 249         osp->os_type = BSWAP_64(osp->os_type);
 250         osp->os_flags = BSWAP_64(osp->os_flags);
 251         if (size == sizeof (objset_phys_t)) {
 252                 dnode_byteswap(&osp->os_userused_dnode);
 253                 dnode_byteswap(&osp->os_groupused_dnode);
 254         }
 255 }
 256 
 257 int
 258 dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
 259     objset_t **osp)
 260 {
 261         objset_t *os;
 262         int i, err;
 263 
 264         ASSERT(ds == NULL || MUTEX_HELD(&ds->ds_opening_lock));
 265 
 266         os = kmem_zalloc(sizeof (objset_t), KM_SLEEP);
 267         os->os_dsl_dataset = ds;
 268         os->os_spa = spa;
 269         os->os_rootbp = bp;
 270         if (!BP_IS_HOLE(os->os_rootbp)) {
 271                 uint32_t aflags = ARC_WAIT;
 272                 zbookmark_t zb;
 273                 SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
 274                     ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
 275 
 276                 if (DMU_OS_IS_L2CACHEABLE(os))
 277                         aflags |= ARC_L2CACHE;
 278 
 279                 dprintf_bp(os->os_rootbp, "reading %s", "");
 280                 /*
 281                  * XXX when bprewrite scrub can change the bp,
 282                  * and this is called from dmu_objset_open_ds_os, the bp
 283                  * could change, and we'll need a lock.
 284                  */
 285                 err = dsl_read_nolock(NULL, spa, os->os_rootbp,
 286                     arc_getbuf_func, &os->os_phys_buf,
 287                     ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, &aflags, &zb);
 288                 if (err) {
 289                         kmem_free(os, sizeof (objset_t));
 290                         /* convert checksum errors into IO errors */
 291                         if (err == ECKSUM)
 292                                 err = EIO;
 293                         return (err);
 294                 }
 295 
 296                 /* Increase the blocksize if we are permitted. */
 297                 if (spa_version(spa) >= SPA_VERSION_USERSPACE &&
 298                     arc_buf_size(os->os_phys_buf) < sizeof (objset_phys_t)) {
 299                         arc_buf_t *buf = arc_buf_alloc(spa,
 300                             sizeof (objset_phys_t), &os->os_phys_buf,
 301                             ARC_BUFC_METADATA);
 302                         bzero(buf->b_data, sizeof (objset_phys_t));
 303                         bcopy(os->os_phys_buf->b_data, buf->b_data,
 304                             arc_buf_size(os->os_phys_buf));
 305                         (void) arc_buf_remove_ref(os->os_phys_buf,
 306                             &os->os_phys_buf);
 307                         os->os_phys_buf = buf;
 308                 }
 309 
 310                 os->os_phys = os->os_phys_buf->b_data;
 311                 os->os_flags = os->os_phys->os_flags;
 312         } else {
 313                 int size = spa_version(spa) >= SPA_VERSION_USERSPACE ?
 314                     sizeof (objset_phys_t) : OBJSET_OLD_PHYS_SIZE;
 315                 os->os_phys_buf = arc_buf_alloc(spa, size,
 316                     &os->os_phys_buf, ARC_BUFC_METADATA);
 317                 os->os_phys = os->os_phys_buf->b_data;
 318                 bzero(os->os_phys, size);
 319         }
 320 
 321         /*
 322          * Note: the changed_cb will be called once before the register
 323          * func returns, thus changing the checksum/compression from the
 324          * default (fletcher2/off).  Snapshots don't need to know about
 325          * checksum/compression/copies.
 326          */
 327         if (ds) {
 328                 err = dsl_prop_register(ds, "primarycache",
 329                     primary_cache_changed_cb, os);
 330                 if (err == 0)
 331                         err = dsl_prop_register(ds, "secondarycache",
 332                             secondary_cache_changed_cb, os);
 333                 if (!dsl_dataset_is_snapshot(ds)) {
 334                         if (err == 0)
 335                                 err = dsl_prop_register(ds, "checksum",
 336                                     checksum_changed_cb, os);
 337                         if (err == 0)
 338                                 err = dsl_prop_register(ds, "compression",
 339                                     compression_changed_cb, os);
 340                         if (err == 0)
 341                                 err = dsl_prop_register(ds, "copies",
 342                                     copies_changed_cb, os);
 343                         if (err == 0)
 344                                 err = dsl_prop_register(ds, "dedup",
 345                                     dedup_changed_cb, os);
 346                         if (err == 0)
 347                                 err = dsl_prop_register(ds, "logbias",
 348                                     logbias_changed_cb, os);
 349                         if (err == 0)
 350                                 err = dsl_prop_register(ds, "sync",
 351                                     sync_changed_cb, os);
 352                 }
 353                 if (err) {
 354                         VERIFY(arc_buf_remove_ref(os->os_phys_buf,
 355                             &os->os_phys_buf) == 1);
 356                         kmem_free(os, sizeof (objset_t));
 357                         return (err);
 358                 }
 359         } else if (ds == NULL) {
 360                 /* It's the meta-objset. */
 361                 os->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
 362                 os->os_compress = ZIO_COMPRESS_LZJB;
 363                 os->os_copies = spa_max_replication(spa);
 364                 os->os_dedup_checksum = ZIO_CHECKSUM_OFF;
 365                 os->os_dedup_verify = 0;
 366                 os->os_logbias = 0;
 367                 os->os_sync = 0;
 368                 os->os_primary_cache = ZFS_CACHE_ALL;
 369                 os->os_secondary_cache = ZFS_CACHE_ALL;
 370         }
 371 
 372         if (ds == NULL || !dsl_dataset_is_snapshot(ds))
 373                 os->os_zil_header = os->os_phys->os_zil_header;
 374         os->os_zil = zil_alloc(os, &os->os_zil_header);
 375 
 376         for (i = 0; i < TXG_SIZE; i++) {
 377                 list_create(&os->os_dirty_dnodes[i], sizeof (dnode_t),
 378                     offsetof(dnode_t, dn_dirty_link[i]));
 379                 list_create(&os->os_free_dnodes[i], sizeof (dnode_t),
 380                     offsetof(dnode_t, dn_dirty_link[i]));
 381         }
 382         list_create(&os->os_dnodes, sizeof (dnode_t),
 383             offsetof(dnode_t, dn_link));
 384         list_create(&os->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
 385             offsetof(dmu_buf_impl_t, db_link));
 386 
 387         mutex_init(&os->os_lock, NULL, MUTEX_DEFAULT, NULL);
 388         mutex_init(&os->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
 389         mutex_init(&os->os_user_ptr_lock, NULL, MUTEX_DEFAULT, NULL);
 390 
 391         DMU_META_DNODE(os) = dnode_special_open(os,
 392             &os->os_phys->os_meta_dnode, DMU_META_DNODE_OBJECT,
 393             &os->os_meta_dnode);
 394         if (arc_buf_size(os->os_phys_buf) >= sizeof (objset_phys_t)) {
 395                 DMU_USERUSED_DNODE(os) = dnode_special_open(os,
 396                     &os->os_phys->os_userused_dnode, DMU_USERUSED_OBJECT,
 397                     &os->os_userused_dnode);
 398                 DMU_GROUPUSED_DNODE(os) = dnode_special_open(os,
 399                     &os->os_phys->os_groupused_dnode, DMU_GROUPUSED_OBJECT,
 400                     &os->os_groupused_dnode);
 401         }
 402 
 403         /*
 404          * We should be the only thread trying to do this because we
 405          * have ds_opening_lock
 406          */
 407         if (ds) {
 408                 mutex_enter(&ds->ds_lock);
 409                 ASSERT(ds->ds_objset == NULL);
 410                 ds->ds_objset = os;
 411                 mutex_exit(&ds->ds_lock);
 412         }
 413 
 414         *osp = os;
 415         return (0);
 416 }
 417 
 418 int
 419 dmu_objset_from_ds(dsl_dataset_t *ds, objset_t **osp)
 420 {
 421         int err = 0;
 422 
 423         mutex_enter(&ds->ds_opening_lock);
 424         *osp = ds->ds_objset;
 425         if (*osp == NULL) {
 426                 err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
 427                     ds, dsl_dataset_get_blkptr(ds), osp);
 428         }
 429         mutex_exit(&ds->ds_opening_lock);
 430         return (err);
 431 }
 432 
 433 /* called from zpl */
 434 int
 435 dmu_objset_hold(const char *name, void *tag, objset_t **osp)
 436 {
 437         dsl_dataset_t *ds;
 438         int err;
 439 
 440         err = dsl_dataset_hold(name, tag, &ds);
 441         if (err)
 442                 return (err);
 443 
 444         err = dmu_objset_from_ds(ds, osp);
 445         if (err)
 446                 dsl_dataset_rele(ds, tag);
 447 
 448         return (err);
 449 }
 450 
 451 /* called from zpl */
 452 int
 453 dmu_objset_own(const char *name, dmu_objset_type_t type,
 454     boolean_t readonly, void *tag, objset_t **osp)
 455 {
 456         dsl_dataset_t *ds;
 457         int err;
 458 
 459         err = dsl_dataset_own(name, B_FALSE, tag, &ds);
 460         if (err)
 461                 return (err);
 462 
 463         err = dmu_objset_from_ds(ds, osp);
 464         if (err) {
 465                 dsl_dataset_disown(ds, tag);
 466         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
 467                 dmu_objset_disown(*osp, tag);
 468                 return (EINVAL);
 469         } else if (!readonly && dsl_dataset_is_snapshot(ds)) {
 470                 dmu_objset_disown(*osp, tag);
 471                 return (EROFS);
 472         }
 473         return (err);
 474 }
 475 
 476 void
 477 dmu_objset_rele(objset_t *os, void *tag)
 478 {
 479         dsl_dataset_rele(os->os_dsl_dataset, tag);
 480 }
 481 
 482 void
 483 dmu_objset_disown(objset_t *os, void *tag)
 484 {
 485         dsl_dataset_disown(os->os_dsl_dataset, tag);
 486 }
 487 
/*
 * Evict the dbufs of every dnode on the objset's os_dnodes list that
 * currently has holds, handling the meta-dnode last.
 *
 * Returns nonzero if, after the pass, the head of os_dnodes is something
 * other than the meta-dnode, and 0 otherwise.
 */
int
dmu_objset_evict_dbufs(objset_t *os)
{
	dnode_t *dn;

	mutex_enter(&os->os_lock);

	/* process the mdn last, since the other dnodes have holds on it */
	list_remove(&os->os_dnodes, DMU_META_DNODE(os));
	list_insert_tail(&os->os_dnodes, DMU_META_DNODE(os));

	/*
	 * Find the first dnode with holds.  We have to do this dance
	 * because dnode_add_ref() only works if you already have a
	 * hold.  If there are no holds then it has no dbufs so OK to
	 * skip.
	 */
	for (dn = list_head(&os->os_dnodes);
	    dn && !dnode_add_ref(dn, FTAG);
	    dn = list_next(&os->os_dnodes, dn))
		continue;

	while (dn) {
		dnode_t *next_dn = dn;

		/*
		 * Take a hold on the next dnode that accepts one before
		 * os_lock is dropped below.
		 */
		do {
			next_dn = list_next(&os->os_dnodes, next_dn);
		} while (next_dn && !dnode_add_ref(next_dn, FTAG));

		/* os_lock is not held across the eviction itself. */
		mutex_exit(&os->os_lock);
		dnode_evict_dbufs(dn);
		dnode_rele(dn, FTAG);
		mutex_enter(&os->os_lock);
		dn = next_dn;
	}
	/* Sample the list head under the lock to compute the result. */
	dn = list_head(&os->os_dnodes);
	mutex_exit(&os->os_lock);
	return (dn != DMU_META_DNODE(os));
}
 527 
 528 void
 529 dmu_objset_evict(objset_t *os)
 530 {
 531         dsl_dataset_t *ds = os->os_dsl_dataset;
 532 
 533         for (int t = 0; t < TXG_SIZE; t++)
 534                 ASSERT(!dmu_objset_is_dirty(os, t));
 535 
 536         if (ds) {
 537                 if (!dsl_dataset_is_snapshot(ds)) {
 538                         VERIFY(0 == dsl_prop_unregister(ds, "checksum",
 539                             checksum_changed_cb, os));
 540                         VERIFY(0 == dsl_prop_unregister(ds, "compression",
 541                             compression_changed_cb, os));
 542                         VERIFY(0 == dsl_prop_unregister(ds, "copies",
 543                             copies_changed_cb, os));
 544                         VERIFY(0 == dsl_prop_unregister(ds, "dedup",
 545                             dedup_changed_cb, os));
 546                         VERIFY(0 == dsl_prop_unregister(ds, "logbias",
 547                             logbias_changed_cb, os));
 548                         VERIFY(0 == dsl_prop_unregister(ds, "sync",
 549                             sync_changed_cb, os));
 550                 }
 551                 VERIFY(0 == dsl_prop_unregister(ds, "primarycache",
 552                     primary_cache_changed_cb, os));
 553                 VERIFY(0 == dsl_prop_unregister(ds, "secondarycache",
 554                     secondary_cache_changed_cb, os));
 555         }
 556 
 557         if (os->os_sa)
 558                 sa_tear_down(os);
 559 
 560         /*
 561          * We should need only a single pass over the dnode list, since
 562          * nothing can be added to the list at this point.
 563          */
 564         (void) dmu_objset_evict_dbufs(os);
 565 
 566         dnode_special_close(&os->os_meta_dnode);
 567         if (DMU_USERUSED_DNODE(os)) {
 568                 dnode_special_close(&os->os_userused_dnode);
 569                 dnode_special_close(&os->os_groupused_dnode);
 570         }
 571         zil_free(os->os_zil);
 572 
 573         ASSERT3P(list_head(&os->os_dnodes), ==, NULL);
 574 
 575         VERIFY(arc_buf_remove_ref(os->os_phys_buf, &os->os_phys_buf) == 1);
 576 
 577         /*
 578          * This is a barrier to prevent the objset from going away in
 579          * dnode_move() until we can safely ensure that the objset is still in
 580          * use. We consider the objset valid before the barrier and invalid
 581          * after the barrier.
 582          */
 583         rw_enter(&os_lock, RW_READER);
 584         rw_exit(&os_lock);
 585 
 586         mutex_destroy(&os->os_lock);
 587         mutex_destroy(&os->os_obj_lock);
 588         mutex_destroy(&os->os_user_ptr_lock);
 589         kmem_free(os, sizeof (objset_t));
 590 }
 591 
 592 timestruc_t
 593 dmu_objset_snap_cmtime(objset_t *os)
 594 {
 595         return (dsl_dir_snap_cmtime(os->os_dsl_dataset->ds_dir));
 596 }
 597 
 598 /* called from dsl for meta-objset */
 599 objset_t *
 600 dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
 601     dmu_objset_type_t type, dmu_tx_t *tx)
 602 {
 603         objset_t *os;
 604         dnode_t *mdn;
 605 
 606         ASSERT(dmu_tx_is_syncing(tx));
 607         if (ds != NULL)
 608                 VERIFY(0 == dmu_objset_from_ds(ds, &os));
 609         else
 610                 VERIFY(0 == dmu_objset_open_impl(spa, NULL, bp, &os));
 611 
 612         mdn = DMU_META_DNODE(os);
 613 
 614         dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
 615             DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);
 616 
 617         /*
 618          * We don't want to have to increase the meta-dnode's nlevels
 619          * later, because then we could do it in quescing context while
 620          * we are also accessing it in open context.
 621          *
 622          * This precaution is not necessary for the MOS (ds == NULL),
 623          * because the MOS is only updated in syncing context.
 624          * This is most fortunate: the MOS is the only objset that
 625          * needs to be synced multiple times as spa_sync() iterates
 626          * to convergence, so minimizing its dn_nlevels matters.
 627          */
 628         if (ds != NULL) {
 629                 int levels = 1;
 630 
 631                 /*
 632                  * Determine the number of levels necessary for the meta-dnode
 633                  * to contain DN_MAX_OBJECT dnodes.
 634                  */
 635                 while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
 636                     (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
 637                     DN_MAX_OBJECT * sizeof (dnode_phys_t))
 638                         levels++;
 639 
 640                 mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
 641                     mdn->dn_nlevels = levels;
 642         }
 643 
 644         ASSERT(type != DMU_OST_NONE);
 645         ASSERT(type != DMU_OST_ANY);
 646         ASSERT(type < DMU_OST_NUMTYPES);
 647         os->os_phys->os_type = type;
 648         if (dmu_objset_userused_enabled(os)) {
 649                 os->os_phys->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
 650                 os->os_flags = os->os_phys->os_flags;
 651         }
 652 
 653         dsl_dataset_dirty(ds, tx);
 654 
 655         return (os);
 656 }
 657 
/*
 * Argument bundle handed to dmu_objset_create_check() and
 * dmu_objset_create_sync() by dmu_objset_create() and dmu_objset_clone().
 */
struct oscarg {
	/* optional hook run by the sync func on a freshly created objset */
	void (*userfunc)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx);
	void *userarg;			/* passed to userfunc as 'arg' */
	dsl_dataset_t *clone_origin;	/* origin to clone, NULL for create */
	const char *lastname;		/* name of the new child in its dir */
	dmu_objset_type_t type;		/* type given to the new objset */
	uint64_t flags;			/* passed to dsl_dataset_create_sync */
	cred_t *cr;			/* credentials of the requester */
};
 667 
 668 /*ARGSUSED*/
 669 static int
 670 dmu_objset_create_check(void *arg1, void *arg2, dmu_tx_t *tx)
 671 {
 672         dsl_dir_t *dd = arg1;
 673         struct oscarg *oa = arg2;
 674         objset_t *mos = dd->dd_pool->dp_meta_objset;
 675         int err;
 676         uint64_t ddobj;
 677 
 678         err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
 679             oa->lastname, sizeof (uint64_t), 1, &ddobj);
 680         if (err != ENOENT)
 681                 return (err ? err : EEXIST);
 682 
 683         if (oa->clone_origin != NULL) {
 684                 /* You can't clone across pools. */
 685                 if (oa->clone_origin->ds_dir->dd_pool != dd->dd_pool)
 686                         return (EXDEV);
 687 
 688                 /* You can only clone snapshots, not the head datasets. */
 689                 if (!dsl_dataset_is_snapshot(oa->clone_origin))
 690                         return (EINVAL);
 691         }
 692 
 693         return (0);
 694 }
 695 
 696 static void
 697 dmu_objset_create_sync(void *arg1, void *arg2, dmu_tx_t *tx)
 698 {
 699         dsl_dir_t *dd = arg1;
 700         spa_t *spa = dd->dd_pool->dp_spa;
 701         struct oscarg *oa = arg2;
 702         uint64_t obj;
 703         dsl_dataset_t *ds;
 704         blkptr_t *bp;
 705 
 706         ASSERT(dmu_tx_is_syncing(tx));
 707 
 708         obj = dsl_dataset_create_sync(dd, oa->lastname,
 709             oa->clone_origin, oa->flags, oa->cr, tx);
 710 
 711         VERIFY3U(0, ==, dsl_dataset_hold_obj(dd->dd_pool, obj, FTAG, &ds));
 712         bp = dsl_dataset_get_blkptr(ds);
 713         if (BP_IS_HOLE(bp)) {
 714                 objset_t *os =
 715                     dmu_objset_create_impl(spa, ds, bp, oa->type, tx);
 716 
 717                 if (oa->userfunc)
 718                         oa->userfunc(os, oa->userarg, oa->cr, tx);
 719         }
 720 
 721         if (oa->clone_origin == NULL) {
 722                 spa_history_log_internal_ds(ds, "create", tx, "");
 723         } else {
 724                 char namebuf[MAXNAMELEN];
 725                 dsl_dataset_name(oa->clone_origin, namebuf);
 726                 spa_history_log_internal_ds(ds, "clone", tx,
 727                     "origin=%s (%llu)", namebuf, oa->clone_origin->ds_object);
 728         }
 729         dsl_dataset_rele(ds, FTAG);
 730 }
 731 
 732 int
 733 dmu_objset_create(const char *name, dmu_objset_type_t type, uint64_t flags,
 734     void (*func)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx), void *arg)
 735 {
 736         dsl_dir_t *pdd;
 737         const char *tail;
 738         int err = 0;
 739         struct oscarg oa = { 0 };
 740 
 741         ASSERT(strchr(name, '@') == NULL);
 742         err = dsl_dir_open(name, FTAG, &pdd, &tail);
 743         if (err)
 744                 return (err);
 745         if (tail == NULL) {
 746                 dsl_dir_close(pdd, FTAG);
 747                 return (EEXIST);
 748         }
 749 
 750         oa.userfunc = func;
 751         oa.userarg = arg;
 752         oa.lastname = tail;
 753         oa.type = type;
 754         oa.flags = flags;
 755         oa.cr = CRED();
 756 
 757         err = dsl_sync_task_do(pdd->dd_pool, dmu_objset_create_check,
 758             dmu_objset_create_sync, pdd, &oa, 5);
 759         dsl_dir_close(pdd, FTAG);
 760         return (err);
 761 }
 762 
 763 int
 764 dmu_objset_clone(const char *name, dsl_dataset_t *clone_origin, uint64_t flags)
 765 {
 766         dsl_dir_t *pdd;
 767         const char *tail;
 768         int err = 0;
 769         struct oscarg oa = { 0 };
 770 
 771         ASSERT(strchr(name, '@') == NULL);
 772         err = dsl_dir_open(name, FTAG, &pdd, &tail);
 773         if (err)
 774                 return (err);
 775         if (tail == NULL) {
 776                 dsl_dir_close(pdd, FTAG);
 777                 return (EEXIST);
 778         }
 779 
 780         oa.lastname = tail;
 781         oa.clone_origin = clone_origin;
 782         oa.flags = flags;
 783         oa.cr = CRED();
 784 
 785         err = dsl_sync_task_do(pdd->dd_pool, dmu_objset_create_check,
 786             dmu_objset_create_sync, pdd, &oa, 5);
 787         dsl_dir_close(pdd, FTAG);
 788         return (err);
 789 }
 790 
 791 int
 792 dmu_objset_destroy(const char *name, boolean_t defer)
 793 {
 794         dsl_dataset_t *ds;
 795         int error;
 796 
 797         error = dsl_dataset_own(name, B_TRUE, FTAG, &ds);
 798         if (error == 0) {
 799                 error = dsl_dataset_destroy(ds, FTAG, defer);
 800                 /* dsl_dataset_destroy() closes the ds. */
 801         }
 802 
 803         return (error);
 804 }
 805 
/*
 * State shared by all snapshots taken in one request; each snaponearg_t
 * points back at it through soa_saa.
 */
typedef struct snapallarg {
	dsl_sync_task_group_t *saa_dstg; /* group the per-fs tasks are added to */
	boolean_t saa_needsuspend;	/* suspend each ZIL before snapshotting */
	nvlist_t *saa_props;		/* props set on each snapshot, or NULL */

	/* the following are used only if 'temporary' is set: */
	boolean_t saa_temporary;
	const char *saa_htag;		/* tag for the temporary user hold */
	struct dsl_ds_holdarg *saa_ha;	/* set by check func, freed by sync */
	dsl_dataset_t *saa_newds;	/* set by sync func to ds->ds_prev */
} snapallarg_t;
 817 
/*
 * Per-filesystem argument for snapshot_check() and snapshot_sync().
 */
typedef struct snaponearg {
	const char *soa_longname; /* long snap name */
	const char *soa_snapname; /* short snap name */
	snapallarg_t *soa_saa;	/* state shared across the whole request */
} snaponearg_t;
 823 
 824 static int
 825 snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
 826 {
 827         objset_t *os = arg1;
 828         snaponearg_t *soa = arg2;
 829         snapallarg_t *saa = soa->soa_saa;
 830         int error;
 831 
 832         /* The props have already been checked by zfs_check_userprops(). */
 833 
 834         error = dsl_dataset_snapshot_check(os->os_dsl_dataset,
 835             soa->soa_snapname, tx);
 836         if (error)
 837                 return (error);
 838 
 839         if (saa->saa_temporary) {
 840                 /*
 841                  * Ideally we would just call
 842                  * dsl_dataset_user_hold_check() and
 843                  * dsl_dataset_destroy_check() here.  However the
 844                  * dataset we want to hold and destroy is the snapshot
 845                  * that we just confirmed we can create, but it won't
 846                  * exist until after these checks are run.  Do any
 847                  * checks we can here and if more checks are added to
 848                  * those routines in the future, similar checks may be
 849                  * necessary here.
 850                  */
 851                 if (spa_version(os->os_spa) < SPA_VERSION_USERREFS)
 852                         return (ENOTSUP);
 853                 /*
 854                  * Not checking number of tags because the tag will be
 855                  * unique, as it will be the only tag.
 856                  */
 857                 if (strlen(saa->saa_htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
 858                         return (E2BIG);
 859 
 860                 saa->saa_ha = kmem_alloc(sizeof (struct dsl_ds_holdarg),
 861                     KM_SLEEP);
 862                 saa->saa_ha->temphold = B_TRUE;
 863                 saa->saa_ha->htag = saa->saa_htag;
 864         }
 865         return (error);
 866 }
 867 
 868 static void
 869 snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
 870 {
 871         objset_t *os = arg1;
 872         dsl_dataset_t *ds = os->os_dsl_dataset;
 873         snaponearg_t *soa = arg2;
 874         snapallarg_t *saa = soa->soa_saa;
 875 
 876         dsl_dataset_snapshot_sync(ds, soa->soa_snapname, tx);
 877 
 878         if (saa->saa_props != NULL) {
 879                 dsl_props_arg_t pa;
 880                 pa.pa_props = saa->saa_props;
 881                 pa.pa_source = ZPROP_SRC_LOCAL;
 882                 dsl_props_set_sync(ds->ds_prev, &pa, tx);
 883         }
 884 
 885         if (saa->saa_temporary) {
 886                 struct dsl_ds_destroyarg da;
 887 
 888                 dsl_dataset_user_hold_sync(ds->ds_prev, saa->saa_ha, tx);
 889                 kmem_free(saa->saa_ha, sizeof (struct dsl_ds_holdarg));
 890                 saa->saa_ha = NULL;
 891                 saa->saa_newds = ds->ds_prev;
 892 
 893                 da.ds = ds->ds_prev;
 894                 da.defer = B_TRUE;
 895                 dsl_dataset_destroy_sync(&da, FTAG, tx);
 896         }
 897 }
 898 
 899 static int
 900 snapshot_one_impl(const char *snapname, void *arg)
 901 {
 902         char fsname[MAXPATHLEN];
 903         snapallarg_t *saa = arg;
 904         snaponearg_t *soa;
 905         objset_t *os;
 906         int err;
 907 
 908         (void) strlcpy(fsname, snapname, sizeof (fsname));
 909         strchr(fsname, '@')[0] = '\0';
 910 
 911         err = dmu_objset_hold(fsname, saa, &os);
 912         if (err != 0)
 913                 return (err);
 914 
 915         /*
 916          * If the objset is in an inconsistent state (eg, in the process
 917          * of being destroyed), don't snapshot it.
 918          */
 919         if (os->os_dsl_dataset->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) {
 920                 dmu_objset_rele(os, saa);
 921                 return (EBUSY);
 922         }
 923 
 924         if (saa->saa_needsuspend) {
 925                 err = zil_suspend(dmu_objset_zil(os));
 926                 if (err) {
 927                         dmu_objset_rele(os, saa);
 928                         return (err);
 929                 }
 930         }
 931 
 932         soa = kmem_zalloc(sizeof (*soa), KM_SLEEP);
 933         soa->soa_saa = saa;
 934         soa->soa_longname = snapname;
 935         soa->soa_snapname = strchr(snapname, '@') + 1;
 936 
 937         dsl_sync_task_create(saa->saa_dstg, snapshot_check, snapshot_sync,
 938             os, soa, 3);
 939 
 940         return (0);
 941 }
 942 
 943 /*
 944  * The snapshots must all be in the same pool.
 945  */
 946 int
 947 dmu_objset_snapshot(nvlist_t *snaps, nvlist_t *props, nvlist_t *errors)
 948 {
 949         dsl_sync_task_t *dst;
 950         snapallarg_t saa = { 0 };
 951         spa_t *spa;
 952         int rv = 0;
 953         int err;
 954         nvpair_t *pair;
 955 
 956         pair = nvlist_next_nvpair(snaps, NULL);
 957         if (pair == NULL)
 958                 return (0);
 959 
 960         err = spa_open(nvpair_name(pair), &spa, FTAG);
 961         if (err)
 962                 return (err);
 963         saa.saa_dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 964         saa.saa_props = props;
 965         saa.saa_needsuspend = (spa_version(spa) < SPA_VERSION_FAST_SNAP);
 966 
 967         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
 968             pair = nvlist_next_nvpair(snaps, pair)) {
 969                 err = snapshot_one_impl(nvpair_name(pair), &saa);
 970                 if (err != 0) {
 971                         if (errors != NULL) {
 972                                 fnvlist_add_int32(errors,
 973                                     nvpair_name(pair), err);
 974                         }
 975                         rv = err;
 976                 }
 977         }
 978 
 979         /*
 980          * If any call to snapshot_one_impl() failed, don't execute the
 981          * sync task.  The error handling code below will clean up the
 982          * snaponearg_t from any successful calls to
 983          * snapshot_one_impl().
 984          */
 985         if (rv == 0)
 986                 err = dsl_sync_task_group_wait(saa.saa_dstg);
 987         if (err != 0)
 988                 rv = err;
 989 
 990         for (dst = list_head(&saa.saa_dstg->dstg_tasks); dst;
 991             dst = list_next(&saa.saa_dstg->dstg_tasks, dst)) {
 992                 objset_t *os = dst->dst_arg1;
 993                 snaponearg_t *soa = dst->dst_arg2;
 994                 if (dst->dst_err != 0) {
 995                         if (errors != NULL) {
 996                                 fnvlist_add_int32(errors,
 997                                     soa->soa_longname, dst->dst_err);
 998                         }
 999                         rv = dst->dst_err;
1000                 }
1001 
1002                 if (saa.saa_needsuspend)
1003                         zil_resume(dmu_objset_zil(os));
1004                 dmu_objset_rele(os, &saa);
1005                 kmem_free(soa, sizeof (*soa));
1006         }
1007 
1008         dsl_sync_task_group_destroy(saa.saa_dstg);
1009         spa_close(spa, FTAG);
1010         return (rv);
1011 }
1012 
1013 int
1014 dmu_objset_snapshot_one(const char *fsname, const char *snapname)
1015 {
1016         int err;
1017         char *longsnap = kmem_asprintf("%s@%s", fsname, snapname);
1018         nvlist_t *snaps = fnvlist_alloc();
1019 
1020         fnvlist_add_boolean(snaps, longsnap);
1021         err = dmu_objset_snapshot(snaps, NULL, NULL);
1022         fnvlist_free(snaps);
1023         strfree(longsnap);
1024         return (err);
1025 }
1026 
1027 int
1028 dmu_objset_snapshot_tmp(const char *snapname, const char *tag, int cleanup_fd)
1029 {
1030         dsl_sync_task_t *dst;
1031         snapallarg_t saa = { 0 };
1032         spa_t *spa;
1033         minor_t minor;
1034         int err;
1035 
1036         err = spa_open(snapname, &spa, FTAG);
1037         if (err)
1038                 return (err);
1039         saa.saa_dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
1040         saa.saa_htag = tag;
1041         saa.saa_needsuspend = (spa_version(spa) < SPA_VERSION_FAST_SNAP);
1042         saa.saa_temporary = B_TRUE;
1043 
1044         if (cleanup_fd < 0) {
1045                 spa_close(spa, FTAG);
1046                 return (EINVAL);
1047         }
1048         if ((err = zfs_onexit_fd_hold(cleanup_fd, &minor)) != 0) {
1049                 spa_close(spa, FTAG);
1050                 return (err);
1051         }
1052 
1053         err = snapshot_one_impl(snapname, &saa);
1054 
1055         if (err == 0)
1056                 err = dsl_sync_task_group_wait(saa.saa_dstg);
1057 
1058         for (dst = list_head(&saa.saa_dstg->dstg_tasks); dst;
1059             dst = list_next(&saa.saa_dstg->dstg_tasks, dst)) {
1060                 objset_t *os = dst->dst_arg1;
1061                 dsl_register_onexit_hold_cleanup(saa.saa_newds, tag, minor);
1062                 if (saa.saa_needsuspend)
1063                         zil_resume(dmu_objset_zil(os));
1064                 dmu_objset_rele(os, &saa);
1065         }
1066 
1067         zfs_onexit_fd_rele(cleanup_fd);
1068         dsl_sync_task_group_destroy(saa.saa_dstg);
1069         spa_close(spa, FTAG);
1070         return (err);
1071 }
1072 
1073 
1074 static void
1075 dmu_objset_sync_dnodes(list_t *list, list_t *newlist, dmu_tx_t *tx)
1076 {
1077         dnode_t *dn;
1078 
1079         while (dn = list_head(list)) {
1080                 ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
1081                 ASSERT(dn->dn_dbuf->db_data_pending);
1082                 /*
1083                  * Initialize dn_zio outside dnode_sync() because the
1084                  * meta-dnode needs to set it ouside dnode_sync().
1085                  */
1086                 dn->dn_zio = dn->dn_dbuf->db_data_pending->dr_zio;
1087                 ASSERT(dn->dn_zio);
1088 
1089                 ASSERT3U(dn->dn_nlevels, <=, DN_MAX_LEVELS);
1090                 list_remove(list, dn);
1091 
1092                 if (newlist) {
1093                         (void) dnode_add_ref(dn, newlist);
1094                         list_insert_tail(newlist, dn);
1095                 }
1096 
1097                 dnode_sync(dn, tx);
1098         }
1099 }
1100 
1101 /* ARGSUSED */
1102 static void
1103 dmu_objset_write_ready(zio_t *zio, arc_buf_t *abuf, void *arg)
1104 {
1105         blkptr_t *bp = zio->io_bp;
1106         objset_t *os = arg;
1107         dnode_phys_t *dnp = &os->os_phys->os_meta_dnode;
1108 
1109         ASSERT(bp == os->os_rootbp);
1110         ASSERT(BP_GET_TYPE(bp) == DMU_OT_OBJSET);
1111         ASSERT(BP_GET_LEVEL(bp) == 0);
1112 
1113         /*
1114          * Update rootbp fill count: it should be the number of objects
1115          * allocated in the object set (not counting the "special"
1116          * objects that are stored in the objset_phys_t -- the meta
1117          * dnode and user/group accounting objects).
1118          */
1119         bp->blk_fill = 0;
1120         for (int i = 0; i < dnp->dn_nblkptr; i++)
1121                 bp->blk_fill += dnp->dn_blkptr[i].blk_fill;
1122 }
1123 
1124 /* ARGSUSED */
1125 static void
1126 dmu_objset_write_done(zio_t *zio, arc_buf_t *abuf, void *arg)
1127 {
1128         blkptr_t *bp = zio->io_bp;
1129         blkptr_t *bp_orig = &zio->io_bp_orig;
1130         objset_t *os = arg;
1131 
1132         if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
1133                 ASSERT(BP_EQUAL(bp, bp_orig));
1134         } else {
1135                 dsl_dataset_t *ds = os->os_dsl_dataset;
1136                 dmu_tx_t *tx = os->os_synctx;
1137 
1138                 (void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
1139                 dsl_dataset_block_born(ds, bp, tx);
1140         }
1141 }
1142 
1143 /* called from dsl */
1144 void
1145 dmu_objset_sync(objset_t *os, zio_t *pio, dmu_tx_t *tx)
1146 {
1147         int txgoff;
1148         zbookmark_t zb;
1149         zio_prop_t zp;
1150         zio_t *zio;
1151         list_t *list;
1152         list_t *newlist = NULL;
1153         dbuf_dirty_record_t *dr;
1154 
1155         dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);
1156 
1157         ASSERT(dmu_tx_is_syncing(tx));
1158         /* XXX the write_done callback should really give us the tx... */
1159         os->os_synctx = tx;
1160 
1161         if (os->os_dsl_dataset == NULL) {
1162                 /*
1163                  * This is the MOS.  If we have upgraded,
1164                  * spa_max_replication() could change, so reset
1165                  * os_copies here.
1166                  */
1167                 os->os_copies = spa_max_replication(os->os_spa);
1168         }
1169 
1170         /*
1171          * Create the root block IO
1172          */
1173         SET_BOOKMARK(&zb, os->os_dsl_dataset ?
1174             os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
1175             ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
1176         VERIFY3U(0, ==, arc_release_bp(os->os_phys_buf, &os->os_phys_buf,
1177             os->os_rootbp, os->os_spa, &zb));
1178 
1179         dmu_write_policy(os, NULL, 0, 0, &zp);
1180 
1181         zio = arc_write(pio, os->os_spa, tx->tx_txg,
1182             os->os_rootbp, os->os_phys_buf, DMU_OS_IS_L2CACHEABLE(os), &zp,
1183             dmu_objset_write_ready, dmu_objset_write_done, os,
1184             ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
1185 
1186         /*
1187          * Sync special dnodes - the parent IO for the sync is the root block
1188          */
1189         DMU_META_DNODE(os)->dn_zio = zio;
1190         dnode_sync(DMU_META_DNODE(os), tx);
1191 
1192         os->os_phys->os_flags = os->os_flags;
1193 
1194         if (DMU_USERUSED_DNODE(os) &&
1195             DMU_USERUSED_DNODE(os)->dn_type != DMU_OT_NONE) {
1196                 DMU_USERUSED_DNODE(os)->dn_zio = zio;
1197                 dnode_sync(DMU_USERUSED_DNODE(os), tx);
1198                 DMU_GROUPUSED_DNODE(os)->dn_zio = zio;
1199                 dnode_sync(DMU_GROUPUSED_DNODE(os), tx);
1200         }
1201 
1202         txgoff = tx->tx_txg & TXG_MASK;
1203 
1204         if (dmu_objset_userused_enabled(os)) {
1205                 newlist = &os->os_synced_dnodes;
1206                 /*
1207                  * We must create the list here because it uses the
1208                  * dn_dirty_link[] of this txg.
1209                  */
1210                 list_create(newlist, sizeof (dnode_t),
1211                     offsetof(dnode_t, dn_dirty_link[txgoff]));
1212         }
1213 
1214         dmu_objset_sync_dnodes(&os->os_free_dnodes[txgoff], newlist, tx);
1215         dmu_objset_sync_dnodes(&os->os_dirty_dnodes[txgoff], newlist, tx);
1216 
1217         list = &DMU_META_DNODE(os)->dn_dirty_records[txgoff];
1218         while (dr = list_head(list)) {
1219                 ASSERT(dr->dr_dbuf->db_level == 0);
1220                 list_remove(list, dr);
1221                 if (dr->dr_zio)
1222                         zio_nowait(dr->dr_zio);
1223         }
1224         /*
1225          * Free intent log blocks up to this tx.
1226          */
1227         zil_sync(os->os_zil, tx);
1228         os->os_phys->os_zil_header = os->os_zil_header;
1229         zio_nowait(zio);
1230 }
1231 
1232 boolean_t
1233 dmu_objset_is_dirty(objset_t *os, uint64_t txg)
1234 {
1235         return (!list_is_empty(&os->os_dirty_dnodes[txg & TXG_MASK]) ||
1236             !list_is_empty(&os->os_free_dnodes[txg & TXG_MASK]));
1237 }
1238 
1239 static objset_used_cb_t *used_cbs[DMU_OST_NUMTYPES];
1240 
1241 void
1242 dmu_objset_register_type(dmu_objset_type_t ost, objset_used_cb_t *cb)
1243 {
1244         used_cbs[ost] = cb;
1245 }
1246 
1247 boolean_t
1248 dmu_objset_userused_enabled(objset_t *os)
1249 {
1250         return (spa_version(os->os_spa) >= SPA_VERSION_USERSPACE &&
1251             used_cbs[os->os_phys->os_type] != NULL &&
1252             DMU_USERUSED_DNODE(os) != NULL);
1253 }
1254 
1255 static void
1256 do_userquota_update(objset_t *os, uint64_t used, uint64_t flags,
1257     uint64_t user, uint64_t group, boolean_t subtract, dmu_tx_t *tx)
1258 {
1259         if ((flags & DNODE_FLAG_USERUSED_ACCOUNTED)) {
1260                 int64_t delta = DNODE_SIZE + used;
1261                 if (subtract)
1262                         delta = -delta;
1263                 VERIFY3U(0, ==, zap_increment_int(os, DMU_USERUSED_OBJECT,
1264                     user, delta, tx));
1265                 VERIFY3U(0, ==, zap_increment_int(os, DMU_GROUPUSED_OBJECT,
1266                     group, delta, tx));
1267         }
1268 }
1269 
1270 void
1271 dmu_objset_do_userquota_updates(objset_t *os, dmu_tx_t *tx)
1272 {
1273         dnode_t *dn;
1274         list_t *list = &os->os_synced_dnodes;
1275 
1276         ASSERT(list_head(list) == NULL || dmu_objset_userused_enabled(os));
1277 
1278         while (dn = list_head(list)) {
1279                 int flags;
1280                 ASSERT(!DMU_OBJECT_IS_SPECIAL(dn->dn_object));
1281                 ASSERT(dn->dn_phys->dn_type == DMU_OT_NONE ||
1282                     dn->dn_phys->dn_flags &
1283                     DNODE_FLAG_USERUSED_ACCOUNTED);
1284 
1285                 /* Allocate the user/groupused objects if necessary. */
1286                 if (DMU_USERUSED_DNODE(os)->dn_type == DMU_OT_NONE) {
1287                         VERIFY(0 == zap_create_claim(os,
1288                             DMU_USERUSED_OBJECT,
1289                             DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
1290                         VERIFY(0 == zap_create_claim(os,
1291                             DMU_GROUPUSED_OBJECT,
1292                             DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
1293                 }
1294 
1295                 /*
1296                  * We intentionally modify the zap object even if the
1297                  * net delta is zero.  Otherwise
1298                  * the block of the zap obj could be shared between
1299                  * datasets but need to be different between them after
1300                  * a bprewrite.
1301                  */
1302 
1303                 flags = dn->dn_id_flags;
1304                 ASSERT(flags);
1305                 if (flags & DN_ID_OLD_EXIST)  {
1306                         do_userquota_update(os, dn->dn_oldused, dn->dn_oldflags,
1307                             dn->dn_olduid, dn->dn_oldgid, B_TRUE, tx);
1308                 }
1309                 if (flags & DN_ID_NEW_EXIST) {
1310                         do_userquota_update(os, DN_USED_BYTES(dn->dn_phys),
1311                             dn->dn_phys->dn_flags,  dn->dn_newuid,
1312                             dn->dn_newgid, B_FALSE, tx);
1313                 }
1314 
1315                 mutex_enter(&dn->dn_mtx);
1316                 dn->dn_oldused = 0;
1317                 dn->dn_oldflags = 0;
1318                 if (dn->dn_id_flags & DN_ID_NEW_EXIST) {
1319                         dn->dn_olduid = dn->dn_newuid;
1320                         dn->dn_oldgid = dn->dn_newgid;
1321                         dn->dn_id_flags |= DN_ID_OLD_EXIST;
1322                         if (dn->dn_bonuslen == 0)
1323                                 dn->dn_id_flags |= DN_ID_CHKED_SPILL;
1324                         else
1325                                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1326                 }
1327                 dn->dn_id_flags &= ~(DN_ID_NEW_EXIST);
1328                 mutex_exit(&dn->dn_mtx);
1329 
1330                 list_remove(list, dn);
1331                 dnode_rele(dn, list);
1332         }
1333 }
1334 
1335 /*
1336  * Returns a pointer to data to find uid/gid from
1337  *
1338  * If a dirty record for transaction group that is syncing can't
1339  * be found then NULL is returned.  In the NULL case it is assumed
1340  * the uid/gid aren't changing.
1341  */
1342 static void *
1343 dmu_objset_userquota_find_data(dmu_buf_impl_t *db, dmu_tx_t *tx)
1344 {
1345         dbuf_dirty_record_t *dr, **drp;
1346         void *data;
1347 
1348         if (db->db_dirtycnt == 0)
1349                 return (db->db.db_data);  /* Nothing is changing */
1350 
1351         for (drp = &db->db_last_dirty; (dr = *drp) != NULL; drp = &dr->dr_next)
1352                 if (dr->dr_txg == tx->tx_txg)
1353                         break;
1354 
1355         if (dr == NULL) {
1356                 data = NULL;
1357         } else {
1358                 dnode_t *dn;
1359 
1360                 DB_DNODE_ENTER(dr->dr_dbuf);
1361                 dn = DB_DNODE(dr->dr_dbuf);
1362 
1363                 if (dn->dn_bonuslen == 0 &&
1364                     dr->dr_dbuf->db_blkid == DMU_SPILL_BLKID)
1365                         data = dr->dt.dl.dr_data->b_data;
1366                 else
1367                         data = dr->dt.dl.dr_data;
1368 
1369                 DB_DNODE_EXIT(dr->dr_dbuf);
1370         }
1371 
1372         return (data);
1373 }
1374 
1375 void
1376 dmu_objset_userquota_get_ids(dnode_t *dn, boolean_t before, dmu_tx_t *tx)
1377 {
1378         objset_t *os = dn->dn_objset;
1379         void *data = NULL;
1380         dmu_buf_impl_t *db = NULL;
1381         uint64_t *user, *group;
1382         int flags = dn->dn_id_flags;
1383         int error;
1384         boolean_t have_spill = B_FALSE;
1385 
1386         if (!dmu_objset_userused_enabled(dn->dn_objset))
1387                 return;
1388 
1389         if (before && (flags & (DN_ID_CHKED_BONUS|DN_ID_OLD_EXIST|
1390             DN_ID_CHKED_SPILL)))
1391                 return;
1392 
1393         if (before && dn->dn_bonuslen != 0)
1394                 data = DN_BONUS(dn->dn_phys);
1395         else if (!before && dn->dn_bonuslen != 0) {
1396                 if (dn->dn_bonus) {
1397                         db = dn->dn_bonus;
1398                         mutex_enter(&db->db_mtx);
1399                         data = dmu_objset_userquota_find_data(db, tx);
1400                 } else {
1401                         data = DN_BONUS(dn->dn_phys);
1402                 }
1403         } else if (dn->dn_bonuslen == 0 && dn->dn_bonustype == DMU_OT_SA) {
1404                         int rf = 0;
1405 
1406                         if (RW_WRITE_HELD(&dn->dn_struct_rwlock))
1407                                 rf |= DB_RF_HAVESTRUCT;
1408                         error = dmu_spill_hold_by_dnode(dn,
1409                             rf | DB_RF_MUST_SUCCEED,
1410                             FTAG, (dmu_buf_t **)&db);
1411                         ASSERT(error == 0);
1412                         mutex_enter(&db->db_mtx);
1413                         data = (before) ? db->db.db_data :
1414                             dmu_objset_userquota_find_data(db, tx);
1415                         have_spill = B_TRUE;
1416         } else {
1417                 mutex_enter(&dn->dn_mtx);
1418                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1419                 mutex_exit(&dn->dn_mtx);
1420                 return;
1421         }
1422 
1423         if (before) {
1424                 ASSERT(data);
1425                 user = &dn->dn_olduid;
1426                 group = &dn->dn_oldgid;
1427         } else if (data) {
1428                 user = &dn->dn_newuid;
1429                 group = &dn->dn_newgid;
1430         }
1431 
1432         /*
1433          * Must always call the callback in case the object
1434          * type has changed and that type isn't an object type to track
1435          */
1436         error = used_cbs[os->os_phys->os_type](dn->dn_bonustype, data,
1437             user, group);
1438 
1439         /*
1440          * Preserve existing uid/gid when the callback can't determine
1441          * what the new uid/gid are and the callback returned EEXIST.
1442          * The EEXIST error tells us to just use the existing uid/gid.
1443          * If we don't know what the old values are then just assign
1444          * them to 0, since that is a new file  being created.
1445          */
1446         if (!before && data == NULL && error == EEXIST) {
1447                 if (flags & DN_ID_OLD_EXIST) {
1448                         dn->dn_newuid = dn->dn_olduid;
1449                         dn->dn_newgid = dn->dn_oldgid;
1450                 } else {
1451                         dn->dn_newuid = 0;
1452                         dn->dn_newgid = 0;
1453                 }
1454                 error = 0;
1455         }
1456 
1457         if (db)
1458                 mutex_exit(&db->db_mtx);
1459 
1460         mutex_enter(&dn->dn_mtx);
1461         if (error == 0 && before)
1462                 dn->dn_id_flags |= DN_ID_OLD_EXIST;
1463         if (error == 0 && !before)
1464                 dn->dn_id_flags |= DN_ID_NEW_EXIST;
1465 
1466         if (have_spill) {
1467                 dn->dn_id_flags |= DN_ID_CHKED_SPILL;
1468         } else {
1469                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1470         }
1471         mutex_exit(&dn->dn_mtx);
1472         if (have_spill)
1473                 dmu_buf_rele((dmu_buf_t *)db, FTAG);
1474 }
1475 
1476 boolean_t
1477 dmu_objset_userspace_present(objset_t *os)
1478 {
1479         return (os->os_phys->os_flags &
1480             OBJSET_FLAG_USERACCOUNTING_COMPLETE);
1481 }
1482 
1483 int
1484 dmu_objset_userspace_upgrade(objset_t *os)
1485 {
1486         uint64_t obj;
1487         int err = 0;
1488 
1489         if (dmu_objset_userspace_present(os))
1490                 return (0);
1491         if (!dmu_objset_userused_enabled(os))
1492                 return (ENOTSUP);
1493         if (dmu_objset_is_snapshot(os))
1494                 return (EINVAL);
1495 
1496         /*
1497          * We simply need to mark every object dirty, so that it will be
1498          * synced out and now accounted.  If this is called
1499          * concurrently, or if we already did some work before crashing,
1500          * that's fine, since we track each object's accounted state
1501          * independently.
1502          */
1503 
1504         for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE, 0)) {
1505                 dmu_tx_t *tx;
1506                 dmu_buf_t *db;
1507                 int objerr;
1508 
1509                 if (issig(JUSTLOOKING) && issig(FORREAL))
1510                         return (EINTR);
1511 
1512                 objerr = dmu_bonus_hold(os, obj, FTAG, &db);
1513                 if (objerr)
1514                         continue;
1515                 tx = dmu_tx_create(os);
1516                 dmu_tx_hold_bonus(tx, obj);
1517                 objerr = dmu_tx_assign(tx, TXG_WAIT);
1518                 if (objerr) {
1519                         dmu_tx_abort(tx);
1520                         continue;
1521                 }
1522                 dmu_buf_will_dirty(db, tx);
1523                 dmu_buf_rele(db, FTAG);
1524                 dmu_tx_commit(tx);
1525         }
1526 
1527         os->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
1528         txg_wait_synced(dmu_objset_pool(os), 0);
1529         return (0);
1530 }
1531 
1532 void
1533 dmu_objset_space(objset_t *os, uint64_t *refdbytesp, uint64_t *availbytesp,
1534     uint64_t *usedobjsp, uint64_t *availobjsp)
1535 {
1536         dsl_dataset_space(os->os_dsl_dataset, refdbytesp, availbytesp,
1537             usedobjsp, availobjsp);
1538 }
1539 
1540 uint64_t
1541 dmu_objset_fsid_guid(objset_t *os)
1542 {
1543         return (dsl_dataset_fsid_guid(os->os_dsl_dataset));
1544 }
1545 
1546 void
1547 dmu_objset_fast_stat(objset_t *os, dmu_objset_stats_t *stat)
1548 {
1549         stat->dds_type = os->os_phys->os_type;
1550         if (os->os_dsl_dataset)
1551                 dsl_dataset_fast_stat(os->os_dsl_dataset, stat);
1552 }
1553 
1554 void
1555 dmu_objset_stats(objset_t *os, nvlist_t *nv)
1556 {
1557         ASSERT(os->os_dsl_dataset ||
1558             os->os_phys->os_type == DMU_OST_META);
1559 
1560         if (os->os_dsl_dataset != NULL)
1561                 dsl_dataset_stats(os->os_dsl_dataset, nv);
1562 
1563         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_TYPE,
1564             os->os_phys->os_type);
1565         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERACCOUNTING,
1566             dmu_objset_userspace_present(os));
1567 }
1568 
1569 int
1570 dmu_objset_is_snapshot(objset_t *os)
1571 {
1572         if (os->os_dsl_dataset != NULL)
1573                 return (dsl_dataset_is_snapshot(os->os_dsl_dataset));
1574         else
1575                 return (B_FALSE);
1576 }
1577 
1578 int
1579 dmu_snapshot_realname(objset_t *os, char *name, char *real, int maxlen,
1580     boolean_t *conflict)
1581 {
1582         dsl_dataset_t *ds = os->os_dsl_dataset;
1583         uint64_t ignored;
1584 
1585         if (ds->ds_phys->ds_snapnames_zapobj == 0)
1586                 return (ENOENT);
1587 
1588         return (zap_lookup_norm(ds->ds_dir->dd_pool->dp_meta_objset,
1589             ds->ds_phys->ds_snapnames_zapobj, name, 8, 1, &ignored, MT_FIRST,
1590             real, maxlen, conflict));
1591 }
1592 
1593 int
1594 dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
1595     uint64_t *idp, uint64_t *offp, boolean_t *case_conflict)
1596 {
1597         dsl_dataset_t *ds = os->os_dsl_dataset;
1598         zap_cursor_t cursor;
1599         zap_attribute_t attr;
1600 
1601         if (ds->ds_phys->ds_snapnames_zapobj == 0)
1602                 return (ENOENT);
1603 
1604         zap_cursor_init_serialized(&cursor,
1605             ds->ds_dir->dd_pool->dp_meta_objset,
1606             ds->ds_phys->ds_snapnames_zapobj, *offp);
1607 
1608         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1609                 zap_cursor_fini(&cursor);
1610                 return (ENOENT);
1611         }
1612 
1613         if (strlen(attr.za_name) + 1 > namelen) {
1614                 zap_cursor_fini(&cursor);
1615                 return (ENAMETOOLONG);
1616         }
1617 
1618         (void) strcpy(name, attr.za_name);
1619         if (idp)
1620                 *idp = attr.za_first_integer;
1621         if (case_conflict)
1622                 *case_conflict = attr.za_normalization_conflict;
1623         zap_cursor_advance(&cursor);
1624         *offp = zap_cursor_serialize(&cursor);
1625         zap_cursor_fini(&cursor);
1626 
1627         return (0);
1628 }
1629 
1630 int
1631 dmu_dir_list_next(objset_t *os, int namelen, char *name,
1632     uint64_t *idp, uint64_t *offp)
1633 {
1634         dsl_dir_t *dd = os->os_dsl_dataset->ds_dir;
1635         zap_cursor_t cursor;
1636         zap_attribute_t attr;
1637 
1638         /* there is no next dir on a snapshot! */
1639         if (os->os_dsl_dataset->ds_object !=
1640             dd->dd_phys->dd_head_dataset_obj)
1641                 return (ENOENT);
1642 
1643         zap_cursor_init_serialized(&cursor,
1644             dd->dd_pool->dp_meta_objset,
1645             dd->dd_phys->dd_child_dir_zapobj, *offp);
1646 
1647         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1648                 zap_cursor_fini(&cursor);
1649                 return (ENOENT);
1650         }
1651 
1652         if (strlen(attr.za_name) + 1 > namelen) {
1653                 zap_cursor_fini(&cursor);
1654                 return (ENAMETOOLONG);
1655         }
1656 
1657         (void) strcpy(name, attr.za_name);
1658         if (idp)
1659                 *idp = attr.za_first_integer;
1660         zap_cursor_advance(&cursor);
1661         *offp = zap_cursor_serialize(&cursor);
1662         zap_cursor_fini(&cursor);
1663 
1664         return (0);
1665 }
1666 
/*
 * Adapter state that lets dmu_objset_find() run a name-only callback
 * through dmu_objset_find_spa(), whose callbacks also receive the spa and
 * the dataset object number.
 */
struct findarg {
        /* caller's callback, invoked as func(dataset name, arg) */
        int (*func)(const char *dsname, void *cbarg);
        /* opaque argument handed to func unchanged */
        void *arg;
};
1671 
1672 /* ARGSUSED */
1673 static int
1674 findfunc(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
1675 {
1676         struct findarg *fa = arg;
1677         return (fa->func(dsname, fa->arg));
1678 }
1679 
1680 /*
1681  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1682  * Perhaps change all callers to use dmu_objset_find_spa()?
1683  */
1684 int
1685 dmu_objset_find(char *name, int func(const char *, void *), void *arg,
1686     int flags)
1687 {
1688         struct findarg fa;
1689         fa.func = func;
1690         fa.arg = arg;
1691         return (dmu_objset_find_spa(NULL, name, findfunc, &fa, flags));
1692 }
1693 
1694 /*
1695  * Find all objsets under name, call func on each
1696  */
1697 int
1698 dmu_objset_find_spa(spa_t *spa, const char *name,
1699     int func(spa_t *, uint64_t, const char *, void *), void *arg, int flags)
1700 {
1701         dsl_dir_t *dd;
1702         dsl_pool_t *dp;
1703         dsl_dataset_t *ds;
1704         zap_cursor_t zc;
1705         zap_attribute_t *attr;
1706         char *child;
1707         uint64_t thisobj;
1708         int err;
1709 
1710         if (name == NULL)
1711                 name = spa_name(spa);
1712         err = dsl_dir_open_spa(spa, name, FTAG, &dd, NULL);
1713         if (err)
1714                 return (err);
1715 
1716         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1717         if (dd->dd_myname[0] == '$') {
1718                 dsl_dir_close(dd, FTAG);
1719                 return (0);
1720         }
1721 
1722         thisobj = dd->dd_phys->dd_head_dataset_obj;
1723         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1724         dp = dd->dd_pool;
1725 
1726         /*
1727          * Iterate over all children.
1728          */
1729         if (flags & DS_FIND_CHILDREN) {
1730                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1731                     dd->dd_phys->dd_child_dir_zapobj);
1732                     zap_cursor_retrieve(&zc, attr) == 0;
1733                     (void) zap_cursor_advance(&zc)) {
1734                         ASSERT(attr->za_integer_length == sizeof (uint64_t));
1735                         ASSERT(attr->za_num_integers == 1);
1736 
1737                         child = kmem_asprintf("%s/%s", name, attr->za_name);
1738                         err = dmu_objset_find_spa(spa, child, func, arg, flags);
1739                         strfree(child);
1740                         if (err)
1741                                 break;
1742                 }
1743                 zap_cursor_fini(&zc);
1744 
1745                 if (err) {
1746                         dsl_dir_close(dd, FTAG);
1747                         kmem_free(attr, sizeof (zap_attribute_t));
1748                         return (err);
1749                 }
1750         }
1751 
1752         /*
1753          * Iterate over all snapshots.
1754          */
1755         if (flags & DS_FIND_SNAPSHOTS) {
1756                 if (!dsl_pool_sync_context(dp))
1757                         rw_enter(&dp->dp_config_rwlock, RW_READER);
1758                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1759                 if (!dsl_pool_sync_context(dp))
1760                         rw_exit(&dp->dp_config_rwlock);
1761 
1762                 if (err == 0) {
1763                         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
1764                         dsl_dataset_rele(ds, FTAG);
1765 
1766                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1767                             zap_cursor_retrieve(&zc, attr) == 0;
1768                             (void) zap_cursor_advance(&zc)) {
1769                                 ASSERT(attr->za_integer_length ==
1770                                     sizeof (uint64_t));
1771                                 ASSERT(attr->za_num_integers == 1);
1772 
1773                                 child = kmem_asprintf("%s@%s",
1774                                     name, attr->za_name);
1775                                 err = func(spa, attr->za_first_integer,
1776                                     child, arg);
1777                                 strfree(child);
1778                                 if (err)
1779                                         break;
1780                         }
1781                         zap_cursor_fini(&zc);
1782                 }
1783         }
1784 
1785         dsl_dir_close(dd, FTAG);
1786         kmem_free(attr, sizeof (zap_attribute_t));
1787 
1788         if (err)
1789                 return (err);
1790 
1791         /*
1792          * Apply to self if appropriate.
1793          */
1794         err = func(spa, thisobj, name, arg);
1795         return (err);
1796 }
1797 
1798 /* ARGSUSED */
1799 int
1800 dmu_objset_prefetch(const char *name, void *arg)
1801 {
1802         dsl_dataset_t *ds;
1803 
1804         if (dsl_dataset_hold(name, FTAG, &ds))
1805                 return (0);
1806 
1807         if (!BP_IS_HOLE(&ds->ds_phys->ds_bp)) {
1808                 mutex_enter(&ds->ds_opening_lock);
1809                 if (ds->ds_objset == NULL) {
1810                         uint32_t aflags = ARC_NOWAIT | ARC_PREFETCH;
1811                         zbookmark_t zb;
1812 
1813                         SET_BOOKMARK(&zb, ds->ds_object, ZB_ROOT_OBJECT,
1814                             ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
1815 
1816                         (void) dsl_read_nolock(NULL, dsl_dataset_get_spa(ds),
1817                             &ds->ds_phys->ds_bp, NULL, NULL,
1818                             ZIO_PRIORITY_ASYNC_READ,
1819                             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
1820                             &aflags, &zb);
1821                 }
1822                 mutex_exit(&ds->ds_opening_lock);
1823         }
1824 
1825         dsl_dataset_rele(ds, FTAG);
1826         return (0);
1827 }
1828 
1829 void
1830 dmu_objset_set_user(objset_t *os, void *user_ptr)
1831 {
1832         ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
1833         os->os_user_ptr = user_ptr;
1834 }
1835 
1836 void *
1837 dmu_objset_get_user(objset_t *os)
1838 {
1839         ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
1840         return (os->os_user_ptr);
1841 }