Print this page
5269 zfs: zpool import slow
PORTING: this code relies on the property of taskq_wait to wait
until no more tasks are queued and no more tasks are active. As
we always queue new tasks from within other tasks, taskq_wait
reliably waits for the full recursion to finish, even though we
enqueue new tasks after taskq_wait has been called.
On platforms other than illumos, taskq_wait may not have this
property.
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Dan McDonald <danmcd@omniti.com>
Reviewed by: George Wilson <george.wilson@delphix.com>


   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
  24  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  25  * Copyright (c) 2013, Joyent, Inc. All rights reserved.
  26  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
  27  * Copyright 2015 Nexenta Systems, Inc. All rights reserved.

  28  */
  29 
  30 /* Portions Copyright 2010 Robert Milkowski */
  31 
  32 #include <sys/cred.h>
  33 #include <sys/zfs_context.h>
  34 #include <sys/dmu_objset.h>
  35 #include <sys/dsl_dir.h>
  36 #include <sys/dsl_dataset.h>
  37 #include <sys/dsl_prop.h>
  38 #include <sys/dsl_pool.h>
  39 #include <sys/dsl_synctask.h>
  40 #include <sys/dsl_deleg.h>
  41 #include <sys/dnode.h>
  42 #include <sys/dbuf.h>
  43 #include <sys/zvol.h>
  44 #include <sys/dmu_tx.h>
  45 #include <sys/zap.h>
  46 #include <sys/zil.h>
  47 #include <sys/dmu_impl.h>
  48 #include <sys/zfs_ioctl.h>
  49 #include <sys/sa.h>
  50 #include <sys/zfs_onexit.h>
  51 #include <sys/dsl_destroy.h>

  52 
  53 /*
  54  * Needed to close a window in dnode_move() that allows the objset to be freed
  55  * before it can be safely accessed.
  56  */
  57 krwlock_t os_lock;
  58 










  59 void
  60 dmu_objset_init(void)
  61 {
  62         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  63 }
  64 
  65 void
  66 dmu_objset_fini(void)
  67 {
  68         rw_destroy(&os_lock);
  69 }
  70 
  71 spa_t *
  72 dmu_objset_spa(objset_t *os)
  73 {
  74         return (os->os_spa);
  75 }
  76 
  77 zilog_t *
  78 dmu_objset_zil(objset_t *os)


 487         int err;
 488 
 489         err = dsl_pool_hold(name, tag, &dp);
 490         if (err != 0)
 491                 return (err);
 492         err = dsl_dataset_hold(dp, name, tag, &ds);
 493         if (err != 0) {
 494                 dsl_pool_rele(dp, tag);
 495                 return (err);
 496         }
 497 
 498         err = dmu_objset_from_ds(ds, osp);
 499         if (err != 0) {
 500                 dsl_dataset_rele(ds, tag);
 501                 dsl_pool_rele(dp, tag);
 502         }
 503 
 504         return (err);
 505 }
 506 



















 507 /*
 508  * dsl_pool must not be held when this is called.
 509  * Upon successful return, there will be a longhold on the dataset,
 510  * and the dsl_pool will not be held.
 511  */
 512 int
 513 dmu_objset_own(const char *name, dmu_objset_type_t type,
 514     boolean_t readonly, void *tag, objset_t **osp)
 515 {
 516         dsl_pool_t *dp;
 517         dsl_dataset_t *ds;
 518         int err;
 519 
 520         err = dsl_pool_hold(name, FTAG, &dp);
 521         if (err != 0)
 522                 return (err);
 523         err = dsl_dataset_own(dp, name, tag, &ds);
 524         if (err != 0) {
 525                 dsl_pool_rele(dp, FTAG);
 526                 return (err);
 527         }
 528 
 529         err = dmu_objset_from_ds(ds, osp);
 530         dsl_pool_rele(dp, FTAG);
 531         if (err != 0) {
 532                 dsl_dataset_disown(ds, tag);
 533         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
 534                 dsl_dataset_disown(ds, tag);
 535                 return (SET_ERROR(EINVAL));
 536         } else if (!readonly && ds->ds_is_snapshot) {
 537                 dsl_dataset_disown(ds, tag);
 538                 return (SET_ERROR(EROFS));
 539         }
 540         return (err);
 541 }
 542 














 543 void
 544 dmu_objset_rele(objset_t *os, void *tag)
 545 {
 546         dsl_pool_t *dp = dmu_objset_pool(os);
 547         dsl_dataset_rele(os->os_dsl_dataset, tag);
 548         dsl_pool_rele(dp, tag);
 549 }
 550 
 551 /*
 552  * When we are called, os MUST refer to an objset associated with a dataset
 553  * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 554  * == tag.  We will then release and reacquire ownership of the dataset while
 555  * holding the pool config_rwlock to avoid intervening namespace or ownership
 556  * changes.
 557  *
 558  * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 559  * release the hold on its dataset and acquire a new one on the dataset of the
 560  * same name so that it can be partially torn down and reconstructed.
 561  */
 562 void


