Print this page
5269 zfs: zpool import slow
While importing a pool all objsets are enumerated twice, once to check
the zil log chains and once to claim them. On pools with many datasets
this process might take a substantial amount of time.
Speed up the process by parallelizing it utilizing a taskq. The number
of parallel tasks is limited to 4 times the number of leaf vdevs.


   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
  24  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  25  * Copyright (c) 2013, Joyent, Inc. All rights reserved.

  26  */
  27 
  28 /* Portions Copyright 2010 Robert Milkowski */
  29 
  30 #include <sys/cred.h>
  31 #include <sys/zfs_context.h>
  32 #include <sys/dmu_objset.h>
  33 #include <sys/dsl_dir.h>
  34 #include <sys/dsl_dataset.h>
  35 #include <sys/dsl_prop.h>
  36 #include <sys/dsl_pool.h>
  37 #include <sys/dsl_synctask.h>
  38 #include <sys/dsl_deleg.h>
  39 #include <sys/dnode.h>
  40 #include <sys/dbuf.h>
  41 #include <sys/zvol.h>
  42 #include <sys/dmu_tx.h>
  43 #include <sys/zap.h>
  44 #include <sys/zil.h>
  45 #include <sys/dmu_impl.h>
  46 #include <sys/zfs_ioctl.h>
  47 #include <sys/sa.h>
  48 #include <sys/zfs_onexit.h>
  49 #include <sys/dsl_destroy.h>

  50 
  51 /*
  52  * Needed to close a window in dnode_move() that allows the objset to be freed
  53  * before it can be safely accessed.
  54  */
  55 krwlock_t os_lock;
  56 
  /* Initialize the global os_lock reader/writer lock. */
  57 void
  58 dmu_objset_init(void)
  59 {
  60         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  61 }
  62 
  /* Destroy the global os_lock; the counterpart of dmu_objset_init(). */
  63 void
  64 dmu_objset_fini(void)
  65 {
  66         rw_destroy(&os_lock);
  67 }
  68 
  69 spa_t *


 474         int err;
 475 
 476         err = dsl_pool_hold(name, tag, &dp);
 477         if (err != 0)
 478                 return (err);
 479         err = dsl_dataset_hold(dp, name, tag, &ds);
 480         if (err != 0) {
 481                 dsl_pool_rele(dp, tag);
 482                 return (err);
 483         }
 484 
 485         err = dmu_objset_from_ds(ds, osp);
 486         if (err != 0) {
 487                 dsl_dataset_rele(ds, tag);
 488                 dsl_pool_rele(dp, tag);
 489         }
 490 
 491         return (err);
 492 }
 493 



















 494 /*
 495  * dsl_pool must not be held when this is called.
 496  * Upon successful return, there will be a longhold on the dataset,
 497  * and the dsl_pool will not be held.
      *
      * Takes ownership of the dataset 'name' on behalf of 'tag' and stores its
      * objset in *osp.  Returns 0 on success.  Fails with EINVAL when 'type' is
      * not DMU_OST_ANY and differs from the objset's os_phys->os_type, and with
      * EROFS when the dataset is a snapshot and 'readonly' is false; errors
      * from the hold/own/lookup calls are passed through.  On every failure
      * after ownership was taken the dataset is disowned again.
 498  */
 499 int
 500 dmu_objset_own(const char *name, dmu_objset_type_t type,
 501     boolean_t readonly, void *tag, objset_t **osp)
 502 {
 503         dsl_pool_t *dp;
 504         dsl_dataset_t *ds;
 505         int err;
 506 
             /* The pool hold uses FTAG, i.e. it is private to this call. */
 507         err = dsl_pool_hold(name, FTAG, &dp);
 508         if (err != 0)
 509                 return (err);
 510         err = dsl_dataset_own(dp, name, tag, &ds);
 511         if (err != 0) {
 512                 dsl_pool_rele(dp, FTAG);
 513                 return (err);
 514         }
 515 
 516         err = dmu_objset_from_ds(ds, osp);
             /* The pool hold is dropped here for success and failure alike. */
 517         dsl_pool_rele(dp, FTAG);
 518         if (err != 0) {
 519                 dsl_dataset_disown(ds, tag);
 520         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
                     /* NOTE(review): *osp is not reset on this failure path. */
 521                 dsl_dataset_disown(ds, tag);
 522                 return (SET_ERROR(EINVAL));
 523         } else if (!readonly && dsl_dataset_is_snapshot(ds)) {
                     /* A non-readonly open of a snapshot is refused. */
 524                 dsl_dataset_disown(ds, tag);
 525                 return (SET_ERROR(EROFS));
 526         }




 527         return (err);


 528 }
 529 
 /*
  * Drop the holds that 'tag' has on os: the dataset hold first, then the
  * pool hold.
  */
 530 void
 531 dmu_objset_rele(objset_t *os, void *tag)
 532 {
             /* Look up the pool first; os is not used after the dataset rele. */
 533         dsl_pool_t *dp = dmu_objset_pool(os);
 534         dsl_dataset_rele(os->os_dsl_dataset, tag);
 535         dsl_pool_rele(dp, tag);
 536 }
 537 






 538 /*
 539  * When we are called, os MUST refer to an objset associated with a dataset
 540  * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 541  * == tag.  We will then release and reacquire ownership of the dataset while
 542  * holding the pool config_rwlock so that no intervening namespace or
 543  * ownership changes can occur.
 544  *
 545  * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 546  * release the hold on its dataset and acquire a new one on the dataset of the
 547  * same name so that it can be partially torn down and reconstructed.
 548  */
 549 void
 550 dmu_objset_refresh_ownership(objset_t *os, void *tag)
 551 {
 552         dsl_pool_t *dp;
 553         dsl_dataset_t *ds, *newds;
 554         char name[MAXNAMELEN];
 555 
 556         ds = os->os_dsl_dataset;
 557         VERIFY3P(ds, !=, NULL);


1528         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1529                 zap_cursor_fini(&cursor);
1530                 return (SET_ERROR(ENOENT));
1531         }
1532 
1533         if (strlen(attr.za_name) + 1 > namelen) {
1534                 zap_cursor_fini(&cursor);
1535                 return (SET_ERROR(ENAMETOOLONG));
1536         }
1537 
1538         (void) strcpy(name, attr.za_name);
1539         if (idp)
1540                 *idp = attr.za_first_integer;
1541         zap_cursor_advance(&cursor);
1542         *offp = zap_cursor_serialize(&cursor);
1543         zap_cursor_fini(&cursor);
1544 
1545         return (0);
1546 }
1547 
1548 /*
1549  * Find objsets under and including ddobj, call func(ds) on each.
      *
      * The pool config must be held by the caller (asserted below).  A dsl_dir
      * whose name begins with '$' is skipped and reported as success.  With
      * DS_FIND_CHILDREN every child directory is visited recursively first;
      * with DS_FIND_SNAPSHOTS func is then called on each entry of the head
      * dataset's snapshot-name ZAP; func is called on the head dataset itself
      * last.  The walk stops at the first non-zero error, which is returned;
      * 0 means every step succeeded.
1550  */
1551 int
1552 dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
1553     int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)







