1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
  24  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  25  * Copyright (c) 2013, Joyent, Inc. All rights reserved.
  26  * Copyright (c) 2014, STRATO AG, Inc. All rights reserved.
  27  */
  28 
  29 /* Portions Copyright 2010 Robert Milkowski */
  30 
  31 #include <sys/cred.h>
  32 #include <sys/zfs_context.h>
  33 #include <sys/dmu_objset.h>
  34 #include <sys/dsl_dir.h>
  35 #include <sys/dsl_dataset.h>
  36 #include <sys/dsl_prop.h>
  37 #include <sys/dsl_pool.h>
  38 #include <sys/dsl_synctask.h>
  39 #include <sys/dsl_deleg.h>
  40 #include <sys/dnode.h>
  41 #include <sys/dbuf.h>
  42 #include <sys/zvol.h>
  43 #include <sys/dmu_tx.h>
  44 #include <sys/zap.h>
  45 #include <sys/zil.h>
  46 #include <sys/dmu_impl.h>
  47 #include <sys/zfs_ioctl.h>
  48 #include <sys/sa.h>
  49 #include <sys/zfs_onexit.h>
  50 #include <sys/dsl_destroy.h>
  51 #include <sys/vdev.h>
  52 
  53 /*
  54  * Needed to close a window in dnode_move() that allows the objset to be freed
  55  * before it can be safely accessed.
  56  */
  57 krwlock_t os_lock;
  58 
  59 void
  60 dmu_objset_init(void)
  61 {
  62         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  63 }
  64 
  65 void
  66 dmu_objset_fini(void)
  67 {
  68         rw_destroy(&os_lock);
  69 }
  70 
  71 spa_t *
  72 dmu_objset_spa(objset_t *os)
  73 {
  74         return (os->os_spa);
  75 }
  76 
  77 zilog_t *
  78 dmu_objset_zil(objset_t *os)
  79 {
  80         return (os->os_zil);
  81 }
  82 
  83 dsl_pool_t *
  84 dmu_objset_pool(objset_t *os)
  85 {
  86         dsl_dataset_t *ds;
  87 
  88         if ((ds = os->os_dsl_dataset) != NULL && ds->ds_dir)
  89                 return (ds->ds_dir->dd_pool);
  90         else
  91                 return (spa_get_dsl(os->os_spa));
  92 }
  93 
  94 dsl_dataset_t *
  95 dmu_objset_ds(objset_t *os)
  96 {
  97         return (os->os_dsl_dataset);
  98 }
  99 
 100 dmu_objset_type_t
 101 dmu_objset_type(objset_t *os)
 102 {
 103         return (os->os_phys->os_type);
 104 }
 105 
 106 void
 107 dmu_objset_name(objset_t *os, char *buf)
 108 {
 109         dsl_dataset_name(os->os_dsl_dataset, buf);
 110 }
 111 
 112 uint64_t
 113 dmu_objset_id(objset_t *os)
 114 {
 115         dsl_dataset_t *ds = os->os_dsl_dataset;
 116 
 117         return (ds ? ds->ds_object : 0);
 118 }
 119 
 120 zfs_sync_type_t
 121 dmu_objset_syncprop(objset_t *os)
 122 {
 123         return (os->os_sync);
 124 }
 125 
 126 zfs_logbias_op_t
 127 dmu_objset_logbias(objset_t *os)
 128 {
 129         return (os->os_logbias);
 130 }
 131 
 132 static void
 133 checksum_changed_cb(void *arg, uint64_t newval)
 134 {
 135         objset_t *os = arg;
 136 
 137         /*
 138          * Inheritance should have been done by now.
 139          */
 140         ASSERT(newval != ZIO_CHECKSUM_INHERIT);
 141 
 142         os->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
 143 }
 144 
 145 static void
 146 compression_changed_cb(void *arg, uint64_t newval)
 147 {
 148         objset_t *os = arg;
 149 
 150         /*
 151          * Inheritance and range checking should have been done by now.
 152          */
 153         ASSERT(newval != ZIO_COMPRESS_INHERIT);
 154 
 155         os->os_compress = zio_compress_select(newval, ZIO_COMPRESS_ON_VALUE);
 156 }
 157 
 158 static void
 159 copies_changed_cb(void *arg, uint64_t newval)
 160 {
 161         objset_t *os = arg;
 162 
 163         /*
 164          * Inheritance and range checking should have been done by now.
 165          */
 166         ASSERT(newval > 0);
 167         ASSERT(newval <= spa_max_replication(os->os_spa));
 168 
 169         os->os_copies = newval;
 170 }
 171 
 172 static void
 173 dedup_changed_cb(void *arg, uint64_t newval)
 174 {
 175         objset_t *os = arg;
 176         spa_t *spa = os->os_spa;
 177         enum zio_checksum checksum;
 178 
 179         /*
 180          * Inheritance should have been done by now.
 181          */
 182         ASSERT(newval != ZIO_CHECKSUM_INHERIT);
 183 
 184         checksum = zio_checksum_dedup_select(spa, newval, ZIO_CHECKSUM_OFF);
 185 
 186         os->os_dedup_checksum = checksum & ZIO_CHECKSUM_MASK;
 187         os->os_dedup_verify = !!(checksum & ZIO_CHECKSUM_VERIFY);
 188 }
 189 
 190 static void
 191 primary_cache_changed_cb(void *arg, uint64_t newval)
 192 {
 193         objset_t *os = arg;
 194 
 195         /*
 196          * Inheritance and range checking should have been done by now.
 197          */
 198         ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
 199             newval == ZFS_CACHE_METADATA);
 200 
 201         os->os_primary_cache = newval;
 202 }
 203 
 204 static void
 205 secondary_cache_changed_cb(void *arg, uint64_t newval)
 206 {
 207         objset_t *os = arg;
 208 
 209         /*
 210          * Inheritance and range checking should have been done by now.
 211          */
 212         ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
 213             newval == ZFS_CACHE_METADATA);
 214 
 215         os->os_secondary_cache = newval;
 216 }
 217 
 218 static void
 219 sync_changed_cb(void *arg, uint64_t newval)
 220 {
 221         objset_t *os = arg;
 222 
 223         /*
 224          * Inheritance and range checking should have been done by now.
 225          */
 226         ASSERT(newval == ZFS_SYNC_STANDARD || newval == ZFS_SYNC_ALWAYS ||
 227             newval == ZFS_SYNC_DISABLED);
 228 
 229         os->os_sync = newval;
 230         if (os->os_zil)
 231                 zil_set_sync(os->os_zil, newval);
 232 }
 233 
 234 static void
 235 redundant_metadata_changed_cb(void *arg, uint64_t newval)
 236 {
 237         objset_t *os = arg;
 238 
 239         /*
 240          * Inheritance and range checking should have been done by now.
 241          */
 242         ASSERT(newval == ZFS_REDUNDANT_METADATA_ALL ||
 243             newval == ZFS_REDUNDANT_METADATA_MOST);
 244 
 245         os->os_redundant_metadata = newval;
 246 }
 247 
 248 static void
 249 logbias_changed_cb(void *arg, uint64_t newval)
 250 {
 251         objset_t *os = arg;
 252 
 253         ASSERT(newval == ZFS_LOGBIAS_LATENCY ||
 254             newval == ZFS_LOGBIAS_THROUGHPUT);
 255         os->os_logbias = newval;
 256         if (os->os_zil)
 257                 zil_set_logbias(os->os_zil, newval);
 258 }
 259 
 260 void
 261 dmu_objset_byteswap(void *buf, size_t size)
 262 {
 263         objset_phys_t *osp = buf;
 264 
 265         ASSERT(size == OBJSET_OLD_PHYS_SIZE || size == sizeof (objset_phys_t));
 266         dnode_byteswap(&osp->os_meta_dnode);
 267         byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
 268         osp->os_type = BSWAP_64(osp->os_type);
 269         osp->os_flags = BSWAP_64(osp->os_flags);
 270         if (size == sizeof (objset_phys_t)) {
 271                 dnode_byteswap(&osp->os_userused_dnode);
 272                 dnode_byteswap(&osp->os_groupused_dnode);
 273         }
 274 }
 275 
 276 int
 277 dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
 278     objset_t **osp)
 279 {
 280         objset_t *os;
 281         int i, err;
 282 
 283         ASSERT(ds == NULL || MUTEX_HELD(&ds->ds_opening_lock));
 284 
 285         os = kmem_zalloc(sizeof (objset_t), KM_SLEEP);
 286         os->os_dsl_dataset = ds;
 287         os->os_spa = spa;
 288         os->os_rootbp = bp;
 289         if (!BP_IS_HOLE(os->os_rootbp)) {
 290                 uint32_t aflags = ARC_WAIT;
 291                 zbookmark_t zb;
 292                 SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
 293                     ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
 294 
 295                 if (DMU_OS_IS_L2CACHEABLE(os))
 296                         aflags |= ARC_L2CACHE;
 297                 if (DMU_OS_IS_L2COMPRESSIBLE(os))
 298                         aflags |= ARC_L2COMPRESS;
 299 
 300                 dprintf_bp(os->os_rootbp, "reading %s", "");
 301                 err = arc_read(NULL, spa, os->os_rootbp,
 302                     arc_getbuf_func, &os->os_phys_buf,
 303                     ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, &aflags, &zb);
 304                 if (err != 0) {
 305                         kmem_free(os, sizeof (objset_t));
 306                         /* convert checksum errors into IO errors */
 307                         if (err == ECKSUM)
 308                                 err = SET_ERROR(EIO);
 309                         return (err);
 310                 }
 311 
 312                 /* Increase the blocksize if we are permitted. */
 313                 if (spa_version(spa) >= SPA_VERSION_USERSPACE &&
 314                     arc_buf_size(os->os_phys_buf) < sizeof (objset_phys_t)) {
 315                         arc_buf_t *buf = arc_buf_alloc(spa,
 316                             sizeof (objset_phys_t), &os->os_phys_buf,
 317                             ARC_BUFC_METADATA);
 318                         bzero(buf->b_data, sizeof (objset_phys_t));
 319                         bcopy(os->os_phys_buf->b_data, buf->b_data,
 320                             arc_buf_size(os->os_phys_buf));
 321                         (void) arc_buf_remove_ref(os->os_phys_buf,
 322                             &os->os_phys_buf);
 323                         os->os_phys_buf = buf;
 324                 }
 325 
 326                 os->os_phys = os->os_phys_buf->b_data;
 327                 os->os_flags = os->os_phys->os_flags;
 328         } else {
 329                 int size = spa_version(spa) >= SPA_VERSION_USERSPACE ?
 330                     sizeof (objset_phys_t) : OBJSET_OLD_PHYS_SIZE;
 331                 os->os_phys_buf = arc_buf_alloc(spa, size,
 332                     &os->os_phys_buf, ARC_BUFC_METADATA);
 333                 os->os_phys = os->os_phys_buf->b_data;
 334                 bzero(os->os_phys, size);
 335         }
 336 
 337         /*
 338          * Note: the changed_cb will be called once before the register
 339          * func returns, thus changing the checksum/compression from the
 340          * default (fletcher2/off).  Snapshots don't need to know about
 341          * checksum/compression/copies.
 342          */
 343         if (ds != NULL) {
 344                 err = dsl_prop_register(ds,
 345                     zfs_prop_to_name(ZFS_PROP_PRIMARYCACHE),
 346                     primary_cache_changed_cb, os);
 347                 if (err == 0) {
 348                         err = dsl_prop_register(ds,
 349                             zfs_prop_to_name(ZFS_PROP_SECONDARYCACHE),
 350                             secondary_cache_changed_cb, os);
 351                 }
 352                 if (!dsl_dataset_is_snapshot(ds)) {
 353                         if (err == 0) {
 354                                 err = dsl_prop_register(ds,
 355                                     zfs_prop_to_name(ZFS_PROP_CHECKSUM),
 356                                     checksum_changed_cb, os);
 357                         }
 358                         if (err == 0) {
 359                                 err = dsl_prop_register(ds,
 360                                     zfs_prop_to_name(ZFS_PROP_COMPRESSION),
 361                                     compression_changed_cb, os);
 362                         }
 363                         if (err == 0) {
 364                                 err = dsl_prop_register(ds,
 365                                     zfs_prop_to_name(ZFS_PROP_COPIES),
 366                                     copies_changed_cb, os);
 367                         }
 368                         if (err == 0) {
 369                                 err = dsl_prop_register(ds,
 370                                     zfs_prop_to_name(ZFS_PROP_DEDUP),
 371                                     dedup_changed_cb, os);
 372                         }
 373                         if (err == 0) {
 374                                 err = dsl_prop_register(ds,
 375                                     zfs_prop_to_name(ZFS_PROP_LOGBIAS),
 376                                     logbias_changed_cb, os);
 377                         }
 378                         if (err == 0) {
 379                                 err = dsl_prop_register(ds,
 380                                     zfs_prop_to_name(ZFS_PROP_SYNC),
 381                                     sync_changed_cb, os);
 382                         }
 383                         if (err == 0) {
 384                                 err = dsl_prop_register(ds,
 385                                     zfs_prop_to_name(
 386                                     ZFS_PROP_REDUNDANT_METADATA),
 387                                     redundant_metadata_changed_cb, os);
 388                         }
 389                 }
 390                 if (err != 0) {
 391                         VERIFY(arc_buf_remove_ref(os->os_phys_buf,
 392                             &os->os_phys_buf));
 393                         kmem_free(os, sizeof (objset_t));
 394                         return (err);
 395                 }
 396         } else {
 397                 /* It's the meta-objset. */
 398                 os->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
 399                 os->os_compress = ZIO_COMPRESS_LZJB;
 400                 os->os_copies = spa_max_replication(spa);
 401                 os->os_dedup_checksum = ZIO_CHECKSUM_OFF;
 402                 os->os_dedup_verify = B_FALSE;
 403                 os->os_logbias = ZFS_LOGBIAS_LATENCY;
 404                 os->os_sync = ZFS_SYNC_STANDARD;
 405                 os->os_primary_cache = ZFS_CACHE_ALL;
 406                 os->os_secondary_cache = ZFS_CACHE_ALL;
 407         }
 408 
 409         if (ds == NULL || !dsl_dataset_is_snapshot(ds))
 410                 os->os_zil_header = os->os_phys->os_zil_header;
 411         os->os_zil = zil_alloc(os, &os->os_zil_header);
 412 
 413         for (i = 0; i < TXG_SIZE; i++) {
 414                 list_create(&os->os_dirty_dnodes[i], sizeof (dnode_t),
 415                     offsetof(dnode_t, dn_dirty_link[i]));
 416                 list_create(&os->os_free_dnodes[i], sizeof (dnode_t),
 417                     offsetof(dnode_t, dn_dirty_link[i]));
 418         }
 419         list_create(&os->os_dnodes, sizeof (dnode_t),
 420             offsetof(dnode_t, dn_link));
 421         list_create(&os->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
 422             offsetof(dmu_buf_impl_t, db_link));
 423 
 424         mutex_init(&os->os_lock, NULL, MUTEX_DEFAULT, NULL);
 425         mutex_init(&os->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
 426         mutex_init(&os->os_user_ptr_lock, NULL, MUTEX_DEFAULT, NULL);
 427 
 428         DMU_META_DNODE(os) = dnode_special_open(os,
 429             &os->os_phys->os_meta_dnode, DMU_META_DNODE_OBJECT,
 430             &os->os_meta_dnode);
 431         if (arc_buf_size(os->os_phys_buf) >= sizeof (objset_phys_t)) {
 432                 DMU_USERUSED_DNODE(os) = dnode_special_open(os,
 433                     &os->os_phys->os_userused_dnode, DMU_USERUSED_OBJECT,
 434                     &os->os_userused_dnode);
 435                 DMU_GROUPUSED_DNODE(os) = dnode_special_open(os,
 436                     &os->os_phys->os_groupused_dnode, DMU_GROUPUSED_OBJECT,
 437                     &os->os_groupused_dnode);
 438         }
 439 
 440         *osp = os;
 441         return (0);
 442 }
 443 
 444 int
 445 dmu_objset_from_ds(dsl_dataset_t *ds, objset_t **osp)
 446 {
 447         int err = 0;
 448 
 449         mutex_enter(&ds->ds_opening_lock);
 450         if (ds->ds_objset == NULL) {
 451                 objset_t *os;
 452                 err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
 453                     ds, dsl_dataset_get_blkptr(ds), &os);
 454 
 455                 if (err == 0) {
 456                         mutex_enter(&ds->ds_lock);
 457                         ASSERT(ds->ds_objset == NULL);
 458                         ds->ds_objset = os;
 459                         mutex_exit(&ds->ds_lock);
 460                 }
 461         }
 462         *osp = ds->ds_objset;
 463         mutex_exit(&ds->ds_opening_lock);
 464         return (err);
 465 }
 466 
 467 /*
 468  * Holds the pool while the objset is held.  Therefore only one objset
 469  * can be held at a time.
 470  */
 471 int
 472 dmu_objset_hold(const char *name, void *tag, objset_t **osp)
 473 {
 474         dsl_pool_t *dp;
 475         dsl_dataset_t *ds;
 476         int err;
 477 
 478         err = dsl_pool_hold(name, tag, &dp);
 479         if (err != 0)
 480                 return (err);
 481         err = dsl_dataset_hold(dp, name, tag, &ds);
 482         if (err != 0) {
 483                 dsl_pool_rele(dp, tag);
 484                 return (err);
 485         }
 486 
 487         err = dmu_objset_from_ds(ds, osp);
 488         if (err != 0) {
 489                 dsl_dataset_rele(ds, tag);
 490                 dsl_pool_rele(dp, tag);
 491         }
 492 
 493         return (err);
 494 }
 495 
 496 static int
 497 dmu_objset_own_common(dsl_dataset_t *ds, dmu_objset_type_t type,
 498     boolean_t readonly, void *tag, objset_t **osp)
 499 {
 500         int err;
 501 
 502         err = dmu_objset_from_ds(ds, osp);
 503         if (err != 0) {
 504                 dsl_dataset_disown(ds, tag);
 505         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
 506                 dsl_dataset_disown(ds, tag);
 507                 return (SET_ERROR(EINVAL));
 508         } else if (!readonly && dsl_dataset_is_snapshot(ds)) {
 509                 dsl_dataset_disown(ds, tag);
 510                 return (SET_ERROR(EROFS));
 511         }
 512         return (err);
 513 }
 514 
 515 /*
 516  * dsl_pool must not be held when this is called.
 517  * Upon successful return, there will be a longhold on the dataset,
 518  * and the dsl_pool will not be held.
 519  */
 520 int
 521 dmu_objset_own(const char *name, dmu_objset_type_t type,
 522     boolean_t readonly, void *tag, objset_t **osp)
 523 {
 524         dsl_pool_t *dp;
 525         dsl_dataset_t *ds;
 526         int err;
 527 
 528         err = dsl_pool_hold(name, FTAG, &dp);
 529         if (err != 0)
 530                 return (err);
 531         err = dsl_dataset_own(dp, name, tag, &ds);
 532         if (err != 0) {
 533                 dsl_pool_rele(dp, FTAG);
 534                 return (err);
 535         }
 536         err = dmu_objset_own_common(ds, type, readonly, tag, osp);
 537         dsl_pool_rele(dp, FTAG);
 538 
 539         return (err);
 540 }
 541 
 542 int
 543 dmu_objset_own_obj(dsl_pool_t *dp, uint64_t obj, dmu_objset_type_t type,
 544     boolean_t readonly, void *tag, objset_t **osp)
 545 {
 546         dsl_dataset_t *ds;
 547         int err;
 548 
 549         err = dsl_dataset_own_obj(dp, obj, tag, &ds);
 550         if (err != 0)
 551                 return (err);
 552 
 553         return (dmu_objset_own_common(ds, type, readonly, tag, osp));
 554 }
 555 
 556 void
 557 dmu_objset_rele(objset_t *os, void *tag)
 558 {
 559         dsl_pool_t *dp = dmu_objset_pool(os);
 560         dsl_dataset_rele(os->os_dsl_dataset, tag);
 561         dsl_pool_rele(dp, tag);
 562 }
 563 
 564 void
 565 dmu_objset_rele_obj(objset_t *os, void *tag)
 566 {
 567         dsl_dataset_rele(os->os_dsl_dataset, tag);
 568 }
 569 
 570 /*
 571  * When we are called, os MUST refer to an objset associated with a dataset
 572  * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 573  * == tag.  We will then release and reacquire ownership of the dataset while
 574  * holding the pool config_rwlock to avoid intervening namespace or ownership
 575  * changes may occur.
 576  *
 577  * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 578  * release the hold on its dataset and acquire a new one on the dataset of the
 579  * same name so that it can be partially torn down and reconstructed.
 580  */
 581 void
 582 dmu_objset_refresh_ownership(objset_t *os, void *tag)
 583 {
 584         dsl_pool_t *dp;
 585         dsl_dataset_t *ds, *newds;
 586         char name[MAXNAMELEN];
 587 
 588         ds = os->os_dsl_dataset;
 589         VERIFY3P(ds, !=, NULL);
 590         VERIFY3P(ds->ds_owner, ==, tag);
 591         VERIFY(dsl_dataset_long_held(ds));
 592 
 593         dsl_dataset_name(ds, name);
 594         dp = dmu_objset_pool(os);
 595         dsl_pool_config_enter(dp, FTAG);
 596         dmu_objset_disown(os, tag);
 597         VERIFY0(dsl_dataset_own(dp, name, tag, &newds));
 598         VERIFY3P(newds, ==, os->os_dsl_dataset);
 599         dsl_pool_config_exit(dp, FTAG);
 600 }
 601 
 602 void
 603 dmu_objset_disown(objset_t *os, void *tag)
 604 {
 605         dsl_dataset_disown(os->os_dsl_dataset, tag);
 606 }
 607 
 608 void
 609 dmu_objset_evict_dbufs(objset_t *os)
 610 {
 611         dnode_t *dn;
 612 
 613         mutex_enter(&os->os_lock);
 614 
 615         /* process the mdn last, since the other dnodes have holds on it */
 616         list_remove(&os->os_dnodes, DMU_META_DNODE(os));
 617         list_insert_tail(&os->os_dnodes, DMU_META_DNODE(os));
 618 
 619         /*
 620          * Find the first dnode with holds.  We have to do this dance
 621          * because dnode_add_ref() only works if you already have a
 622          * hold.  If there are no holds then it has no dbufs so OK to
 623          * skip.
 624          */
 625         for (dn = list_head(&os->os_dnodes);
 626             dn && !dnode_add_ref(dn, FTAG);
 627             dn = list_next(&os->os_dnodes, dn))
 628                 continue;
 629 
 630         while (dn) {
 631                 dnode_t *next_dn = dn;
 632 
 633                 do {
 634                         next_dn = list_next(&os->os_dnodes, next_dn);
 635                 } while (next_dn && !dnode_add_ref(next_dn, FTAG));
 636 
 637                 mutex_exit(&os->os_lock);
 638                 dnode_evict_dbufs(dn);
 639                 dnode_rele(dn, FTAG);
 640                 mutex_enter(&os->os_lock);
 641                 dn = next_dn;
 642         }
 643         mutex_exit(&os->os_lock);
 644 }
 645 
 646 void
 647 dmu_objset_evict(objset_t *os)
 648 {
 649         dsl_dataset_t *ds = os->os_dsl_dataset;
 650 
 651         for (int t = 0; t < TXG_SIZE; t++)
 652                 ASSERT(!dmu_objset_is_dirty(os, t));
 653 
 654         if (ds) {
 655                 if (!dsl_dataset_is_snapshot(ds)) {
 656                         VERIFY0(dsl_prop_unregister(ds,
 657                             zfs_prop_to_name(ZFS_PROP_CHECKSUM),
 658                             checksum_changed_cb, os));
 659                         VERIFY0(dsl_prop_unregister(ds,
 660                             zfs_prop_to_name(ZFS_PROP_COMPRESSION),
 661                             compression_changed_cb, os));
 662                         VERIFY0(dsl_prop_unregister(ds,
 663                             zfs_prop_to_name(ZFS_PROP_COPIES),
 664                             copies_changed_cb, os));
 665                         VERIFY0(dsl_prop_unregister(ds,
 666                             zfs_prop_to_name(ZFS_PROP_DEDUP),
 667                             dedup_changed_cb, os));
 668                         VERIFY0(dsl_prop_unregister(ds,
 669                             zfs_prop_to_name(ZFS_PROP_LOGBIAS),
 670                             logbias_changed_cb, os));
 671                         VERIFY0(dsl_prop_unregister(ds,
 672                             zfs_prop_to_name(ZFS_PROP_SYNC),
 673                             sync_changed_cb, os));
 674                         VERIFY0(dsl_prop_unregister(ds,
 675                             zfs_prop_to_name(ZFS_PROP_REDUNDANT_METADATA),
 676                             redundant_metadata_changed_cb, os));
 677                 }
 678                 VERIFY0(dsl_prop_unregister(ds,
 679                     zfs_prop_to_name(ZFS_PROP_PRIMARYCACHE),
 680                     primary_cache_changed_cb, os));
 681                 VERIFY0(dsl_prop_unregister(ds,
 682                     zfs_prop_to_name(ZFS_PROP_SECONDARYCACHE),
 683                     secondary_cache_changed_cb, os));
 684         }
 685 
 686         if (os->os_sa)
 687                 sa_tear_down(os);
 688 
 689         dmu_objset_evict_dbufs(os);
 690 
 691         dnode_special_close(&os->os_meta_dnode);
 692         if (DMU_USERUSED_DNODE(os)) {
 693                 dnode_special_close(&os->os_userused_dnode);
 694                 dnode_special_close(&os->os_groupused_dnode);
 695         }
 696         zil_free(os->os_zil);
 697 
 698         ASSERT3P(list_head(&os->os_dnodes), ==, NULL);
 699 
 700         VERIFY(arc_buf_remove_ref(os->os_phys_buf, &os->os_phys_buf));
 701 
 702         /*
 703          * This is a barrier to prevent the objset from going away in
 704          * dnode_move() until we can safely ensure that the objset is still in
 705          * use. We consider the objset valid before the barrier and invalid
 706          * after the barrier.
 707          */
 708         rw_enter(&os_lock, RW_READER);
 709         rw_exit(&os_lock);
 710 
 711         mutex_destroy(&os->os_lock);
 712         mutex_destroy(&os->os_obj_lock);
 713         mutex_destroy(&os->os_user_ptr_lock);
 714         kmem_free(os, sizeof (objset_t));
 715 }
 716 
 717 timestruc_t
 718 dmu_objset_snap_cmtime(objset_t *os)
 719 {
 720         return (dsl_dir_snap_cmtime(os->os_dsl_dataset->ds_dir));
 721 }
 722 
 723 /* called from dsl for meta-objset */
 724 objset_t *
 725 dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
 726     dmu_objset_type_t type, dmu_tx_t *tx)
 727 {
 728         objset_t *os;
 729         dnode_t *mdn;
 730 
 731         ASSERT(dmu_tx_is_syncing(tx));
 732 
 733         if (ds != NULL)
 734                 VERIFY0(dmu_objset_from_ds(ds, &os));
 735         else
 736                 VERIFY0(dmu_objset_open_impl(spa, NULL, bp, &os));
 737 
 738         mdn = DMU_META_DNODE(os);
 739 
 740         dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
 741             DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);
 742 
 743         /*
 744          * We don't want to have to increase the meta-dnode's nlevels
 745          * later, because then we could do it in quescing context while
 746          * we are also accessing it in open context.
 747          *
 748          * This precaution is not necessary for the MOS (ds == NULL),
 749          * because the MOS is only updated in syncing context.
 750          * This is most fortunate: the MOS is the only objset that
 751          * needs to be synced multiple times as spa_sync() iterates
 752          * to convergence, so minimizing its dn_nlevels matters.
 753          */
 754         if (ds != NULL) {
 755                 int levels = 1;
 756 
 757                 /*
 758                  * Determine the number of levels necessary for the meta-dnode
 759                  * to contain DN_MAX_OBJECT dnodes.
 760                  */
 761                 while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
 762                     (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
 763                     DN_MAX_OBJECT * sizeof (dnode_phys_t))
 764                         levels++;
 765 
 766                 mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
 767                     mdn->dn_nlevels = levels;
 768         }
 769 
 770         ASSERT(type != DMU_OST_NONE);
 771         ASSERT(type != DMU_OST_ANY);
 772         ASSERT(type < DMU_OST_NUMTYPES);
 773         os->os_phys->os_type = type;
 774         if (dmu_objset_userused_enabled(os)) {
 775                 os->os_phys->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
 776                 os->os_flags = os->os_phys->os_flags;
 777         }
 778 
 779         dsl_dataset_dirty(ds, tx);
 780 
 781         return (os);
 782 }
 783 
 784 typedef struct dmu_objset_create_arg {
 785         const char *doca_name;
 786         cred_t *doca_cred;
 787         void (*doca_userfunc)(objset_t *os, void *arg,
 788             cred_t *cr, dmu_tx_t *tx);
 789         void *doca_userarg;
 790         dmu_objset_type_t doca_type;
 791         uint64_t doca_flags;
 792 } dmu_objset_create_arg_t;
 793 
 794 /*ARGSUSED*/
 795 static int
 796 dmu_objset_create_check(void *arg, dmu_tx_t *tx)
 797 {
 798         dmu_objset_create_arg_t *doca = arg;
 799         dsl_pool_t *dp = dmu_tx_pool(tx);
 800         dsl_dir_t *pdd;
 801         const char *tail;
 802         int error;
 803 
 804         if (strchr(doca->doca_name, '@') != NULL)
 805                 return (SET_ERROR(EINVAL));
 806 
 807         error = dsl_dir_hold(dp, doca->doca_name, FTAG, &pdd, &tail);
 808         if (error != 0)
 809                 return (error);
 810         if (tail == NULL) {
 811                 dsl_dir_rele(pdd, FTAG);
 812                 return (SET_ERROR(EEXIST));
 813         }
 814         error = dsl_fs_ss_limit_check(pdd, 1, ZFS_PROP_FILESYSTEM_LIMIT, NULL,
 815             doca->doca_cred);
 816         dsl_dir_rele(pdd, FTAG);
 817 
 818         return (error);
 819 }
 820 
 821 static void
 822 dmu_objset_create_sync(void *arg, dmu_tx_t *tx)
 823 {
 824         dmu_objset_create_arg_t *doca = arg;
 825         dsl_pool_t *dp = dmu_tx_pool(tx);
 826         dsl_dir_t *pdd;
 827         const char *tail;
 828         dsl_dataset_t *ds;
 829         uint64_t obj;
 830         blkptr_t *bp;
 831         objset_t *os;
 832 
 833         VERIFY0(dsl_dir_hold(dp, doca->doca_name, FTAG, &pdd, &tail));
 834 
 835         obj = dsl_dataset_create_sync(pdd, tail, NULL, doca->doca_flags,
 836             doca->doca_cred, tx);
 837 
 838         VERIFY0(dsl_dataset_hold_obj(pdd->dd_pool, obj, FTAG, &ds));
 839         bp = dsl_dataset_get_blkptr(ds);
 840         os = dmu_objset_create_impl(pdd->dd_pool->dp_spa,
 841             ds, bp, doca->doca_type, tx);
 842 
 843         if (doca->doca_userfunc != NULL) {
 844                 doca->doca_userfunc(os, doca->doca_userarg,
 845                     doca->doca_cred, tx);
 846         }
 847 
 848         spa_history_log_internal_ds(ds, "create", tx, "");
 849         dsl_dataset_rele(ds, FTAG);
 850         dsl_dir_rele(pdd, FTAG);
 851 }
 852 
 853 int
 854 dmu_objset_create(const char *name, dmu_objset_type_t type, uint64_t flags,
 855     void (*func)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx), void *arg)
 856 {
 857         dmu_objset_create_arg_t doca;
 858 
 859         doca.doca_name = name;
 860         doca.doca_cred = CRED();
 861         doca.doca_flags = flags;
 862         doca.doca_userfunc = func;
 863         doca.doca_userarg = arg;
 864         doca.doca_type = type;
 865 
 866         return (dsl_sync_task(name,
 867             dmu_objset_create_check, dmu_objset_create_sync, &doca, 5));
 868 }
 869 
 870 typedef struct dmu_objset_clone_arg {
 871         const char *doca_clone;
 872         const char *doca_origin;
 873         cred_t *doca_cred;
 874 } dmu_objset_clone_arg_t;
 875 
 876 /*ARGSUSED*/
 877 static int
 878 dmu_objset_clone_check(void *arg, dmu_tx_t *tx)
 879 {
 880         dmu_objset_clone_arg_t *doca = arg;
 881         dsl_dir_t *pdd;
 882         const char *tail;
 883         int error;
 884         dsl_dataset_t *origin;
 885         dsl_pool_t *dp = dmu_tx_pool(tx);
 886 
 887         if (strchr(doca->doca_clone, '@') != NULL)
 888                 return (SET_ERROR(EINVAL));
 889 
 890         error = dsl_dir_hold(dp, doca->doca_clone, FTAG, &pdd, &tail);
 891         if (error != 0)
 892                 return (error);
 893         if (tail == NULL) {
 894                 dsl_dir_rele(pdd, FTAG);
 895                 return (SET_ERROR(EEXIST));
 896         }
 897         /* You can't clone across pools. */
 898         if (pdd->dd_pool != dp) {
 899                 dsl_dir_rele(pdd, FTAG);
 900                 return (SET_ERROR(EXDEV));
 901         }
 902         error = dsl_fs_ss_limit_check(pdd, 1, ZFS_PROP_FILESYSTEM_LIMIT, NULL,
 903             doca->doca_cred);
 904         if (error != 0) {
 905                 dsl_dir_rele(pdd, FTAG);
 906                 return (SET_ERROR(EDQUOT));
 907         }
 908         dsl_dir_rele(pdd, FTAG);
 909 
 910         error = dsl_dataset_hold(dp, doca->doca_origin, FTAG, &origin);
 911         if (error != 0)
 912                 return (error);
 913 
 914         /* You can't clone across pools. */
 915         if (origin->ds_dir->dd_pool != dp) {
 916                 dsl_dataset_rele(origin, FTAG);
 917                 return (SET_ERROR(EXDEV));
 918         }
 919 
 920         /* You can only clone snapshots, not the head datasets. */
 921         if (!dsl_dataset_is_snapshot(origin)) {
 922                 dsl_dataset_rele(origin, FTAG);
 923                 return (SET_ERROR(EINVAL));
 924         }
 925         dsl_dataset_rele(origin, FTAG);
 926 
 927         return (0);
 928 }
 929 
 930 static void
 931 dmu_objset_clone_sync(void *arg, dmu_tx_t *tx)
 932 {
 933         dmu_objset_clone_arg_t *doca = arg;
 934         dsl_pool_t *dp = dmu_tx_pool(tx);
 935         dsl_dir_t *pdd;
 936         const char *tail;
 937         dsl_dataset_t *origin, *ds;
 938         uint64_t obj;
 939         char namebuf[MAXNAMELEN];
 940 
 941         VERIFY0(dsl_dir_hold(dp, doca->doca_clone, FTAG, &pdd, &tail));
 942         VERIFY0(dsl_dataset_hold(dp, doca->doca_origin, FTAG, &origin));
 943 
 944         obj = dsl_dataset_create_sync(pdd, tail, origin, 0,
 945             doca->doca_cred, tx);
 946 
 947         VERIFY0(dsl_dataset_hold_obj(pdd->dd_pool, obj, FTAG, &ds));
 948         dsl_dataset_name(origin, namebuf);
 949         spa_history_log_internal_ds(ds, "clone", tx,
 950             "origin=%s (%llu)", namebuf, origin->ds_object);
 951         dsl_dataset_rele(ds, FTAG);
 952         dsl_dataset_rele(origin, FTAG);
 953         dsl_dir_rele(pdd, FTAG);
 954 }
 955 
 956 int
 957 dmu_objset_clone(const char *clone, const char *origin)
 958 {
 959         dmu_objset_clone_arg_t doca;
 960 
 961         doca.doca_clone = clone;
 962         doca.doca_origin = origin;
 963         doca.doca_cred = CRED();
 964 
 965         return (dsl_sync_task(clone,
 966             dmu_objset_clone_check, dmu_objset_clone_sync, &doca, 5));
 967 }
 968 
 969 int
 970 dmu_objset_snapshot_one(const char *fsname, const char *snapname)
 971 {
 972         int err;
 973         char *longsnap = kmem_asprintf("%s@%s", fsname, snapname);
 974         nvlist_t *snaps = fnvlist_alloc();
 975 
 976         fnvlist_add_boolean(snaps, longsnap);
 977         strfree(longsnap);
 978         err = dsl_dataset_snapshot(snaps, NULL, NULL);
 979         fnvlist_free(snaps);
 980         return (err);
 981 }
 982 
 983 static void
 984 dmu_objset_sync_dnodes(list_t *list, list_t *newlist, dmu_tx_t *tx)
 985 {
 986         dnode_t *dn;
 987 
 988         while (dn = list_head(list)) {
 989                 ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
 990                 ASSERT(dn->dn_dbuf->db_data_pending);
 991                 /*
 992                  * Initialize dn_zio outside dnode_sync() because the
 993                  * meta-dnode needs to set it ouside dnode_sync().
 994                  */
 995                 dn->dn_zio = dn->dn_dbuf->db_data_pending->dr_zio;
 996                 ASSERT(dn->dn_zio);
 997 
 998                 ASSERT3U(dn->dn_nlevels, <=, DN_MAX_LEVELS);
 999                 list_remove(list, dn);
1000 
1001                 if (newlist) {
1002                         (void) dnode_add_ref(dn, newlist);
1003                         list_insert_tail(newlist, dn);
1004                 }
1005 
1006                 dnode_sync(dn, tx);
1007         }
1008 }
1009 
1010 /* ARGSUSED */
1011 static void
1012 dmu_objset_write_ready(zio_t *zio, arc_buf_t *abuf, void *arg)
1013 {
1014         blkptr_t *bp = zio->io_bp;
1015         objset_t *os = arg;
1016         dnode_phys_t *dnp = &os->os_phys->os_meta_dnode;
1017 
1018         ASSERT(!BP_IS_EMBEDDED(bp));
1019         ASSERT3P(bp, ==, os->os_rootbp);
1020         ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_OBJSET);
1021         ASSERT0(BP_GET_LEVEL(bp));
1022 
1023         /*
1024          * Update rootbp fill count: it should be the number of objects
1025          * allocated in the object set (not counting the "special"
1026          * objects that are stored in the objset_phys_t -- the meta
1027          * dnode and user/group accounting objects).
1028          */
1029         bp->blk_fill = 0;
1030         for (int i = 0; i < dnp->dn_nblkptr; i++)
1031                 bp->blk_fill += BP_GET_FILL(&dnp->dn_blkptr[i]);
1032 }
1033 
1034 /* ARGSUSED */
1035 static void
1036 dmu_objset_write_done(zio_t *zio, arc_buf_t *abuf, void *arg)
1037 {
1038         blkptr_t *bp = zio->io_bp;
1039         blkptr_t *bp_orig = &zio->io_bp_orig;
1040         objset_t *os = arg;
1041 
1042         if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
1043                 ASSERT(BP_EQUAL(bp, bp_orig));
1044         } else {
1045                 dsl_dataset_t *ds = os->os_dsl_dataset;
1046                 dmu_tx_t *tx = os->os_synctx;
1047 
1048                 (void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
1049                 dsl_dataset_block_born(ds, bp, tx);
1050         }
1051 }
1052 
1053 /* called from dsl */
1054 void
1055 dmu_objset_sync(objset_t *os, zio_t *pio, dmu_tx_t *tx)
1056 {
1057         int txgoff;
1058         zbookmark_t zb;
1059         zio_prop_t zp;
1060         zio_t *zio;
1061         list_t *list;
1062         list_t *newlist = NULL;
1063         dbuf_dirty_record_t *dr;
1064 
1065         dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);
1066 
1067         ASSERT(dmu_tx_is_syncing(tx));
1068         /* XXX the write_done callback should really give us the tx... */
1069         os->os_synctx = tx;
1070 
1071         if (os->os_dsl_dataset == NULL) {
1072                 /*
1073                  * This is the MOS.  If we have upgraded,
1074                  * spa_max_replication() could change, so reset
1075                  * os_copies here.
1076                  */
1077                 os->os_copies = spa_max_replication(os->os_spa);
1078         }
1079 
1080         /*
1081          * Create the root block IO
1082          */
1083         SET_BOOKMARK(&zb, os->os_dsl_dataset ?
1084             os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
1085             ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
1086         arc_release(os->os_phys_buf, &os->os_phys_buf);
1087 
1088         dmu_write_policy(os, NULL, 0, 0, &zp);
1089 
1090         zio = arc_write(pio, os->os_spa, tx->tx_txg,
1091             os->os_rootbp, os->os_phys_buf, DMU_OS_IS_L2CACHEABLE(os),
1092             DMU_OS_IS_L2COMPRESSIBLE(os), &zp, dmu_objset_write_ready,
1093             NULL, dmu_objset_write_done, os, ZIO_PRIORITY_ASYNC_WRITE,
1094             ZIO_FLAG_MUSTSUCCEED, &zb);
1095 
1096         /*
1097          * Sync special dnodes - the parent IO for the sync is the root block
1098          */
1099         DMU_META_DNODE(os)->dn_zio = zio;
1100         dnode_sync(DMU_META_DNODE(os), tx);
1101 
1102         os->os_phys->os_flags = os->os_flags;
1103 
1104         if (DMU_USERUSED_DNODE(os) &&
1105             DMU_USERUSED_DNODE(os)->dn_type != DMU_OT_NONE) {
1106                 DMU_USERUSED_DNODE(os)->dn_zio = zio;
1107                 dnode_sync(DMU_USERUSED_DNODE(os), tx);
1108                 DMU_GROUPUSED_DNODE(os)->dn_zio = zio;
1109                 dnode_sync(DMU_GROUPUSED_DNODE(os), tx);
1110         }
1111 
1112         txgoff = tx->tx_txg & TXG_MASK;
1113 
1114         if (dmu_objset_userused_enabled(os)) {
1115                 newlist = &os->os_synced_dnodes;
1116                 /*
1117                  * We must create the list here because it uses the
1118                  * dn_dirty_link[] of this txg.
1119                  */
1120                 list_create(newlist, sizeof (dnode_t),
1121                     offsetof(dnode_t, dn_dirty_link[txgoff]));
1122         }
1123 
1124         dmu_objset_sync_dnodes(&os->os_free_dnodes[txgoff], newlist, tx);
1125         dmu_objset_sync_dnodes(&os->os_dirty_dnodes[txgoff], newlist, tx);
1126 
1127         list = &DMU_META_DNODE(os)->dn_dirty_records[txgoff];
1128         while (dr = list_head(list)) {
1129                 ASSERT0(dr->dr_dbuf->db_level);
1130                 list_remove(list, dr);
1131                 if (dr->dr_zio)
1132                         zio_nowait(dr->dr_zio);
1133         }
1134         /*
1135          * Free intent log blocks up to this tx.
1136          */
1137         zil_sync(os->os_zil, tx);
1138         os->os_phys->os_zil_header = os->os_zil_header;
1139         zio_nowait(zio);
1140 }
1141 
1142 boolean_t
1143 dmu_objset_is_dirty(objset_t *os, uint64_t txg)
1144 {
1145         return (!list_is_empty(&os->os_dirty_dnodes[txg & TXG_MASK]) ||
1146             !list_is_empty(&os->os_free_dnodes[txg & TXG_MASK]));
1147 }
1148 
1149 static objset_used_cb_t *used_cbs[DMU_OST_NUMTYPES];
1150 
1151 void
1152 dmu_objset_register_type(dmu_objset_type_t ost, objset_used_cb_t *cb)
1153 {
1154         used_cbs[ost] = cb;
1155 }
1156 
1157 boolean_t
1158 dmu_objset_userused_enabled(objset_t *os)
1159 {
1160         return (spa_version(os->os_spa) >= SPA_VERSION_USERSPACE &&
1161             used_cbs[os->os_phys->os_type] != NULL &&
1162             DMU_USERUSED_DNODE(os) != NULL);
1163 }
1164 
1165 static void
1166 do_userquota_update(objset_t *os, uint64_t used, uint64_t flags,
1167     uint64_t user, uint64_t group, boolean_t subtract, dmu_tx_t *tx)
1168 {
1169         if ((flags & DNODE_FLAG_USERUSED_ACCOUNTED)) {
1170                 int64_t delta = DNODE_SIZE + used;
1171                 if (subtract)
1172                         delta = -delta;
1173                 VERIFY3U(0, ==, zap_increment_int(os, DMU_USERUSED_OBJECT,
1174                     user, delta, tx));
1175                 VERIFY3U(0, ==, zap_increment_int(os, DMU_GROUPUSED_OBJECT,
1176                     group, delta, tx));
1177         }
1178 }
1179 
1180 void
1181 dmu_objset_do_userquota_updates(objset_t *os, dmu_tx_t *tx)
1182 {
1183         dnode_t *dn;
1184         list_t *list = &os->os_synced_dnodes;
1185 
1186         ASSERT(list_head(list) == NULL || dmu_objset_userused_enabled(os));
1187 
1188         while (dn = list_head(list)) {
1189                 int flags;
1190                 ASSERT(!DMU_OBJECT_IS_SPECIAL(dn->dn_object));
1191                 ASSERT(dn->dn_phys->dn_type == DMU_OT_NONE ||
1192                     dn->dn_phys->dn_flags &
1193                     DNODE_FLAG_USERUSED_ACCOUNTED);
1194 
1195                 /* Allocate the user/groupused objects if necessary. */
1196                 if (DMU_USERUSED_DNODE(os)->dn_type == DMU_OT_NONE) {
1197                         VERIFY(0 == zap_create_claim(os,
1198                             DMU_USERUSED_OBJECT,
1199                             DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
1200                         VERIFY(0 == zap_create_claim(os,
1201                             DMU_GROUPUSED_OBJECT,
1202                             DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
1203                 }
1204 
1205                 /*
1206                  * We intentionally modify the zap object even if the
1207                  * net delta is zero.  Otherwise
1208                  * the block of the zap obj could be shared between
1209                  * datasets but need to be different between them after
1210                  * a bprewrite.
1211                  */
1212 
1213                 flags = dn->dn_id_flags;
1214                 ASSERT(flags);
1215                 if (flags & DN_ID_OLD_EXIST)  {
1216                         do_userquota_update(os, dn->dn_oldused, dn->dn_oldflags,
1217                             dn->dn_olduid, dn->dn_oldgid, B_TRUE, tx);
1218                 }
1219                 if (flags & DN_ID_NEW_EXIST) {
1220                         do_userquota_update(os, DN_USED_BYTES(dn->dn_phys),
1221                             dn->dn_phys->dn_flags,  dn->dn_newuid,
1222                             dn->dn_newgid, B_FALSE, tx);
1223                 }
1224 
1225                 mutex_enter(&dn->dn_mtx);
1226                 dn->dn_oldused = 0;
1227                 dn->dn_oldflags = 0;
1228                 if (dn->dn_id_flags & DN_ID_NEW_EXIST) {
1229                         dn->dn_olduid = dn->dn_newuid;
1230                         dn->dn_oldgid = dn->dn_newgid;
1231                         dn->dn_id_flags |= DN_ID_OLD_EXIST;
1232                         if (dn->dn_bonuslen == 0)
1233                                 dn->dn_id_flags |= DN_ID_CHKED_SPILL;
1234                         else
1235                                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1236                 }
1237                 dn->dn_id_flags &= ~(DN_ID_NEW_EXIST);
1238                 mutex_exit(&dn->dn_mtx);
1239 
1240                 list_remove(list, dn);
1241                 dnode_rele(dn, list);
1242         }
1243 }
1244 
1245 /*
1246  * Returns a pointer to data to find uid/gid from
1247  *
1248  * If a dirty record for transaction group that is syncing can't
1249  * be found then NULL is returned.  In the NULL case it is assumed
1250  * the uid/gid aren't changing.
1251  */
1252 static void *
1253 dmu_objset_userquota_find_data(dmu_buf_impl_t *db, dmu_tx_t *tx)
1254 {
1255         dbuf_dirty_record_t *dr, **drp;
1256         void *data;
1257 
1258         if (db->db_dirtycnt == 0)
1259                 return (db->db.db_data);  /* Nothing is changing */
1260 
1261         for (drp = &db->db_last_dirty; (dr = *drp) != NULL; drp = &dr->dr_next)
1262                 if (dr->dr_txg == tx->tx_txg)
1263                         break;
1264 
1265         if (dr == NULL) {
1266                 data = NULL;
1267         } else {
1268                 dnode_t *dn;
1269 
1270                 DB_DNODE_ENTER(dr->dr_dbuf);
1271                 dn = DB_DNODE(dr->dr_dbuf);
1272 
1273                 if (dn->dn_bonuslen == 0 &&
1274                     dr->dr_dbuf->db_blkid == DMU_SPILL_BLKID)
1275                         data = dr->dt.dl.dr_data->b_data;
1276                 else
1277                         data = dr->dt.dl.dr_data;
1278 
1279                 DB_DNODE_EXIT(dr->dr_dbuf);
1280         }
1281 
1282         return (data);
1283 }
1284 
1285 void
1286 dmu_objset_userquota_get_ids(dnode_t *dn, boolean_t before, dmu_tx_t *tx)
1287 {
1288         objset_t *os = dn->dn_objset;
1289         void *data = NULL;
1290         dmu_buf_impl_t *db = NULL;
1291         uint64_t *user = NULL;
1292         uint64_t *group = NULL;
1293         int flags = dn->dn_id_flags;
1294         int error;
1295         boolean_t have_spill = B_FALSE;
1296 
1297         if (!dmu_objset_userused_enabled(dn->dn_objset))
1298                 return;
1299 
1300         if (before && (flags & (DN_ID_CHKED_BONUS|DN_ID_OLD_EXIST|
1301             DN_ID_CHKED_SPILL)))
1302                 return;
1303 
1304         if (before && dn->dn_bonuslen != 0)
1305                 data = DN_BONUS(dn->dn_phys);
1306         else if (!before && dn->dn_bonuslen != 0) {
1307                 if (dn->dn_bonus) {
1308                         db = dn->dn_bonus;
1309                         mutex_enter(&db->db_mtx);
1310                         data = dmu_objset_userquota_find_data(db, tx);
1311                 } else {
1312                         data = DN_BONUS(dn->dn_phys);
1313                 }
1314         } else if (dn->dn_bonuslen == 0 && dn->dn_bonustype == DMU_OT_SA) {
1315                         int rf = 0;
1316 
1317                         if (RW_WRITE_HELD(&dn->dn_struct_rwlock))
1318                                 rf |= DB_RF_HAVESTRUCT;
1319                         error = dmu_spill_hold_by_dnode(dn,
1320                             rf | DB_RF_MUST_SUCCEED,
1321                             FTAG, (dmu_buf_t **)&db);
1322                         ASSERT(error == 0);
1323                         mutex_enter(&db->db_mtx);
1324                         data = (before) ? db->db.db_data :
1325                             dmu_objset_userquota_find_data(db, tx);
1326                         have_spill = B_TRUE;
1327         } else {
1328                 mutex_enter(&dn->dn_mtx);
1329                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1330                 mutex_exit(&dn->dn_mtx);
1331                 return;
1332         }
1333 
1334         if (before) {
1335                 ASSERT(data);
1336                 user = &dn->dn_olduid;
1337                 group = &dn->dn_oldgid;
1338         } else if (data) {
1339                 user = &dn->dn_newuid;
1340                 group = &dn->dn_newgid;
1341         }
1342 
1343         /*
1344          * Must always call the callback in case the object
1345          * type has changed and that type isn't an object type to track
1346          */
1347         error = used_cbs[os->os_phys->os_type](dn->dn_bonustype, data,
1348             user, group);
1349 
1350         /*
1351          * Preserve existing uid/gid when the callback can't determine
1352          * what the new uid/gid are and the callback returned EEXIST.
1353          * The EEXIST error tells us to just use the existing uid/gid.
1354          * If we don't know what the old values are then just assign
1355          * them to 0, since that is a new file  being created.
1356          */
1357         if (!before && data == NULL && error == EEXIST) {
1358                 if (flags & DN_ID_OLD_EXIST) {
1359                         dn->dn_newuid = dn->dn_olduid;
1360                         dn->dn_newgid = dn->dn_oldgid;
1361                 } else {
1362                         dn->dn_newuid = 0;
1363                         dn->dn_newgid = 0;
1364                 }
1365                 error = 0;
1366         }
1367 
1368         if (db)
1369                 mutex_exit(&db->db_mtx);
1370 
1371         mutex_enter(&dn->dn_mtx);
1372         if (error == 0 && before)
1373                 dn->dn_id_flags |= DN_ID_OLD_EXIST;
1374         if (error == 0 && !before)
1375                 dn->dn_id_flags |= DN_ID_NEW_EXIST;
1376 
1377         if (have_spill) {
1378                 dn->dn_id_flags |= DN_ID_CHKED_SPILL;
1379         } else {
1380                 dn->dn_id_flags |= DN_ID_CHKED_BONUS;
1381         }
1382         mutex_exit(&dn->dn_mtx);
1383         if (have_spill)
1384                 dmu_buf_rele((dmu_buf_t *)db, FTAG);
1385 }
1386 
1387 boolean_t
1388 dmu_objset_userspace_present(objset_t *os)
1389 {
1390         return (os->os_phys->os_flags &
1391             OBJSET_FLAG_USERACCOUNTING_COMPLETE);
1392 }
1393 
1394 int
1395 dmu_objset_userspace_upgrade(objset_t *os)
1396 {
1397         uint64_t obj;
1398         int err = 0;
1399 
1400         if (dmu_objset_userspace_present(os))
1401                 return (0);
1402         if (!dmu_objset_userused_enabled(os))
1403                 return (SET_ERROR(ENOTSUP));
1404         if (dmu_objset_is_snapshot(os))
1405                 return (SET_ERROR(EINVAL));
1406 
1407         /*
1408          * We simply need to mark every object dirty, so that it will be
1409          * synced out and now accounted.  If this is called
1410          * concurrently, or if we already did some work before crashing,
1411          * that's fine, since we track each object's accounted state
1412          * independently.
1413          */
1414 
1415         for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE, 0)) {
1416                 dmu_tx_t *tx;
1417                 dmu_buf_t *db;
1418                 int objerr;
1419 
1420                 if (issig(JUSTLOOKING) && issig(FORREAL))
1421                         return (SET_ERROR(EINTR));
1422 
1423                 objerr = dmu_bonus_hold(os, obj, FTAG, &db);
1424                 if (objerr != 0)
1425                         continue;
1426                 tx = dmu_tx_create(os);
1427                 dmu_tx_hold_bonus(tx, obj);
1428                 objerr = dmu_tx_assign(tx, TXG_WAIT);
1429                 if (objerr != 0) {
1430                         dmu_tx_abort(tx);
1431                         continue;
1432                 }
1433                 dmu_buf_will_dirty(db, tx);
1434                 dmu_buf_rele(db, FTAG);
1435                 dmu_tx_commit(tx);
1436         }
1437 
1438         os->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
1439         txg_wait_synced(dmu_objset_pool(os), 0);
1440         return (0);
1441 }
1442 
1443 void
1444 dmu_objset_space(objset_t *os, uint64_t *refdbytesp, uint64_t *availbytesp,
1445     uint64_t *usedobjsp, uint64_t *availobjsp)
1446 {
1447         dsl_dataset_space(os->os_dsl_dataset, refdbytesp, availbytesp,
1448             usedobjsp, availobjsp);
1449 }
1450 
1451 uint64_t
1452 dmu_objset_fsid_guid(objset_t *os)
1453 {
1454         return (dsl_dataset_fsid_guid(os->os_dsl_dataset));
1455 }
1456 
1457 void
1458 dmu_objset_fast_stat(objset_t *os, dmu_objset_stats_t *stat)
1459 {
1460         stat->dds_type = os->os_phys->os_type;
1461         if (os->os_dsl_dataset)
1462                 dsl_dataset_fast_stat(os->os_dsl_dataset, stat);
1463 }
1464 
1465 void
1466 dmu_objset_stats(objset_t *os, nvlist_t *nv)
1467 {
1468         ASSERT(os->os_dsl_dataset ||
1469             os->os_phys->os_type == DMU_OST_META);
1470 
1471         if (os->os_dsl_dataset != NULL)
1472                 dsl_dataset_stats(os->os_dsl_dataset, nv);
1473 
1474         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_TYPE,
1475             os->os_phys->os_type);
1476         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERACCOUNTING,
1477             dmu_objset_userspace_present(os));
1478 }
1479 
1480 int
1481 dmu_objset_is_snapshot(objset_t *os)
1482 {
1483         if (os->os_dsl_dataset != NULL)
1484                 return (dsl_dataset_is_snapshot(os->os_dsl_dataset));
1485         else
1486                 return (B_FALSE);
1487 }
1488 
1489 int
1490 dmu_snapshot_realname(objset_t *os, char *name, char *real, int maxlen,
1491     boolean_t *conflict)
1492 {
1493         dsl_dataset_t *ds = os->os_dsl_dataset;
1494         uint64_t ignored;
1495 
1496         if (ds->ds_phys->ds_snapnames_zapobj == 0)
1497                 return (SET_ERROR(ENOENT));
1498 
1499         return (zap_lookup_norm(ds->ds_dir->dd_pool->dp_meta_objset,
1500             ds->ds_phys->ds_snapnames_zapobj, name, 8, 1, &ignored, MT_FIRST,
1501             real, maxlen, conflict));
1502 }
1503 
1504 int
1505 dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
1506     uint64_t *idp, uint64_t *offp, boolean_t *case_conflict)
1507 {
1508         dsl_dataset_t *ds = os->os_dsl_dataset;
1509         zap_cursor_t cursor;
1510         zap_attribute_t attr;
1511 
1512         ASSERT(dsl_pool_config_held(dmu_objset_pool(os)));
1513 
1514         if (ds->ds_phys->ds_snapnames_zapobj == 0)
1515                 return (SET_ERROR(ENOENT));
1516 
1517         zap_cursor_init_serialized(&cursor,
1518             ds->ds_dir->dd_pool->dp_meta_objset,
1519             ds->ds_phys->ds_snapnames_zapobj, *offp);
1520 
1521         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1522                 zap_cursor_fini(&cursor);
1523                 return (SET_ERROR(ENOENT));
1524         }
1525 
1526         if (strlen(attr.za_name) + 1 > namelen) {
1527                 zap_cursor_fini(&cursor);
1528                 return (SET_ERROR(ENAMETOOLONG));
1529         }
1530 
1531         (void) strcpy(name, attr.za_name);
1532         if (idp)
1533                 *idp = attr.za_first_integer;
1534         if (case_conflict)
1535                 *case_conflict = attr.za_normalization_conflict;
1536         zap_cursor_advance(&cursor);
1537         *offp = zap_cursor_serialize(&cursor);
1538         zap_cursor_fini(&cursor);
1539 
1540         return (0);
1541 }
1542 
1543 int
1544 dmu_dir_list_next(objset_t *os, int namelen, char *name,
1545     uint64_t *idp, uint64_t *offp)
1546 {
1547         dsl_dir_t *dd = os->os_dsl_dataset->ds_dir;
1548         zap_cursor_t cursor;
1549         zap_attribute_t attr;
1550 
1551         /* there is no next dir on a snapshot! */
1552         if (os->os_dsl_dataset->ds_object !=
1553             dd->dd_phys->dd_head_dataset_obj)
1554                 return (SET_ERROR(ENOENT));
1555 
1556         zap_cursor_init_serialized(&cursor,
1557             dd->dd_pool->dp_meta_objset,
1558             dd->dd_phys->dd_child_dir_zapobj, *offp);
1559 
1560         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1561                 zap_cursor_fini(&cursor);
1562                 return (SET_ERROR(ENOENT));
1563         }
1564 
1565         if (strlen(attr.za_name) + 1 > namelen) {
1566                 zap_cursor_fini(&cursor);
1567                 return (SET_ERROR(ENAMETOOLONG));
1568         }
1569 
1570         (void) strcpy(name, attr.za_name);
1571         if (idp)
1572                 *idp = attr.za_first_integer;
1573         zap_cursor_advance(&cursor);
1574         *offp = zap_cursor_serialize(&cursor);
1575         zap_cursor_fini(&cursor);
1576 
1577         return (0);
1578 }
1579 
1580 typedef struct dmu_objset_find_ctx {
1581         taskq_t         *dc_tq;
1582         dsl_pool_t      *dc_dp;
1583         uint64_t        dc_obj;
1584         int             (*dc_func)(dsl_pool_t *, dsl_dataset_t *, void *);
1585         void            *dc_arg;
1586         int             dc_flags;
1587         kmutex_t        *dc_error_lock;
1588         int             *dc_error;
1589 } dmu_objset_find_ctx_t;
1590 
1591 static void
1592 dmu_objset_find_dp_impl(void *arg)
1593 {
1594         dmu_objset_find_ctx_t *dcp = arg;
1595         dsl_pool_t *dp = dcp->dc_dp;
1596         dmu_objset_find_ctx_t *child_dcp;
1597         dsl_dir_t *dd;
1598         dsl_dataset_t *ds;
1599         zap_cursor_t zc;
1600         zap_attribute_t *attr;
1601         uint64_t thisobj;
1602         int err;
1603 
1604         dsl_pool_config_enter(dp, FTAG);
1605 
1606         /* don't process if there already was an error */
1607         if (*dcp->dc_error)
1608                 goto out;
1609 
1610         err = dsl_dir_hold_obj(dp, dcp->dc_obj, NULL, FTAG, &dd);
1611         if (err != 0)
1612                 goto fail;
1613 
1614         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1615         if (dd->dd_myname[0] == '$') {
1616                 dsl_dir_rele(dd, FTAG);
1617                 goto out;
1618         }
1619 
1620         thisobj = dd->dd_phys->dd_head_dataset_obj;
1621         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1622 
1623         /*
1624          * Iterate over all children.
1625          */
1626         if (dcp->dc_flags & DS_FIND_CHILDREN) {
1627                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1628                     dd->dd_phys->dd_child_dir_zapobj);
1629                     zap_cursor_retrieve(&zc, attr) == 0;
1630                     (void) zap_cursor_advance(&zc)) {
1631                         ASSERT3U(attr->za_integer_length, ==,
1632                             sizeof (uint64_t));
1633                         ASSERT3U(attr->za_num_integers, ==, 1);
1634 
1635                         child_dcp = kmem_alloc(sizeof(*child_dcp), KM_SLEEP);
1636                         *child_dcp = *dcp;
1637                         child_dcp->dc_obj = attr->za_first_integer;
1638                         taskq_dispatch(dcp->dc_tq, dmu_objset_find_dp_impl,
1639                             child_dcp, TQ_SLEEP);
1640                 }
1641                 zap_cursor_fini(&zc);
1642         }
1643 
1644         /*
1645          * Iterate over all snapshots.
1646          */
1647         if (dcp->dc_flags & DS_FIND_SNAPSHOTS) {
1648                 dsl_dataset_t *ds;
1649                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1650 
1651                 if (err == 0) {
1652                         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
1653                         dsl_dataset_rele(ds, FTAG);
1654 
1655                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1656                             zap_cursor_retrieve(&zc, attr) == 0;
1657                             (void) zap_cursor_advance(&zc)) {
1658                                 ASSERT3U(attr->za_integer_length, ==,
1659                                     sizeof (uint64_t));
1660                                 ASSERT3U(attr->za_num_integers, ==, 1);
1661 
1662                                 err = dsl_dataset_hold_obj(dp,
1663                                     attr->za_first_integer, FTAG, &ds);
1664                                 if (err != 0)
1665                                         break;
1666                                 err = dcp->dc_func(dp, ds, dcp->dc_arg);
1667                                 dsl_dataset_rele(ds, FTAG);
1668                                 if (err != 0)
1669                                         break;
1670                         }
1671                         zap_cursor_fini(&zc);
1672                 }
1673         }
1674 
1675         dsl_dir_rele(dd, FTAG);
1676         kmem_free(attr, sizeof (zap_attribute_t));
1677 
1678         if (err != 0)
1679                 goto fail;
1680 
1681         /*
1682          * Apply to self.
1683          */
1684         err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1685         if (err != 0)
1686                 goto fail;
1687         err = dcp->dc_func(dp, ds, dcp->dc_arg);
1688         dsl_dataset_rele(ds, FTAG);
1689 
1690 fail:
1691         if (err) {
1692                 mutex_enter(dcp->dc_error_lock);
1693                 /* only keep first error */
1694                 if (*dcp->dc_error == 0)
1695                         *dcp->dc_error = err;
1696                 mutex_exit(dcp->dc_error_lock);
1697         }
1698 
1699 out:
1700         dsl_pool_config_exit(dp, FTAG);
1701         kmem_free(dcp, sizeof(*dcp));
1702 }
1703 
1704 /*
1705  * Find objsets under and including ddobj, call func(ds) on each.
1706  * The order for the enumeration is completely undefined.
1707  * func is called with dsl_pool_config held.
1708  */
1709 int
1710 dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
1711     int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)
1712 {
1713         int error = 0;
1714         taskq_t *tq = NULL;
1715         int ntasks;
1716         dmu_objset_find_ctx_t *dcp;
1717         kmutex_t err_lock;
1718 
1719         ntasks = vdev_count_leaves(dp->dp_spa) * 4;
1720         tq = taskq_create("dmu_objset_find", ntasks, minclsyspri, ntasks,
1721             INT_MAX, 0);
1722         if (!tq)
1723                 return (SET_ERROR(ENOMEM));
1724 
1725         mutex_init(&err_lock, NULL, MUTEX_DEFAULT, NULL);
1726         dcp = kmem_alloc(sizeof(*dcp), KM_SLEEP);
1727         dcp->dc_tq = tq;
1728         dcp->dc_dp = dp;
1729         dcp->dc_obj = ddobj;
1730         dcp->dc_func = func;
1731         dcp->dc_arg = arg;
1732         dcp->dc_flags = flags;
1733         dcp->dc_error_lock = &err_lock;
1734         dcp->dc_error = &error;
1735         /* dcp and dc_name will be freed by task */
1736         taskq_dispatch(tq, dmu_objset_find_dp_impl, dcp, TQ_SLEEP);
1737 
1738         taskq_wait(tq);
1739         taskq_destroy(tq);
1740         mutex_destroy(&err_lock);
1741 
1742         return (error);
1743 }
1744 
1745 /*
1746  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1747  * The dp_config_rwlock must not be held when this is called, and it
1748  * will not be held when the callback is called.
1749  * Therefore this function should only be used when the pool is not changing
1750  * (e.g. in syncing context), or the callback can deal with the possible races.
1751  */
1752 static int
1753 dmu_objset_find_impl(spa_t *spa, const char *name,
1754     int func(const char *, void *), void *arg, int flags)
1755 {
1756         dsl_dir_t *dd;
1757         dsl_pool_t *dp = spa_get_dsl(spa);
1758         dsl_dataset_t *ds;
1759         zap_cursor_t zc;
1760         zap_attribute_t *attr;
1761         char *child;
1762         uint64_t thisobj;
1763         int err;
1764 
1765         dsl_pool_config_enter(dp, FTAG);
1766 
1767         err = dsl_dir_hold(dp, name, FTAG, &dd, NULL);
1768         if (err != 0) {
1769                 dsl_pool_config_exit(dp, FTAG);
1770                 return (err);
1771         }
1772 
1773         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1774         if (dd->dd_myname[0] == '$') {
1775                 dsl_dir_rele(dd, FTAG);
1776                 dsl_pool_config_exit(dp, FTAG);
1777                 return (0);
1778         }
1779 
1780         thisobj = dd->dd_phys->dd_head_dataset_obj;
1781         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1782 
1783         /*
1784          * Iterate over all children.
1785          */
1786         if (flags & DS_FIND_CHILDREN) {
1787                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1788                     dd->dd_phys->dd_child_dir_zapobj);
1789                     zap_cursor_retrieve(&zc, attr) == 0;
1790                     (void) zap_cursor_advance(&zc)) {
1791                         ASSERT3U(attr->za_integer_length, ==,
1792                             sizeof (uint64_t));
1793                         ASSERT3U(attr->za_num_integers, ==, 1);
1794 
1795                         child = kmem_asprintf("%s/%s", name, attr->za_name);
1796                         dsl_pool_config_exit(dp, FTAG);
1797                         err = dmu_objset_find_impl(spa, child,
1798                             func, arg, flags);
1799                         dsl_pool_config_enter(dp, FTAG);
1800                         strfree(child);
1801                         if (err != 0)
1802                                 break;
1803                 }
1804                 zap_cursor_fini(&zc);
1805 
1806                 if (err != 0) {
1807                         dsl_dir_rele(dd, FTAG);
1808                         dsl_pool_config_exit(dp, FTAG);
1809                         kmem_free(attr, sizeof (zap_attribute_t));
1810                         return (err);
1811                 }
1812         }
1813 
1814         /*
1815          * Iterate over all snapshots.
1816          */
1817         if (flags & DS_FIND_SNAPSHOTS) {
1818                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1819 
1820                 if (err == 0) {
1821                         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
1822                         dsl_dataset_rele(ds, FTAG);
1823 
1824                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1825                             zap_cursor_retrieve(&zc, attr) == 0;
1826                             (void) zap_cursor_advance(&zc)) {
1827                                 ASSERT3U(attr->za_integer_length, ==,
1828                                     sizeof (uint64_t));
1829                                 ASSERT3U(attr->za_num_integers, ==, 1);
1830 
1831                                 child = kmem_asprintf("%s@%s",
1832                                     name, attr->za_name);
1833                                 dsl_pool_config_exit(dp, FTAG);
1834                                 err = func(child, arg);
1835                                 dsl_pool_config_enter(dp, FTAG);
1836                                 strfree(child);
1837                                 if (err != 0)
1838                                         break;
1839                         }
1840                         zap_cursor_fini(&zc);
1841                 }
1842         }
1843 
1844         dsl_dir_rele(dd, FTAG);
1845         kmem_free(attr, sizeof (zap_attribute_t));
1846         dsl_pool_config_exit(dp, FTAG);
1847 
1848         if (err != 0)
1849                 return (err);
1850 
1851         /* Apply to self. */
1852         return (func(name, arg));
1853 }
1854 
1855 /*
1856  * See comment above dmu_objset_find_impl().
1857  */
1858 int
1859 dmu_objset_find(char *name, int func(const char *, void *), void *arg,
1860     int flags)
1861 {
1862         spa_t *spa;
1863         int error;
1864 
1865         error = spa_open(name, &spa, FTAG);
1866         if (error != 0)
1867                 return (error);
1868         error = dmu_objset_find_impl(spa, name, func, arg, flags);
1869         spa_close(spa, FTAG);
1870         return (error);
1871 }
1872 
1873 void
1874 dmu_objset_set_user(objset_t *os, void *user_ptr)
1875 {
1876         ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
1877         os->os_user_ptr = user_ptr;
1878 }
1879 
1880 void *
1881 dmu_objset_get_user(objset_t *os)
1882 {
1883         ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
1884         return (os->os_user_ptr);
1885 }
1886 
1887 /*
1888  * Determine name of filesystem, given name of snapshot.
1889  * buf must be at least MAXNAMELEN bytes
1890  */
1891 int
1892 dmu_fsname(const char *snapname, char *buf)
1893 {
1894         char *atp = strchr(snapname, '@');
1895         if (atp == NULL)
1896                 return (SET_ERROR(EINVAL));
1897         if (atp - snapname >= MAXNAMELEN)
1898                 return (SET_ERROR(ENAMETOOLONG));
1899         (void) strlcpy(buf, snapname, atp - snapname + 1);
1900         return (0);
1901 }