1563         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1564                 zap_cursor_fini(&cursor);
1565                 return (SET_ERROR(ENOENT));
1566         }
1567 
1568         if (strlen(attr.za_name) + 1 > namelen) {
1569                 zap_cursor_fini(&cursor);
1570                 return (SET_ERROR(ENAMETOOLONG));
1571         }
1572 
1573         (void) strcpy(name, attr.za_name);
1574         if (idp)
1575                 *idp = attr.za_first_integer;
1576         zap_cursor_advance(&cursor);
1577         *offp = zap_cursor_serialize(&cursor);
1578         zap_cursor_fini(&cursor);
1579 
1580         return (0);
1581 }
1582 
1583 /*
1584  * Find objsets under and including ddobj, call func(ds) on each.
1585  */
1586 int
1587 dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
1588     int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)







1589 {


1590         dsl_dir_t *dd;
1591         dsl_dataset_t *ds;
1592         zap_cursor_t zc;
1593         zap_attribute_t *attr;
1594         uint64_t thisobj;
1595         int err;
1596 
1597         ASSERT(dsl_pool_config_held(dp));


1598 
1599         err = dsl_dir_hold_obj(dp, ddobj, NULL, FTAG, &dd);
1600         if (err != 0)
1601                 return (err);
1602 
1603         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1604         if (dd->dd_myname[0] == '$') {
1605                 dsl_dir_rele(dd, FTAG);
1606                 return (0);
1607         }
1608 
1609         thisobj = dsl_dir_phys(dd)->dd_head_dataset_obj;
1610         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1611 
1612         /*
1613          * Iterate over all children.
1614          */
1615         if (flags & DS_FIND_CHILDREN) {
1616                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1617                     dsl_dir_phys(dd)->dd_child_dir_zapobj);
1618                     zap_cursor_retrieve(&zc, attr) == 0;
1619                     (void) zap_cursor_advance(&zc)) {
1620                         ASSERT3U(attr->za_integer_length, ==,
1621                             sizeof (uint64_t));
1622                         ASSERT3U(attr->za_num_integers, ==, 1);
1623 
1624                         err = dmu_objset_find_dp(dp, attr->za_first_integer,
1625                             func, arg, flags);
1626                         if (err != 0)
1627                                 break;




1628                 }
1629                 zap_cursor_fini(&zc);
1630 
1631                 if (err != 0) {
1632                         dsl_dir_rele(dd, FTAG);
1633                         kmem_free(attr, sizeof (zap_attribute_t));
1634                         return (err);
1635                 }
1636         }
1637 
1638         /*
1639          * Iterate over all snapshots.
1640          */
1641         if (flags & DS_FIND_SNAPSHOTS) {
1642                 dsl_dataset_t *ds;
1643                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1644 
1645                 if (err == 0) {
1646                         uint64_t snapobj;
1647 
1648                         snapobj = dsl_dataset_phys(ds)->ds_snapnames_zapobj;
1649                         dsl_dataset_rele(ds, FTAG);
1650 
1651                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1652                             zap_cursor_retrieve(&zc, attr) == 0;
1653                             (void) zap_cursor_advance(&zc)) {
1654                                 ASSERT3U(attr->za_integer_length, ==,
1655                                     sizeof (uint64_t));
1656                                 ASSERT3U(attr->za_num_integers, ==, 1);
1657 
1658                                 err = dsl_dataset_hold_obj(dp,
1659                                     attr->za_first_integer, FTAG, &ds);
1660                                 if (err != 0)
1661                                         break;
1662                                 err = func(dp, ds, arg);
1663                                 dsl_dataset_rele(ds, FTAG);
1664                                 if (err != 0)
1665                                         break;
1666                         }
1667                         zap_cursor_fini(&zc);
1668                 }
1669         }
1670 
1671         dsl_dir_rele(dd, FTAG);
1672         kmem_free(attr, sizeof (zap_attribute_t));
1673 
1674         if (err != 0)
1675                 return (err);
1676 
1677         /*
1678          * Apply to self.
1679          */
1680         err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1681         if (err != 0)
1682                 return (err);
1683         err = func(dp, ds, arg);
1684         dsl_dataset_rele(ds, FTAG);
1685         return (err);


































































