1554 {



1555         dsl_dir_t *dd;
1556         dsl_dataset_t *ds;
1557         zap_cursor_t zc;
1558         zap_attribute_t *attr;
1559         uint64_t thisobj;
1560         int err;
1561 
1562         ASSERT(dsl_pool_config_held(dp));




1563 
1564         err = dsl_dir_hold_obj(dp, ddobj, NULL, FTAG, &dd);
1565         if (err != 0)
1566                 return (err);
1567 
1568         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1569         if (dd->dd_myname[0] == '$') {
1570                 dsl_dir_rele(dd, FTAG);
1571                 return (0);
1572         }
1573 
1574         thisobj = dd->dd_phys->dd_head_dataset_obj;
             /* One heap-allocated attribute buffer is reused by both loops. */
1575         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1576 
1577         /*
1578          * Iterate over all children.
1579          */
1580         if (flags & DS_FIND_CHILDREN) {
1581                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1582                     dd->dd_phys->dd_child_dir_zapobj);
1583                     zap_cursor_retrieve(&zc, attr) == 0;
1584                     (void) zap_cursor_advance(&zc)) {
1585                         ASSERT3U(attr->za_integer_length, ==,
1586                             sizeof (uint64_t));
1587                         ASSERT3U(attr->za_num_integers, ==, 1);
1588 
                             /* Recurse into the child directory, same flags. */
1589                         err = dmu_objset_find_dp(dp, attr->za_first_integer,
1590                             func, arg, flags);
1591                         if (err != 0)
1592                                 break;

1593                 }
1594                 zap_cursor_fini(&zc);
1595 
                     /* A failed child aborts the walk; undo our own state. */
1596                 if (err != 0) {
1597                         dsl_dir_rele(dd, FTAG);
1598                         kmem_free(attr, sizeof (zap_attribute_t));
1599                         return (err);
1600                 }
1601         }
1602 
1603         /*
1604          * Iterate over all snapshots.
1605          */
1606         if (flags & DS_FIND_SNAPSHOTS) {
                     /* NOTE(review): shadows the function-scope 'ds'. */
1607                 dsl_dataset_t *ds;
1608                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1609 
                     /*
                      * If the head dataset cannot be held, err stays set and
                      * is returned after the cleanup below.
                      */
1610                 if (err == 0) {
                             /* Only the ZAP object number is needed from ds. */
1611                         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
1612                         dsl_dataset_rele(ds, FTAG);
1613 
1614                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1615                             zap_cursor_retrieve(&zc, attr) == 0;
1616                             (void) zap_cursor_advance(&zc)) {
1617                                 ASSERT3U(attr->za_integer_length, ==,
1618                                     sizeof (uint64_t));
1619                                 ASSERT3U(attr->za_num_integers, ==, 1);
1620 
1621                                 err = dsl_dataset_hold_obj(dp,
1622                                     attr->za_first_integer, FTAG, &ds);
1623                                 if (err != 0)
1624                                         break;
1625                                 err = func(dp, ds, arg);
1626                                 dsl_dataset_rele(ds, FTAG);
1627                                 if (err != 0)
1628                                         break;
1629                         }
1630                         zap_cursor_fini(&zc);
1631                 }
1632         }
1633 
1634         dsl_dir_rele(dd, FTAG);
1635         kmem_free(attr, sizeof (zap_attribute_t));
1636 
1637         if (err != 0)
1638                 return (err);
1639 
1640         /*
1641          * Apply to self.
1642          */
1643         err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1644         if (err != 0)
1645                 return (err);
1646         err = func(dp, ds, arg);
1647         dsl_dataset_rele(ds, FTAG);
1648         return (err);





















