1686 }
1687 
1688 /*
1689  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1690  * The dp_config_rwlock must not be held when this is called, and it
1691  * will not be held when the callback is called.
1692  * Therefore this function should only be used when the pool is not changing
1693  * (e.g. in syncing context), or the callback can deal with the possible races.
1694  */
1695 static int
1696 dmu_objset_find_impl(spa_t *spa, const char *name,
1697     int func(const char *, void *), void *arg, int flags)
1698 {
1699         dsl_dir_t *dd;
1700         dsl_pool_t *dp = spa_get_dsl(spa);
1701         dsl_dataset_t *ds;
1702         zap_cursor_t zc;
1703         zap_attribute_t *attr;
1704         char *child;
1705         uint64_t thisobj;




   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
  24  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  25  * Copyright (c) 2013, Joyent, Inc. All rights reserved.
  26  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
  27  * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
  28  * Copyright (c) 2015, STRATO AG, Inc. All rights reserved.
  29  */
  30 
  31 /* Portions Copyright 2010 Robert Milkowski */
  32 
  33 #include <sys/cred.h>
  34 #include <sys/zfs_context.h>
  35 #include <sys/dmu_objset.h>
  36 #include <sys/dsl_dir.h>
  37 #include <sys/dsl_dataset.h>
  38 #include <sys/dsl_prop.h>
  39 #include <sys/dsl_pool.h>
  40 #include <sys/dsl_synctask.h>
  41 #include <sys/dsl_deleg.h>
  42 #include <sys/dnode.h>
  43 #include <sys/dbuf.h>
  44 #include <sys/zvol.h>
  45 #include <sys/dmu_tx.h>
  46 #include <sys/zap.h>
  47 #include <sys/zil.h>
  48 #include <sys/dmu_impl.h>
  49 #include <sys/zfs_ioctl.h>
  50 #include <sys/sa.h>
  51 #include <sys/zfs_onexit.h>
  52 #include <sys/dsl_destroy.h>
  53 #include <sys/vdev.h>
  54 
  55 /*
  56  * Needed to close a window in dnode_move() that allows the objset to be freed
  57  * before it can be safely accessed.
  58  */
  59 krwlock_t os_lock;
  60 
  61 /*
  62  * Tunable to override the maximum number of threads for the parallelization
  63  * of dmu_objset_find_dp, needed to speed up the import of pools with many
  64  * datasets.
  65  * Default is 4 times the number of leaf vdevs.
  66  */
  67 int dmu_find_threads = 0;
  68 
  69 static void dmu_objset_find_dp_cb(void *arg);
  70 
  71 void
  72 dmu_objset_init(void)
  73 {
  74         rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
  75 }
  76 
  77 void
  78 dmu_objset_fini(void)
  79 {
  80         rw_destroy(&os_lock);
  81 }
  82 
  83 spa_t *
  84 dmu_objset_spa(objset_t *os)
  85 {
  86         return (os->os_spa);
  87 }
  88 
  89 zilog_t *
  90 dmu_objset_zil(objset_t *os)


 499         int err;
 500 
 501         err = dsl_pool_hold(name, tag, &dp);
 502         if (err != 0)
 503                 return (err);
 504         err = dsl_dataset_hold(dp, name, tag, &ds);
 505         if (err != 0) {
 506                 dsl_pool_rele(dp, tag);
 507                 return (err);
 508         }
 509 
 510         err = dmu_objset_from_ds(ds, osp);
 511         if (err != 0) {
 512                 dsl_dataset_rele(ds, tag);
 513                 dsl_pool_rele(dp, tag);
 514         }
 515 
 516         return (err);
 517 }
 518 
 519 static int
 520 dmu_objset_own_impl(dsl_dataset_t *ds, dmu_objset_type_t type,
 521     boolean_t readonly, void *tag, objset_t **osp)
 522 {
 523         int err;
 524 
 525         err = dmu_objset_from_ds(ds, osp);
 526         if (err != 0) {
 527                 dsl_dataset_disown(ds, tag);
 528         } else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
 529                 dsl_dataset_disown(ds, tag);
 530                 return (SET_ERROR(EINVAL));
 531         } else if (!readonly && dsl_dataset_is_snapshot(ds)) {
 532                 dsl_dataset_disown(ds, tag);
 533                 return (SET_ERROR(EROFS));
 534         }
 535         return (err);
 536 }
 537 
 538 /*
 539  * dsl_pool must not be held when this is called.
 540  * Upon successful return, there will be a longhold on the dataset,
 541  * and the dsl_pool will not be held.
 542  */
 543 int
 544 dmu_objset_own(const char *name, dmu_objset_type_t type,
 545     boolean_t readonly, void *tag, objset_t **osp)
 546 {
 547         dsl_pool_t *dp;
 548         dsl_dataset_t *ds;
 549         int err;
 550 
 551         err = dsl_pool_hold(name, FTAG, &dp);
 552         if (err != 0)
 553                 return (err);
 554         err = dsl_dataset_own(dp, name, tag, &ds);
 555         if (err != 0) {
 556                 dsl_pool_rele(dp, FTAG);
 557                 return (err);
 558         }
 559         err = dmu_objset_own_impl(ds, type, readonly, tag, osp);

 560         dsl_pool_rele(dp, FTAG);
 561 








 562         return (err);
 563 }
 564 
 565 int
 566 dmu_objset_own_obj(dsl_pool_t *dp, uint64_t obj, dmu_objset_type_t type,
 567     boolean_t readonly, void *tag, objset_t **osp)
 568 {
 569         dsl_dataset_t *ds;
 570         int err;
 571 
 572         err = dsl_dataset_own_obj(dp, obj, tag, &ds);
 573         if (err != 0)
 574                 return (err);
 575 
 576         return (dmu_objset_own_impl(ds, type, readonly, tag, osp));
 577 }
 578 
 579 void
 580 dmu_objset_rele(objset_t *os, void *tag)
 581 {
 582         dsl_pool_t *dp = dmu_objset_pool(os);
 583         dsl_dataset_rele(os->os_dsl_dataset, tag);
 584         dsl_pool_rele(dp, tag);
 585 }
 586 
 587 /*
 588  * When we are called, os MUST refer to an objset associated with a dataset
 589  * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 590  * == tag.  We will then release and reacquire ownership of the dataset while
 591  * holding the pool config_rwlock to avoid intervening namespace or ownership
 592  * changes.
 593  *
 594  * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 595  * release the hold on its dataset and acquire a new one on the dataset of the
 596  * same name so that it can be partially torn down and reconstructed.
 597  */
 598 void