1649 }
1650 
1651 /*
1652  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1653  * The dp_config_rwlock must not be held when this is called, and it
1654  * will not be held when the callback is called.
1655  * Therefore this function should only be used when the pool is not changing
1656  * (e.g. in syncing context), or the callback can deal with the possible races.
1657  */
1658 static int
1659 dmu_objset_find_impl(spa_t *spa, const char *name,
1660     int func(const char *, void *), void *arg, int flags)
1661 {
1662         dsl_dir_t *dd;
1663         dsl_pool_t *dp = spa_get_dsl(spa);
1664         dsl_dataset_t *ds;
1665         zap_cursor_t zc;
1666         zap_attribute_t *attr;
1667         char *child;
1668         uint64_t thisobj;




   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
  24  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  25  * Copyright (c) 2013, Joyent, Inc. All rights reserved.
  26  * Copyright (c) 2014, STRATO AG, Inc. All rights reserved.
  27  */
  28 
  29 /* Portions Copyright 2010 Robert Milkowski */
  30 
  31 #include <sys/cred.h>
  32 #include <sys/zfs_context.h>
  33 #include <sys/dmu_objset.h>
  34 #include <sys/dsl_dir.h>
  35 #include <sys/dsl_dataset.h>
  36 #include <sys/dsl_prop.h>
  37 #include <sys/dsl_pool.h>
  38 #include <sys/dsl_synctask.h>
  39 #include <sys/dsl_deleg.h>
  40 #include <sys/dnode.h>
  41 #include <sys/dbuf.h>
  42 #include <sys/zvol.h>
  43 #include <sys/dmu_tx.h>
  44 #include <sys/zap.h>
  45 #include <sys/zil.h>
  46 #include <sys/dmu_impl.h>
  47 #include <sys/zfs_ioctl.h>
  48 #include <sys/sa.h>
  49 #include <sys/zfs_onexit.h>
  50 #include <sys/dsl_destroy.h>
  51 #include <sys/vdev.h>
  52 
  53 /*
  54  * Needed to close a window in dnode_move() that allows the objset to be freed
  55  * before it can be safely accessed.
  56  */
  57 krwlock_t os_lock;
  58 
  /* Initialize the global os_lock reader/writer lock. */
  59 void
  60 dmu_objset_init(void)
  61 {
  62         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  63 }
  64 
  /* Destroy the global os_lock; the counterpart of dmu_objset_init(). */
  65 void
  66 dmu_objset_fini(void)
  67 {
  68         rw_destroy(&os_lock);
  69 }
  70 
  71 spa_t *


 476         int err;
 477 
 478         err = dsl_pool_hold(name, tag, &dp);
 479         if (err != 0)
 480                 return (err);
 481         err = dsl_dataset_hold(dp, name, tag, &ds);
 482         if (err != 0) {
 483                 dsl_pool_rele(dp, tag);
 484                 return (err);
 485         }
 486 
 487         err = dmu_objset_from_ds(ds, osp);
 488         if (err != 0) {
 489                 dsl_dataset_rele(ds, tag);
 490                 dsl_pool_rele(dp, tag);
 491         }
 492 
 493         return (err);
 494 }
 495 
 /*
  * Shared tail of dmu_objset_own() and dmu_objset_own_obj().
  *
  * 'ds' must already be owned by 'tag'.  Looks up the objset of ds into
  * *osp and validates the request: EINVAL if 'type' is not DMU_OST_ANY and
  * differs from the objset's os_phys->os_type, EROFS if ds is a snapshot
  * and 'readonly' is false.  On any failure ds is disowned before
  * returning; on success (0) ownership is kept.
  */
 496 static int
 497 dmu_objset_own_common(dsl_dataset_t *ds, dmu_objset_type_t type,
 498     boolean_t readonly, void *tag, objset_t **osp)
 499 {
 500         int err;
 501 
 502         err = dmu_objset_from_ds(ds, osp);
 503         if (err != 0) {
 504                 dsl_dataset_disown(ds, tag);
 505         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
                     /* NOTE(review): *osp is not reset on this failure path. */
 506                 dsl_dataset_disown(ds, tag);
 507                 return (SET_ERROR(EINVAL));
 508         } else if (!readonly && dsl_dataset_is_snapshot(ds)) {
                     /* A non-readonly open of a snapshot is refused. */
 509                 dsl_dataset_disown(ds, tag);
 510                 return (SET_ERROR(EROFS));
 511         }
 512         return (err);
 513 }
 514 
 515 /*
 516  * dsl_pool must not be held when this is called.
 517  * Upon successful return, there will be a longhold on the dataset,
 518  * and the dsl_pool will not be held.
      *
      * Takes ownership of the dataset 'name' on behalf of 'tag' and hands the
      * type/readonly validation to dmu_objset_own_common(), whose result is
      * returned.  Errors from dsl_pool_hold() and dsl_dataset_own() are
      * passed through.
 519  */
 520 int
 521 dmu_objset_own(const char *name, dmu_objset_type_t type,
 522     boolean_t readonly, void *tag, objset_t **osp)
 523 {
 524         dsl_pool_t *dp;
 525         dsl_dataset_t *ds;
 526         int err;
 527 
             /* The pool hold uses FTAG, i.e. it is private to this call. */
 528         err = dsl_pool_hold(name, FTAG, &dp);
 529         if (err != 0)
 530                 return (err);
 531         err = dsl_dataset_own(dp, name, tag, &ds);
 532         if (err != 0) {
 533                 dsl_pool_rele(dp, FTAG);
 534                 return (err);
 535         }
 536         err = dmu_objset_own_common(ds, type, readonly, tag, osp);

             /* Released whether or not the common helper succeeded. */
 537         dsl_pool_rele(dp, FTAG);
 538 
 539         return (err);
 540 }
 541 
 /*
  * Variant of dmu_objset_own() that names the dataset by object number
  * 'obj' within pool 'dp' instead of by name.  No pool hold is taken or
  * released here.
  * NOTE(review): presumably the caller must already hold the pool config;
  * confirm against the callers.
  *
  * Returns the error from dsl_dataset_own_obj(), or the result of
  * dmu_objset_own_common().
  */
 542 int
 543 dmu_objset_own_obj(dsl_pool_t *dp, uint64_t obj, dmu_objset_type_t type,
 544     boolean_t readonly, void *tag, objset_t **osp)
 545 {
 546         dsl_dataset_t *ds;
 547         int err;
 548 
 549         err = dsl_dataset_own_obj(dp, obj, tag, &ds);
 550         if (err != 0)
 551                 return (err);
 552 
 553         return (dmu_objset_own_common(ds, type, readonly, tag, osp));
 554 }
 555 
 /*
  * Drop the holds that 'tag' has on os: the dataset hold first, then the
  * pool hold.
  */
 556 void
 557 dmu_objset_rele(objset_t *os, void *tag)
 558 {
             /* Look up the pool first; os is not used after the dataset rele. */
 559         dsl_pool_t *dp = dmu_objset_pool(os);
 560         dsl_dataset_rele(os->os_dsl_dataset, tag);
 561         dsl_pool_rele(dp, tag);
 562 }
 563 
 /*
  * Drop only the dataset hold that 'tag' has on os; unlike
  * dmu_objset_rele() the pool is not released.
  * NOTE(review): this calls dsl_dataset_rele(), whereas dmu_objset_own_obj()
  * acquires with dsl_dataset_own_obj(); confirm which acquisition this is
  * meant to pair with.
  */
 564 void
 565 dmu_objset_rele_obj(objset_t *os, void *tag)
 566 {
 567         dsl_dataset_rele(os->os_dsl_dataset, tag);
 568 }
 569 
 570 /*
 571  * When we are called, os MUST refer to an objset associated with a dataset
 572  * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 573  * == tag.  We will then release and reacquire ownership of the dataset while
 574  * holding the pool config_rwlock so that no intervening namespace or
 575  * ownership changes can occur.
 576  *
 577  * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 578  * release the hold on its dataset and acquire a new one on the dataset of the
 579  * same name so that it can be partially torn down and reconstructed.
 580  */
 581 void
 582 dmu_objset_refresh_ownership(objset_t *os, void *tag)
 583 {
 584         dsl_pool_t *dp;
 585         dsl_dataset_t *ds, *newds;
 586         char name[MAXNAMELEN];
 587 
 588         ds = os->os_dsl_dataset;
 589         VERIFY3P(ds, !=, NULL);


1560         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1561                 zap_cursor_fini(&cursor);
1562                 return (SET_ERROR(ENOENT));
1563         }
1564 
1565         if (strlen(attr.za_name) + 1 > namelen) {
1566                 zap_cursor_fini(&cursor);
1567                 return (SET_ERROR(ENAMETOOLONG));
1568         }
1569 
1570         (void) strcpy(name, attr.za_name);
1571         if (idp)
1572                 *idp = attr.za_first_integer;
1573         zap_cursor_advance(&cursor);
1574         *offp = zap_cursor_serialize(&cursor);
1575         zap_cursor_fini(&cursor);
1576 
1577         return (0);
1578 }
1579 
/*
 * Per-task context for dmu_objset_find_dp_impl().  Each task receives its
 * own copy; dc_error_lock and dc_error point at state shared by all tasks
 * of one traversal.
 */
1580 typedef struct dmu_objset_find_ctx {
1581         taskq_t         *dc_tq;         /* taskq child tasks go to */
1582         dsl_pool_t      *dc_dp;         /* pool being traversed */
1583         uint64_t        dc_obj;         /* dsl_dir object to visit */
             /* callback invoked on each dataset found */
1584         int             (*dc_func)(dsl_pool_t *, dsl_dataset_t *, void *);
1585         void            *dc_arg;        /* passed through to dc_func */
1586         int             dc_flags;       /* DS_FIND_* flags */
1587         kmutex_t        *dc_error_lock; /* held while writing *dc_error */
1588         int             *dc_error;      /* first error of the traversal */
1589 } dmu_objset_find_ctx_t;
1590 
/*
 * Task function that visits one dsl_dir of a traversal.
 *
 * 'arg' is a dmu_objset_find_ctx_t; it is released with kmem_free() before
 * this function returns, so the dispatcher must not touch it afterwards.
 * For the directory dc_obj this dispatches one new task per child directory
 * (DS_FIND_CHILDREN), calls dc_func on every entry of the head dataset's
 * snapshot-name ZAP (DS_FIND_SNAPSHOTS) and finally on the head dataset.
 * The first non-zero error is stored in *dc_error under dc_error_lock.
 * The pool config is entered on entry and exited on return.
 */