1599         if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1600                 zap_cursor_fini(&cursor);
1601                 return (SET_ERROR(ENOENT));
1602         }
1603 
1604         if (strlen(attr.za_name) + 1 > namelen) {
1605                 zap_cursor_fini(&cursor);
1606                 return (SET_ERROR(ENAMETOOLONG));
1607         }
1608 
1609         (void) strcpy(name, attr.za_name);
1610         if (idp)
1611                 *idp = attr.za_first_integer;
1612         zap_cursor_advance(&cursor);
1613         *offp = zap_cursor_serialize(&cursor);
1614         zap_cursor_fini(&cursor);
1615 
1616         return (0);
1617 }
1618 
1619 typedef struct dmu_objset_find_ctx {
1620         taskq_t         *dc_tq;
1621         dsl_pool_t      *dc_dp;
1622         uint64_t        dc_ddobj;
1623         int             (*dc_func)(dsl_pool_t *, dsl_dataset_t *, void *);
1624         void            *dc_arg;
1625         int             dc_flags;
1626         kmutex_t        *dc_error_lock;
1627         int             *dc_error;
1628 } dmu_objset_find_ctx_t;
1629 
1630 static void
1631 dmu_objset_find_dp_impl(dmu_objset_find_ctx_t *dcp)
1632 {
1633         dsl_pool_t *dp = dcp->dc_dp;
1634         dmu_objset_find_ctx_t *child_dcp;
1635         dsl_dir_t *dd;
1636         dsl_dataset_t *ds;
1637         zap_cursor_t zc;
1638         zap_attribute_t *attr;
1639         uint64_t thisobj;
1640         int err = 0;
1641 
1642         /* don't process if there already was an error */
1643         if (*dcp->dc_error != 0)
1644                 goto out;
1645 
1646         err = dsl_dir_hold_obj(dp, dcp->dc_ddobj, NULL, FTAG, &dd);
1647         if (err != 0)
1648                 goto out;
1649 
1650         /* Don't visit hidden ($MOS & $ORIGIN) objsets. */
1651         if (dd->dd_myname[0] == '$') {
1652                 dsl_dir_rele(dd, FTAG);
1653                 goto out;
1654         }
1655 
1656         thisobj = dsl_dir_phys(dd)->dd_head_dataset_obj;
1657         attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
1658 
1659         /*
1660          * Iterate over all children.
1661          */
1662         if (dcp->dc_flags & DS_FIND_CHILDREN) {
1663                 for (zap_cursor_init(&zc, dp->dp_meta_objset,
1664                     dsl_dir_phys(dd)->dd_child_dir_zapobj);
1665                     zap_cursor_retrieve(&zc, attr) == 0;
1666                     (void) zap_cursor_advance(&zc)) {
1667                         ASSERT3U(attr->za_integer_length, ==,
1668                             sizeof (uint64_t));
1669                         ASSERT3U(attr->za_num_integers, ==, 1);
1670 
1671                         child_dcp = kmem_alloc(sizeof(*child_dcp), KM_SLEEP);
1672                         *child_dcp = *dcp;
1673                         child_dcp->dc_ddobj = attr->za_first_integer;
1674                         if (dcp->dc_tq != NULL)
1675                                 (void) taskq_dispatch(dcp->dc_tq,
1676                                     dmu_objset_find_dp_cb, child_dcp, TQ_SLEEP);
1677                         else
1678                                 dmu_objset_find_dp_impl(child_dcp);
1679                 }
1680                 zap_cursor_fini(&zc);






1681         }
1682 
1683         /*
1684          * Iterate over all snapshots.
1685          */
1686         if (dcp->dc_flags & DS_FIND_SNAPSHOTS) {
1687                 dsl_dataset_t *ds;
1688                 err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1689 
1690                 if (err == 0) {
1691                         uint64_t snapobj;
1692 
1693                         snapobj = dsl_dataset_phys(ds)->ds_snapnames_zapobj;
1694                         dsl_dataset_rele(ds, FTAG);
1695 
1696                         for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
1697                             zap_cursor_retrieve(&zc, attr) == 0;
1698                             (void) zap_cursor_advance(&zc)) {
1699                                 ASSERT3U(attr->za_integer_length, ==,
1700                                     sizeof (uint64_t));
1701                                 ASSERT3U(attr->za_num_integers, ==, 1);
1702 
1703                                 err = dsl_dataset_hold_obj(dp,
1704                                     attr->za_first_integer, FTAG, &ds);
1705                                 if (err != 0)
1706                                         break;
1707                                 err = dcp->dc_func(dp, ds, dcp->dc_arg);
1708                                 dsl_dataset_rele(ds, FTAG);
1709                                 if (err != 0)
1710                                         break;
1711                         }
1712                         zap_cursor_fini(&zc);
1713                 }
1714         }
1715 
1716         dsl_dir_rele(dd, FTAG);
1717         kmem_free(attr, sizeof (zap_attribute_t));
1718 
1719         if (err != 0)
1720                 goto out;
1721 
1722         /*
1723          * Apply to self.
1724          */
1725         err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
1726         if (err != 0)
1727                 goto out;
1728         err = dcp->dc_func(dp, ds, dcp->dc_arg);
1729         dsl_dataset_rele(ds, FTAG);
1730 
1731 out:
1732         if (err != 0) {
1733                 mutex_enter(dcp->dc_error_lock);
1734                 /* only keep first error */
1735                 if (*dcp->dc_error == 0)
1736                         *dcp->dc_error = err;
1737                 mutex_exit(dcp->dc_error_lock);
1738         }
1739 
1740         kmem_free(dcp, sizeof(*dcp));
1741 }
1742 
1743 static void
1744 dmu_objset_find_dp_cb(void *arg)
1745 {
1746         dmu_objset_find_ctx_t *dcp = arg;
1747         dsl_pool_t *dp = dcp->dc_dp;
1748 
1749         dsl_pool_config_enter(dp, FTAG);
1750 
1751         dmu_objset_find_dp_impl(dcp);
1752 
1753         dsl_pool_config_exit(dp, FTAG);
1754 }
1755 
1756 /*
1757  * Find objsets under and including ddobj, call func(ds) on each.
1758  * The order for the enumeration is completely undefined.
1759  * func is called with dsl_pool_config held.
1760  */
1761 int
1762 dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
1763     int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)
1764 {
1765         int error = 0;
1766         taskq_t *tq = NULL;
1767         int ntasks;
1768         dmu_objset_find_ctx_t *dcp;
1769         kmutex_t err_lock;
1770 
1771         mutex_init(&err_lock, NULL, MUTEX_DEFAULT, NULL);
1772         dcp = kmem_alloc(sizeof(*dcp), KM_SLEEP);
1773         dcp->dc_tq = NULL;
1774         dcp->dc_dp = dp;
1775         dcp->dc_ddobj = ddobj;
1776         dcp->dc_func = func;
1777         dcp->dc_arg = arg;
1778         dcp->dc_flags = flags;
1779         dcp->dc_error_lock = &err_lock;
1780         dcp->dc_error = &error;
1781 
1782         if ((flags & DS_FIND_SERIALIZE) || dsl_pool_config_held_writer(dp)) {
1783                 /*
1784                  * In case a write lock is held we can't make use of
1785                  * parallelism, as down the stack of the worker threads
1786                  * the lock is asserted via dsl_pool_config_held.
1787                  * In case of a read lock this is solved by getting a read
1788                  * lock in each worker thread, which isn't possible in case
1789                  * of a writer lock. So we fall back to the synchronous path
1790                  * here.
1791                  * In the future it might be possible to get some magic into
1792                  * dsl_pool_config_held in a way that it returns true for
1793                  * the worker threads so that a single lock held from this
1794                  * thread suffices. For now, stay single threaded.
1795                  */
1796                 dmu_objset_find_dp_impl(dcp);
1797 
1798                 return (error);
1799         }
1800 
1801         ntasks = dmu_find_threads;
1802         if (ntasks == 0)
1803                 ntasks = vdev_count_leaves(dp->dp_spa) * 4;
1804         tq = taskq_create("dmu_objset_find", ntasks, minclsyspri, ntasks,
1805             INT_MAX, 0);
1806         if (tq == NULL) {
1807                 kmem_free(dcp, sizeof(*dcp));
1808                 return (SET_ERROR(ENOMEM));
1809         }
1810         dcp->dc_tq = tq;
1811 
1812         /* dcp will be freed by task */
1813         (void) taskq_dispatch(tq, dmu_objset_find_dp_cb, dcp, TQ_SLEEP);
1814 
1815         /*
1816          * PORTING: this code relies on the property of taskq_wait to wait
1817          * until no more tasks are queued and no more tasks are active. As
1818          * we always queue new tasks from within other tasks, task_wait
1819          * reliably waits for the full recursion to finish, even though we
1820          * enqueue new tasks after taskq_wait has been called.
1821          * On platforms other than illumos, taskq_wait may not have this
1822          * property.
1823          */
1824         taskq_wait(tq);
1825         taskq_destroy(tq);
1826         mutex_destroy(&err_lock);
1827 
1828         return (error);
1829 }
1830 
1831 /*
1832  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1833  * The dp_config_rwlock must not be held when this is called, and it
1834  * will not be held when the callback is called.
1835  * Therefore this function should only be used when the pool is not changing
1836  * (e.g. in syncing context), or the callback can deal with the possible races.
1837  */
1838 static int
1839 dmu_objset_find_impl(spa_t *spa, const char *name,
1840     int func(const char *, void *), void *arg, int flags)
1841 {
1842         dsl_dir_t *dd;
1843         dsl_pool_t *dp = spa_get_dsl(spa);
1844         dsl_dataset_t *ds;
1845         zap_cursor_t zc;
1846         zap_attribute_t *attr;
1847         char *child;
1848         uint64_t thisobj;