1591 static void
1592 dmu_objset_find_dp_impl(void *arg)
1593 {
1594         dmu_objset_find_ctx_t *dcp = arg;
1595         dsl_pool_t *dp = dcp->dc_dp;
1596         dmu_objset_find_ctx_t *child_dcp;
1597         dsl_dir_t *dd;
1598         dsl_dataset_t *ds;
1599         zap_cursor_t zc;
1600         zap_attribute_t *attr;
1601         uint64_t thisobj;
1602         int err;
1603 
1604         dsl_pool_config_enter(dp, FTAG);
1605 
1606         /* don't process if there already was an error */
             /* NOTE(review): read without taking dc_error_lock. */
1607         if (*dcp->dc_error)
1608                 goto out;
1609 
1610         err = dsl_dir_hold_obj(dp, dcp->dc_obj, NULL, FTAG, &dd);
1611         if (err != 0)
1612                 goto fail;
1613 
1614         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1615         if (dd->dd_myname[0] == '$') {
1616                 dsl_dir_rele(dd, FTAG);
1617                 goto out;
1618         }
1619 
1620         thisobj = dd->dd_phys->dd_head_dataset_obj;
             /* One heap-allocated attribute buffer is reused by both loops. */
1621         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1622 
1623         /*
1624          * Iterate over all children.
1625          */
1626         if (dcp->dc_flags & DS_FIND_CHILDREN) {
1627                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1628                     dd->dd_phys->dd_child_dir_zapobj);
1629                     zap_cursor_retrieve(&zc, attr) == 0;
1630                     (void) zap_cursor_advance(&zc)) {
1631                         ASSERT3U(attr->za_integer_length, ==,
1632                             sizeof (uint64_t));
1633                         ASSERT3U(attr->za_num_integers, ==, 1);
1634 
                             /*
                              * Each child task gets its own copy of the
                              * context, differing only in dc_obj; the child
                              * frees that copy when it finishes.
                              * NOTE(review): the return value of
                              * taskq_dispatch() is not checked.
                              */
1635                         child_dcp = kmem_alloc(sizeof(*child_dcp), KM_SLEEP);
1636                         *child_dcp = *dcp;
1637                         child_dcp->dc_obj = attr->za_first_integer;
1638                         taskq_dispatch(dcp->dc_tq, dmu_objset_find_dp_impl,
1639                             child_dcp, TQ_SLEEP);
1640                 }
1641                 zap_cursor_fini(&zc);






1642         }
1643 
1644         /*
1645          * Iterate over all snapshots.
1646          */
1647         if (dcp->dc_flags & DS_FIND_SNAPSHOTS) {
                     /* NOTE(review): shadows the function-scope 'ds'. */
1648                 dsl_dataset_t *ds;
1649                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1650 
1651                 if (err == 0) {
                             /* Only the ZAP object number is needed from ds. */
1652                         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
1653                         dsl_dataset_rele(ds, FTAG);
1654 
1655                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1656                             zap_cursor_retrieve(&zc, attr) == 0;
1657                             (void) zap_cursor_advance(&zc)) {
1658                                 ASSERT3U(attr->za_integer_length, ==,
1659                                     sizeof (uint64_t));
1660                                 ASSERT3U(attr->za_num_integers, ==, 1);
1661 
1662                                 err = dsl_dataset_hold_obj(dp,
1663                                     attr->za_first_integer, FTAG, &ds);
1664                                 if (err != 0)
1665                                         break;
1666                                 err = dcp->dc_func(dp, ds, dcp->dc_arg);
1667                                 dsl_dataset_rele(ds, FTAG);
1668                                 if (err != 0)
1669                                         break;
1670                         }
1671                         zap_cursor_fini(&zc);
1672                 }
1673         }
1674 
1675         dsl_dir_rele(dd, FTAG);
1676         kmem_free(attr, sizeof (zap_attribute_t));
1677 
             /*
              * If the snapshot pass was skipped, err is still the 0 returned
              * by dsl_dir_hold_obj() above.
              */
1678         if (err != 0)
1679                 goto fail;
1680 
1681         /*
1682          * Apply to self.
1683          */
1684         err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1685         if (err != 0)
1686                 goto fail;
1687         err = dcp->dc_func(dp, ds, dcp->dc_arg);
1688         dsl_dataset_rele(ds, FTAG);
1689 
             /* Also reached by falling through, in which case err may be 0. */
1690 fail:
1691         if (err) {
1692                 mutex_enter(dcp->dc_error_lock);
1693                 /* only keep first error */
1694                 if (*dcp->dc_error == 0)
1695                         *dcp->dc_error = err;
1696                 mutex_exit(dcp->dc_error_lock);
1697         }
1698 
             /* Paths that jump straight here record no error. */
1699 out:
1700         dsl_pool_config_exit(dp, FTAG);
1701         kmem_free(dcp, sizeof(*dcp));
1702 }
1703 
1704 /*
1705  * Find objsets under and including ddobj, call func(ds) on each.
1706  * The order for the enumeration is completely undefined.
1707  * func is called with dsl_pool_config held.
      *
      * The traversal runs as tasks on a taskq created for this call and sized
      * from four times vdev_count_leaves(); this function waits for the taskq
      * and then destroys it.  Returns the first error recorded by any task,
      * 0 if none was recorded, or ENOMEM if the taskq could not be created.
      * NOTE(review): func is presumably invoked concurrently from several
      * taskq threads, so func and arg would have to tolerate that; confirm.
      * NOTE(review): each task enters the pool config on its own thread while
      * this thread waits in taskq_wait(); confirm that this cannot block when
      * the caller already holds the pool config as writer.
1708  */
1709 int
1710 dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
1711     int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)
1712 {
1713         int error = 0;
1714         taskq_t *tq = NULL;
1715         int ntasks;
1716         dmu_objset_find_ctx_t *dcp;
1717         kmutex_t err_lock;
1718 
1719         ntasks = vdev_count_leaves(dp->dp_spa) * 4;
1720         tq = taskq_create("dmu_objset_find", ntasks, minclsyspri, ntasks,
1721             INT_MAX, 0);
1722         if (!tq)
1723                 return (SET_ERROR(ENOMEM));
1724 
             /*
              * error and err_lock live in this stack frame and are shared
              * with every task through the context pointers; they are only
              * torn down after taskq_wait() below has returned.
              */
1725         mutex_init(&err_lock, NULL, MUTEX_DEFAULT, NULL);
1726         dcp = kmem_alloc(sizeof(*dcp), KM_SLEEP);
1727         dcp->dc_tq = tq;
1728         dcp->dc_dp = dp;
1729         dcp->dc_obj = ddobj;
1730         dcp->dc_func = func;
1731         dcp->dc_arg = arg;
1732         dcp->dc_flags = flags;
1733         dcp->dc_error_lock = &err_lock;
1734         dcp->dc_error = &error;
1735         /* dcp will be freed by the task */
1736         taskq_dispatch(tq, dmu_objset_find_dp_impl, dcp, TQ_SLEEP);
1737 
1738         taskq_wait(tq);
1739         taskq_destroy(tq);
1740         mutex_destroy(&err_lock);
1741 
1742         return (error);
1743 }
1744 
1745 /*
1746  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1747  * The dp_config_rwlock must not be held when this is called, and it
1748  * will not be held when the callback is called.
1749  * Therefore this function should only be used when the pool is not changing
1750  * (e.g. in syncing context), or the callback can deal with the possible races.
1751  */
1752 static int
1753 dmu_objset_find_impl(spa_t *spa, const char *name,
1754     int func(const char *, void *), void *arg, int flags)
1755 {
1756         dsl_dir_t *dd;
1757         dsl_pool_t *dp = spa_get_dsl(spa);
1758         dsl_dataset_t *ds;
1759         zap_cursor_t zc;
1760         zap_attribute_t *attr;
1761         char *child;
1762         uint64_t thisobj;